root / lib / cmdlib.py @ fb8dcb62
1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33

    
34
from ganeti import rpc
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import locking
41
from ganeti import config
42
from ganeti import constants
43
from ganeti import objects
44
from ganeti import opcodes
45
from ganeti import ssconf
46
from ganeti import serializer
47

    
48

    
49
class LogicalUnit(object):
50
  """Logical Unit base class.
51

52
  Subclasses must follow these rules:
53
    - implement ExpandNames
54
    - implement CheckPrereq
55
    - implement Exec
56
    - implement BuildHooksEnv
57
    - redefine HPATH and HTYPE
58
    - optionally redefine their run requirements:
59
        REQ_MASTER: the LU needs to run on the master node
60
        REQ_WSSTORE: the LU needs a writable SimpleStore
61
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
62

63
  Note that all commands require root permissions.
64

65
  """
66
  HPATH = None
67
  HTYPE = None
68
  _OP_REQP = []
69
  REQ_MASTER = True
70
  REQ_WSSTORE = False
71
  REQ_BGL = True
72

    
73
  def __init__(self, processor, op, context, sstore):
74
    """Constructor for LogicalUnit.
75

76
    This needs to be overridden in derived classes in order to check op
77
    validity.
78

79
    """
80
    self.proc = processor
81
    self.op = op
82
    self.cfg = context.cfg
83
    self.sstore = sstore
84
    self.context = context
85
    self.needed_locks = None
86
    self.__ssh = None
87

    
88
    for attr_name in self._OP_REQP:
89
      attr_val = getattr(op, attr_name, None)
90
      if attr_val is None:
91
        raise errors.OpPrereqError("Required parameter '%s' missing" %
92
                                   attr_name)
93

    
94
    if not self.cfg.IsCluster():
95
      raise errors.OpPrereqError("Cluster not initialized yet,"
96
                                 " use 'gnt-cluster init' first.")
97
    if self.REQ_MASTER:
98
      master = sstore.GetMasterNode()
99
      if master != utils.HostInfo().name:
100
        raise errors.OpPrereqError("Commands must be run on the master"
101
                                   " node %s" % master)
102

    
103
  def __GetSSH(self):
104
    """Returns the SshRunner object
105

106
    """
107
    if not self.__ssh:
108
      self.__ssh = ssh.SshRunner(self.sstore)
109
    return self.__ssh
110

    
111
  ssh = property(fget=__GetSSH)
112

    
113
  def ExpandNames(self):
114
    """Expand names for this LU.
115

116
    This method is called before starting to execute the opcode, and it should
117
    update all the parameters of the opcode to their canonical form (e.g. a
118
    short node name must be fully expanded after this method has successfully
119
    completed). This way locking, hooks, logging, etc. can work correctly.
120

121
    LUs which implement this method must also populate the self.needed_locks
122
    member, as a dict with lock levels as keys, and a list of needed lock names
123
    as values. Rules:
124
      - Use an empty dict if you don't need any lock
125
      - If you don't need any lock at a particular level omit that level
126
      - Don't put anything for the BGL level
127
      - If you want all locks at a level use None as a value
128
        (this reflects what LockSet does, and will be replaced before
129
        CheckPrereq with the full list of nodes that have been locked)
130

131
    Examples:
132
    # Acquire all nodes and one instance
133
    self.needed_locks = {
134
      locking.LEVEL_NODE: None,
135
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
136
    }
137
    # Acquire just two nodes
138
    self.needed_locks = {
139
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
140
    }
141
    # Acquire no locks
142
    self.needed_locks = {} # No, you can't leave it to the default value None
143

144
    """
145
    # The implementation of this method is mandatory only if the new LU is
146
    # concurrent, so that old LUs don't need to be changed all at the same
147
    # time.
148
    if self.REQ_BGL:
149
      self.needed_locks = {} # Exclusive LUs don't need locks.
150
    else:
151
      raise NotImplementedError
152

    
153
  def DeclareLocks(self, level):
154
    """Declare LU locking needs for a level
155

156
    While most LUs can just declare their locking needs at ExpandNames time,
157
    sometimes there's the need to calculate some locks after having acquired
158
    the previous ones. This function is called just before acquiring locks at a
159
    particular level, but after acquiring the ones at lower levels, and permits
160
    such calculations. It can be used to modify self.needed_locks, and by
161
    default it does nothing.
162

163
    This function is only called if you have something already set in
164
    self.needed_locks for the level.
165

166
    @param level: Locking level which is going to be locked
167
    @type level: member of ganeti.locking.LEVELS
168

169
    """
170

    
171
  def CheckPrereq(self):
172
    """Check prerequisites for this LU.
173

174
    This method should check that the prerequisites for the execution
175
    of this LU are fulfilled. It can do internode communication, but
176
    it should be idempotent - no cluster or system changes are
177
    allowed.
178

179
    The method should raise errors.OpPrereqError in case something is
180
    not fulfilled. Its return value is ignored.
181

182
    This method should also update all the parameters of the opcode to
183
    their canonical form if it hasn't been done by ExpandNames before.
184

185
    """
186
    raise NotImplementedError
187

    
188
  def Exec(self, feedback_fn):
189
    """Execute the LU.
190

191
    This method should implement the actual work. It should raise
192
    errors.OpExecError for failures that are somewhat dealt with in
193
    code, or expected.
194

195
    """
196
    raise NotImplementedError
197

    
198
  def BuildHooksEnv(self):
199
    """Build hooks environment for this LU.
200

201
    This method should return a three-element tuple consisting of: a dict
202
    containing the environment that will be used for running the
203
    specific hook for this LU, a list of node names on which the hook
204
    should run before the execution, and a list of node names on which
205
    the hook should run after the execution.
206

207
    The keys of the dict must not be prefixed with 'GANETI_', as this will
208
    be handled in the hooks runner. Also note additional keys will be
209
    added by the hooks runner. If the LU doesn't define any
210
    environment, an empty dict (and not None) should be returned.
211

212
    If there are no nodes for a phase, return an empty list (and not None).
213

214
    Note that if the HPATH for a LU class is None, this function will
215
    not be called.
216

217
    """
218
    raise NotImplementedError
219

    
220
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
221
    """Notify the LU about the results of its hooks.
222

223
    This method is called every time a hooks phase is executed, and notifies
224
    the Logical Unit about the hooks' result. The LU can then use it to alter
225
    its result based on the hooks.  By default the method does nothing and the
226
    previous result is passed back unchanged, but any LU can override it if it
227
    wants to use the local cluster hook-scripts somehow.
228

229
    Args:
230
      phase: the hooks phase that has just been run
231
      hook_results: the results of the multi-node hooks rpc call
232
      feedback_fn: function to send feedback back to the caller
233
      lu_result: the previous result this LU had, or None in the PRE phase.
234

235
    """
236
    return lu_result
237

    
238
  def _ExpandAndLockInstance(self):
239
    """Helper function to expand and lock an instance.
240

241
    Many LUs that work on an instance take its name in self.op.instance_name
242
    and need to expand it and then declare the expanded name for locking. This
243
    function does it, and then updates self.op.instance_name to the expanded
244
    name. It also initializes needed_locks as a dict, if this hasn't been done
245
    before.
246

247
    """
248
    if self.needed_locks is None:
249
      self.needed_locks = {}
250
    else:
251
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
252
        "_ExpandAndLockInstance called with instance-level locks set"
253
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
254
    if expanded_name is None:
255
      raise errors.OpPrereqError("Instance '%s' not known" %
256
                                  self.op.instance_name)
257
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
258
    self.op.instance_name = expanded_name
259

    
260

    
261
class NoHooksLU(LogicalUnit):
262
  """Simple LU which runs no hooks.
263

264
  This LU is intended as a parent for other LogicalUnits which will
265
  run no hooks, in order to reduce duplicate code.
266

267
  """
268
  HPATH = None
269
  HTYPE = None
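# Illustrative sketch (not part of the original file): a minimal LU following
# the contract documented in LogicalUnit above.  The opcode field and the
# messages are hypothetical examples, so the class is kept commented out.
#
#   class LUExampleCheckInstance(NoHooksLU):
#     """Example LU that inspects a single instance."""
#     _OP_REQP = ["instance_name"]
#     REQ_BGL = False  # opt in to fine-grained locking
#
#     def ExpandNames(self):
#       # canonicalises self.op.instance_name and fills
#       # self.needed_locks[locking.LEVEL_INSTANCE]
#       self._ExpandAndLockInstance()
#
#     def CheckPrereq(self):
#       self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
#
#     def Exec(self, feedback_fn):
#       feedback_fn("Instance %s uses OS %s" %
#                   (self.instance.name, self.instance.os))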
270

    
271

    
272
def _GetWantedNodes(lu, nodes):
273
  """Returns list of checked and expanded node names.
274

275
  Args:
276
    nodes: List of nodes (strings) or None for all
277

278
  """
279
  if not isinstance(nodes, list):
280
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
281

    
282
  if nodes:
283
    wanted = []
284

    
285
    for name in nodes:
286
      node = lu.cfg.ExpandNodeName(name)
287
      if node is None:
288
        raise errors.OpPrereqError("No such node name '%s'" % name)
289
      wanted.append(node)
290

    
291
  else:
292
    wanted = lu.cfg.GetNodeList()
293
  return utils.NiceSort(wanted)
294

    
295

    
296
def _GetWantedInstances(lu, instances):
297
  """Returns list of checked and expanded instance names.
298

299
  Args:
300
    instances: List of instances (strings) or None for all
301

302
  """
303
  if not isinstance(instances, list):
304
    raise errors.OpPrereqError("Invalid argument type 'instances'")
305

    
306
  if instances:
307
    wanted = []
308

    
309
    for name in instances:
310
      instance = lu.cfg.ExpandInstanceName(name)
311
      if instance is None:
312
        raise errors.OpPrereqError("No such instance name '%s'" % name)
313
      wanted.append(instance)
314

    
315
  else:
316
    wanted = lu.cfg.GetInstanceList()
317
  return utils.NiceSort(wanted)
318

    
319

    
320
def _CheckOutputFields(static, dynamic, selected):
321
  """Checks whether all selected fields are valid.
322

323
  Args:
324
    static: Static fields
325
    dynamic: Dynamic fields
326

327
  """
328
  static_fields = frozenset(static)
329
  dynamic_fields = frozenset(dynamic)
330

    
331
  all_fields = static_fields | dynamic_fields
332

    
333
  if not all_fields.issuperset(selected):
334
    raise errors.OpPrereqError("Unknown output fields selected: %s"
335
                               % ",".join(frozenset(selected).
336
                                          difference(all_fields)))
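# Illustrative sketch (not part of the original file): how a query LU's
# CheckPrereq typically calls _CheckOutputFields, mirroring LUQueryNodes
# further below.  The field names here are examples only.
#
#   _CheckOutputFields(static=["name", "pip", "sip"],
#                      dynamic=["mtotal", "mfree", "dtotal", "dfree"],
#                      selected=self.op.output_fields)
#
# Selecting an unknown field (e.g. "bogus") raises OpPrereqError naming the
# offending fields.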
337

    
338

    
339
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
340
                          memory, vcpus, nics):
341
  """Builds instance related env variables for hooks from single variables.
342

343
  Args:
344
    secondary_nodes: List of secondary nodes as strings
345
  """
346
  env = {
347
    "OP_TARGET": name,
348
    "INSTANCE_NAME": name,
349
    "INSTANCE_PRIMARY": primary_node,
350
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
351
    "INSTANCE_OS_TYPE": os_type,
352
    "INSTANCE_STATUS": status,
353
    "INSTANCE_MEMORY": memory,
354
    "INSTANCE_VCPUS": vcpus,
355
  }
356

    
357
  if nics:
358
    nic_count = len(nics)
359
    for idx, (ip, bridge, mac) in enumerate(nics):
360
      if ip is None:
361
        ip = ""
362
      env["INSTANCE_NIC%d_IP" % idx] = ip
363
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
364
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
365
  else:
366
    nic_count = 0
367

    
368
  env["INSTANCE_NIC_COUNT"] = nic_count
369

    
370
  return env
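# Illustrative sketch (not part of the original file): for a hypothetical
# single-NIC instance the helper above would return roughly the following
# environment (names and addresses are made up):
#
#   _BuildInstanceHookEnv("inst1.example.tld", "node1.example.tld",
#                         ["node2.example.tld"], "debian-etch", "up",
#                         512, 1,
#                         [("192.0.2.10", "xen-br0", "aa:00:00:00:00:01")])
#   => {"OP_TARGET": "inst1.example.tld",
#       "INSTANCE_NAME": "inst1.example.tld",
#       "INSTANCE_PRIMARY": "node1.example.tld",
#       "INSTANCE_SECONDARIES": "node2.example.tld",
#       "INSTANCE_OS_TYPE": "debian-etch",
#       "INSTANCE_STATUS": "up",
#       "INSTANCE_MEMORY": 512,
#       "INSTANCE_VCPUS": 1,
#       "INSTANCE_NIC0_IP": "192.0.2.10",
#       "INSTANCE_NIC0_BRIDGE": "xen-br0",
#       "INSTANCE_NIC0_HWADDR": "aa:00:00:00:00:01",
#       "INSTANCE_NIC_COUNT": 1}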
371

    
372

    
373
def _BuildInstanceHookEnvByObject(instance, override=None):
374
  """Builds instance related env variables for hooks from an object.
375

376
  Args:
377
    instance: objects.Instance object of instance
378
    override: dict of values to override
379
  """
380
  args = {
381
    'name': instance.name,
382
    'primary_node': instance.primary_node,
383
    'secondary_nodes': instance.secondary_nodes,
384
    'os_type': instance.os,
385
    'status': instance.status,
386
    'memory': instance.memory,
387
    'vcpus': instance.vcpus,
388
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
389
  }
390
  if override:
391
    args.update(override)
392
  return _BuildInstanceHookEnv(**args)
393

    
394

    
395
def _CheckInstanceBridgesExist(instance):
396
  """Check that the brigdes needed by an instance exist.
397

398
  """
399
  # check bridges existence
400
  brlist = [nic.bridge for nic in instance.nics]
401
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
402
    raise errors.OpPrereqError("one or more target bridges %s does not"
403
                               " exist on destination node '%s'" %
404
                               (brlist, instance.primary_node))
405

    
406

    
407
class LUDestroyCluster(NoHooksLU):
408
  """Logical unit for destroying the cluster.
409

410
  """
411
  _OP_REQP = []
412

    
413
  def CheckPrereq(self):
414
    """Check prerequisites.
415

416
    This checks whether the cluster is empty.
417

418
    Any errors are signalled by raising errors.OpPrereqError.
419

420
    """
421
    master = self.sstore.GetMasterNode()
422

    
423
    nodelist = self.cfg.GetNodeList()
424
    if len(nodelist) != 1 or nodelist[0] != master:
425
      raise errors.OpPrereqError("There are still %d node(s) in"
426
                                 " this cluster." % (len(nodelist) - 1))
427
    instancelist = self.cfg.GetInstanceList()
428
    if instancelist:
429
      raise errors.OpPrereqError("There are still %d instance(s) in"
430
                                 " this cluster." % len(instancelist))
431

    
432
  def Exec(self, feedback_fn):
433
    """Destroys the cluster.
434

435
    """
436
    master = self.sstore.GetMasterNode()
437
    if not rpc.call_node_stop_master(master, False):
438
      raise errors.OpExecError("Could not disable the master role")
439
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
440
    utils.CreateBackup(priv_key)
441
    utils.CreateBackup(pub_key)
442
    rpc.call_node_leave_cluster(master)
443

    
444

    
445
class LUVerifyCluster(LogicalUnit):
446
  """Verifies the cluster status.
447

448
  """
449
  HPATH = "cluster-verify"
450
  HTYPE = constants.HTYPE_CLUSTER
451
  _OP_REQP = ["skip_checks"]
452

    
453
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
454
                  remote_version, feedback_fn):
455
    """Run multiple tests against a node.
456

457
    Test list:
458
      - compares ganeti version
459
      - checks vg existence and size > 20G
460
      - checks config file checksum
461
      - checks ssh to other nodes
462

463
    Args:
464
      node: name of the node to check
465
      file_list: required list of files
466
      local_cksum: dictionary of local files and their checksums
467

468
    """
469
    # compares ganeti version
470
    local_version = constants.PROTOCOL_VERSION
471
    if not remote_version:
472
      feedback_fn("  - ERROR: connection to %s failed" % (node))
473
      return True
474

    
475
    if local_version != remote_version:
476
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
477
                      (local_version, node, remote_version))
478
      return True
479

    
480
    # checks vg existence and size > 20G
481

    
482
    bad = False
483
    if not vglist:
484
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
485
                      (node,))
486
      bad = True
487
    else:
488
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
489
                                            constants.MIN_VG_SIZE)
490
      if vgstatus:
491
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
492
        bad = True
493

    
494
    # checks config file checksum
495
    # checks ssh to any
496

    
497
    if 'filelist' not in node_result:
498
      bad = True
499
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
500
    else:
501
      remote_cksum = node_result['filelist']
502
      for file_name in file_list:
503
        if file_name not in remote_cksum:
504
          bad = True
505
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
506
        elif remote_cksum[file_name] != local_cksum[file_name]:
507
          bad = True
508
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
509

    
510
    if 'nodelist' not in node_result:
511
      bad = True
512
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
513
    else:
514
      if node_result['nodelist']:
515
        bad = True
516
        for node in node_result['nodelist']:
517
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
518
                          (node, node_result['nodelist'][node]))
519
    if 'node-net-test' not in node_result:
520
      bad = True
521
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
522
    else:
523
      if node_result['node-net-test']:
524
        bad = True
525
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
526
        for node in nlist:
527
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
528
                          (node, node_result['node-net-test'][node]))
529

    
530
    hyp_result = node_result.get('hypervisor', None)
531
    if hyp_result is not None:
532
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
533
    return bad
534

    
535
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
536
                      node_instance, feedback_fn):
537
    """Verify an instance.
538

539
    This function checks to see if the required block devices are
540
    available on the instance's node.
541

542
    """
543
    bad = False
544

    
545
    node_current = instanceconfig.primary_node
546

    
547
    node_vol_should = {}
548
    instanceconfig.MapLVsByNode(node_vol_should)
549

    
550
    for node in node_vol_should:
551
      for volume in node_vol_should[node]:
552
        if node not in node_vol_is or volume not in node_vol_is[node]:
553
          feedback_fn("  - ERROR: volume %s missing on node %s" %
554
                          (volume, node))
555
          bad = True
556

    
557
    if instanceconfig.status != 'down':
558
      if (node_current not in node_instance or
559
          instance not in node_instance[node_current]):
560
        feedback_fn("  - ERROR: instance %s not running on node %s" %
561
                        (instance, node_current))
562
        bad = True
563

    
564
    for node in node_instance:
565
      if node != node_current:
566
        if instance in node_instance[node]:
567
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
568
                          (instance, node))
569
          bad = True
570

    
571
    return bad
572

    
573
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
574
    """Verify if there are any unknown volumes in the cluster.
575

576
    The .os, .swap and backup volumes are ignored. All other volumes are
577
    reported as unknown.
578

579
    """
580
    bad = False
581

    
582
    for node in node_vol_is:
583
      for volume in node_vol_is[node]:
584
        if node not in node_vol_should or volume not in node_vol_should[node]:
585
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
586
                      (volume, node))
587
          bad = True
588
    return bad
589

    
590
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
591
    """Verify the list of running instances.
592

593
    This checks what instances are running but unknown to the cluster.
594

595
    """
596
    bad = False
597
    for node in node_instance:
598
      for runninginstance in node_instance[node]:
599
        if runninginstance not in instancelist:
600
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
601
                          (runninginstance, node))
602
          bad = True
603
    return bad
604

    
605
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
606
    """Verify N+1 Memory Resilience.
607

608
    Check that if one single node dies we can still start all the instances it
609
    was primary for.
610

611
    """
612
    bad = False
613

    
614
    for node, nodeinfo in node_info.iteritems():
615
      # This code checks that every node which is now listed as secondary has
616
      # enough memory to host all instances it is secondary for, should one
617
      # other node in the cluster fail.
618
      # FIXME: not ready for failover to an arbitrary node
619
      # FIXME: does not support file-backed instances
620
      # WARNING: we currently take into account down instances as well as up
621
      # ones, considering that even if they're down someone might want to start
622
      # them even in the event of a node failure.
623
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
624
        needed_mem = 0
625
        for instance in instances:
626
          needed_mem += instance_cfg[instance].memory
627
        if nodeinfo['mfree'] < needed_mem:
628
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
629
                      " failovers should node %s fail" % (node, prinode))
630
          bad = True
631
    return bad
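  # Illustrative worked example (not part of the original file): suppose node
  # C has mfree=1024 and is secondary for instances i1 (memory=512) and i2
  # (memory=768), both with primary node A.  Then C's 'sinst-by-pnode' is
  # {'A': ['i1', 'i2']}, needed_mem = 512 + 768 = 1280 > 1024, so C could not
  # absorb a failure of node A and the check above reports an N+1 error.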
632

    
633
  def CheckPrereq(self):
634
    """Check prerequisites.
635

636
    Transform the list of checks we're going to skip into a set and check that
637
    all its members are valid.
638

639
    """
640
    self.skip_set = frozenset(self.op.skip_checks)
641
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
642
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
643

    
644
  def BuildHooksEnv(self):
645
    """Build hooks env.
646

647
    Cluster-Verify hooks are only run in the post phase; their failure makes
648
    their output be logged in the verify output and the verification fail.
649

650
    """
651
    all_nodes = self.cfg.GetNodeList()
652
    # TODO: populate the environment with useful information for verify hooks
653
    env = {}
654
    return env, [], all_nodes
655

    
656
  def Exec(self, feedback_fn):
657
    """Verify integrity of cluster, performing various test on nodes.
658

659
    """
660
    bad = False
661
    feedback_fn("* Verifying global settings")
662
    for msg in self.cfg.VerifyConfig():
663
      feedback_fn("  - ERROR: %s" % msg)
664

    
665
    vg_name = self.cfg.GetVGName()
666
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
667
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
668
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
669
    i_non_redundant = [] # Non redundant instances
670
    node_volume = {}
671
    node_instance = {}
672
    node_info = {}
673
    instance_cfg = {}
674

    
675
    # FIXME: verify OS list
676
    # do local checksums
677
    file_names = list(self.sstore.GetFileList())
678
    file_names.append(constants.SSL_CERT_FILE)
679
    file_names.append(constants.CLUSTER_CONF_FILE)
680
    local_checksums = utils.FingerprintFiles(file_names)
681

    
682
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
683
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
684
    all_instanceinfo = rpc.call_instance_list(nodelist)
685
    all_vglist = rpc.call_vg_list(nodelist)
686
    node_verify_param = {
687
      'filelist': file_names,
688
      'nodelist': nodelist,
689
      'hypervisor': None,
690
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
691
                        for node in nodeinfo]
692
      }
693
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
694
    all_rversion = rpc.call_version(nodelist)
695
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
696

    
697
    for node in nodelist:
698
      feedback_fn("* Verifying node %s" % node)
699
      result = self._VerifyNode(node, file_names, local_checksums,
700
                                all_vglist[node], all_nvinfo[node],
701
                                all_rversion[node], feedback_fn)
702
      bad = bad or result
703

    
704
      # node_volume
705
      volumeinfo = all_volumeinfo[node]
706

    
707
      if isinstance(volumeinfo, basestring):
708
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
709
                    (node, volumeinfo[-400:].encode('string_escape')))
710
        bad = True
711
        node_volume[node] = {}
712
      elif not isinstance(volumeinfo, dict):
713
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
714
        bad = True
715
        continue
716
      else:
717
        node_volume[node] = volumeinfo
718

    
719
      # node_instance
720
      nodeinstance = all_instanceinfo[node]
721
      if type(nodeinstance) != list:
722
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
723
        bad = True
724
        continue
725

    
726
      node_instance[node] = nodeinstance
727

    
728
      # node_info
729
      nodeinfo = all_ninfo[node]
730
      if not isinstance(nodeinfo, dict):
731
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
732
        bad = True
733
        continue
734

    
735
      try:
736
        node_info[node] = {
737
          "mfree": int(nodeinfo['memory_free']),
738
          "dfree": int(nodeinfo['vg_free']),
739
          "pinst": [],
740
          "sinst": [],
741
          # dictionary holding all instances this node is secondary for,
742
          # grouped by their primary node. Each key is a cluster node, and each
743
          # value is a list of instances which have the key as primary and the
744
          # current node as secondary. This is handy to calculate N+1 memory
745
          # availability if you can only failover from a primary to its
746
          # secondary.
747
          "sinst-by-pnode": {},
748
        }
749
      except ValueError:
750
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
751
        bad = True
752
        continue
753

    
754
    node_vol_should = {}
755

    
756
    for instance in instancelist:
757
      feedback_fn("* Verifying instance %s" % instance)
758
      inst_config = self.cfg.GetInstanceInfo(instance)
759
      result =  self._VerifyInstance(instance, inst_config, node_volume,
760
                                     node_instance, feedback_fn)
761
      bad = bad or result
762

    
763
      inst_config.MapLVsByNode(node_vol_should)
764

    
765
      instance_cfg[instance] = inst_config
766

    
767
      pnode = inst_config.primary_node
768
      if pnode in node_info:
769
        node_info[pnode]['pinst'].append(instance)
770
      else:
771
        feedback_fn("  - ERROR: instance %s, connection to primary node"
772
                    " %s failed" % (instance, pnode))
773
        bad = True
774

    
775
      # If the instance is non-redundant we cannot survive losing its primary
776
      # node, so we are not N+1 compliant. On the other hand we have no disk
777
      # templates with more than one secondary so that situation is not well
778
      # supported either.
779
      # FIXME: does not support file-backed instances
780
      if len(inst_config.secondary_nodes) == 0:
781
        i_non_redundant.append(instance)
782
      elif len(inst_config.secondary_nodes) > 1:
783
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
784
                    % instance)
785

    
786
      for snode in inst_config.secondary_nodes:
787
        if snode in node_info:
788
          node_info[snode]['sinst'].append(instance)
789
          if pnode not in node_info[snode]['sinst-by-pnode']:
790
            node_info[snode]['sinst-by-pnode'][pnode] = []
791
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
792
        else:
793
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
794
                      " %s failed" % (instance, snode))
795

    
796
    feedback_fn("* Verifying orphan volumes")
797
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
798
                                       feedback_fn)
799
    bad = bad or result
800

    
801
    feedback_fn("* Verifying remaining instances")
802
    result = self._VerifyOrphanInstances(instancelist, node_instance,
803
                                         feedback_fn)
804
    bad = bad or result
805

    
806
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
807
      feedback_fn("* Verifying N+1 Memory redundancy")
808
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
809
      bad = bad or result
810

    
811
    feedback_fn("* Other Notes")
812
    if i_non_redundant:
813
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
814
                  % len(i_non_redundant))
815

    
816
    return int(bad)
817

    
818
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
819
    """Analize the post-hooks' result, handle it, and send some
820
    nicely-formatted feedback back to the user.
821

822
    Args:
823
      phase: the hooks phase that has just been run
824
      hooks_results: the results of the multi-node hooks rpc call
825
      feedback_fn: function to send feedback back to the caller
826
      lu_result: previous Exec result
827

828
    """
829
    # We only really run POST phase hooks, and are only interested in their results
830
    if phase == constants.HOOKS_PHASE_POST:
831
      # Used to change hooks' output to proper indentation
832
      indent_re = re.compile('^', re.M)
833
      feedback_fn("* Hooks Results")
834
      if not hooks_results:
835
        feedback_fn("  - ERROR: general communication failure")
836
        lu_result = 1
837
      else:
838
        for node_name in hooks_results:
839
          show_node_header = True
840
          res = hooks_results[node_name]
841
          if res is False or not isinstance(res, list):
842
            feedback_fn("    Communication failure")
843
            lu_result = 1
844
            continue
845
          for script, hkr, output in res:
846
            if hkr == constants.HKR_FAIL:
847
              # The node header is only shown once, if there are
848
              # failing hooks on that node
849
              if show_node_header:
850
                feedback_fn("  Node %s:" % node_name)
851
                show_node_header = False
852
              feedback_fn("    ERROR: Script %s failed, output:" % script)
853
              output = indent_re.sub('      ', output)
854
              feedback_fn("%s" % output)
855
              lu_result = 1
856

    
857
      return lu_result
858

    
859

    
860
class LUVerifyDisks(NoHooksLU):
861
  """Verifies the cluster disks status.
862

863
  """
864
  _OP_REQP = []
865

    
866
  def CheckPrereq(self):
867
    """Check prerequisites.
868

869
    This has no prerequisites.
870

871
    """
872
    pass
873

    
874
  def Exec(self, feedback_fn):
875
    """Verify integrity of cluster disks.
876

877
    """
878
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
879

    
880
    vg_name = self.cfg.GetVGName()
881
    nodes = utils.NiceSort(self.cfg.GetNodeList())
882
    instances = [self.cfg.GetInstanceInfo(name)
883
                 for name in self.cfg.GetInstanceList()]
884

    
885
    nv_dict = {}
886
    for inst in instances:
887
      inst_lvs = {}
888
      if (inst.status != "up" or
889
          inst.disk_template not in constants.DTS_NET_MIRROR):
890
        continue
891
      inst.MapLVsByNode(inst_lvs)
892
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
893
      for node, vol_list in inst_lvs.iteritems():
894
        for vol in vol_list:
895
          nv_dict[(node, vol)] = inst
896

    
897
    if not nv_dict:
898
      return result
899

    
900
    node_lvs = rpc.call_volume_list(nodes, vg_name)
901

    
902
    to_act = set()
903
    for node in nodes:
904
      # node_volume
905
      lvs = node_lvs[node]
906

    
907
      if isinstance(lvs, basestring):
908
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
909
        res_nlvm[node] = lvs
910
      elif not isinstance(lvs, dict):
911
        logger.Info("connection to node %s failed or invalid data returned" %
912
                    (node,))
913
        res_nodes.append(node)
914
        continue
915

    
916
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
917
        inst = nv_dict.pop((node, lv_name), None)
918
        if (not lv_online and inst is not None
919
            and inst.name not in res_instances):
920
          res_instances.append(inst.name)
921

    
922
    # any leftover items in nv_dict are missing LVs, let's arrange the
923
    # data better
924
    for key, inst in nv_dict.iteritems():
925
      if inst.name not in res_missing:
926
        res_missing[inst.name] = []
927
      res_missing[inst.name].append(key)
928

    
929
    return result
930

    
931

    
932
class LURenameCluster(LogicalUnit):
933
  """Rename the cluster.
934

935
  """
936
  HPATH = "cluster-rename"
937
  HTYPE = constants.HTYPE_CLUSTER
938
  _OP_REQP = ["name"]
939
  REQ_WSSTORE = True
940

    
941
  def BuildHooksEnv(self):
942
    """Build hooks env.
943

944
    """
945
    env = {
946
      "OP_TARGET": self.sstore.GetClusterName(),
947
      "NEW_NAME": self.op.name,
948
      }
949
    mn = self.sstore.GetMasterNode()
950
    return env, [mn], [mn]
951

    
952
  def CheckPrereq(self):
953
    """Verify that the passed name is a valid one.
954

955
    """
956
    hostname = utils.HostInfo(self.op.name)
957

    
958
    new_name = hostname.name
959
    self.ip = new_ip = hostname.ip
960
    old_name = self.sstore.GetClusterName()
961
    old_ip = self.sstore.GetMasterIP()
962
    if new_name == old_name and new_ip == old_ip:
963
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
964
                                 " cluster has changed")
965
    if new_ip != old_ip:
966
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
967
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
968
                                   " reachable on the network. Aborting." %
969
                                   new_ip)
970

    
971
    self.op.name = new_name
972

    
973
  def Exec(self, feedback_fn):
974
    """Rename the cluster.
975

976
    """
977
    clustername = self.op.name
978
    ip = self.ip
979
    ss = self.sstore
980

    
981
    # shutdown the master IP
982
    master = ss.GetMasterNode()
983
    if not rpc.call_node_stop_master(master, False):
984
      raise errors.OpExecError("Could not disable the master role")
985

    
986
    try:
987
      # modify the sstore
988
      ss.SetKey(ss.SS_MASTER_IP, ip)
989
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
990

    
991
      # Distribute updated ss config to all nodes
992
      myself = self.cfg.GetNodeInfo(master)
993
      dist_nodes = self.cfg.GetNodeList()
994
      if myself.name in dist_nodes:
995
        dist_nodes.remove(myself.name)
996

    
997
      logger.Debug("Copying updated ssconf data to all nodes")
998
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
999
        fname = ss.KeyToFilename(keyname)
1000
        result = rpc.call_upload_file(dist_nodes, fname)
1001
        for to_node in dist_nodes:
1002
          if not result[to_node]:
1003
            logger.Error("copy of file %s to node %s failed" %
1004
                         (fname, to_node))
1005
    finally:
1006
      if not rpc.call_node_start_master(master, False):
1007
        logger.Error("Could not re-enable the master role on the master,"
1008
                     " please restart manually.")
1009

    
1010

    
1011
def _RecursiveCheckIfLVMBased(disk):
1012
  """Check if the given disk or its children are lvm-based.
1013

1014
  Args:
1015
    disk: ganeti.objects.Disk object
1016

1017
  Returns:
1018
    boolean indicating whether a LD_LV dev_type was found or not
1019

1020
  """
1021
  if disk.children:
1022
    for chdisk in disk.children:
1023
      if _RecursiveCheckIfLVMBased(chdisk):
1024
        return True
1025
  return disk.dev_type == constants.LD_LV
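# Illustrative note (not part of the original file): a plain LD_LV disk makes
# _RecursiveCheckIfLVMBased return True directly, while e.g. a DRBD device
# returns True as soon as one of its children (typically the backing logical
# volumes) is LVM-based; a disk tree with no LD_LV devices anywhere returns
# False.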
1026

    
1027

    
1028
class LUSetClusterParams(LogicalUnit):
1029
  """Change the parameters of the cluster.
1030

1031
  """
1032
  HPATH = "cluster-modify"
1033
  HTYPE = constants.HTYPE_CLUSTER
1034
  _OP_REQP = []
1035

    
1036
  def BuildHooksEnv(self):
1037
    """Build hooks env.
1038

1039
    """
1040
    env = {
1041
      "OP_TARGET": self.sstore.GetClusterName(),
1042
      "NEW_VG_NAME": self.op.vg_name,
1043
      }
1044
    mn = self.sstore.GetMasterNode()
1045
    return env, [mn], [mn]
1046

    
1047
  def CheckPrereq(self):
1048
    """Check prerequisites.
1049

1050
    This checks whether the given params don't conflict and
1051
    if the given volume group is valid.
1052

1053
    """
1054
    if not self.op.vg_name:
1055
      instances = [self.cfg.GetInstanceInfo(name)
1056
                   for name in self.cfg.GetInstanceList()]
1057
      for inst in instances:
1058
        for disk in inst.disks:
1059
          if _RecursiveCheckIfLVMBased(disk):
1060
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1061
                                       " lvm-based instances exist")
1062

    
1063
    # if vg_name not None, checks given volume group on all nodes
1064
    if self.op.vg_name:
1065
      node_list = self.cfg.GetNodeList()
1066
      vglist = rpc.call_vg_list(node_list)
1067
      for node in node_list:
1068
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1069
                                              constants.MIN_VG_SIZE)
1070
        if vgstatus:
1071
          raise errors.OpPrereqError("Error on node '%s': %s" %
1072
                                     (node, vgstatus))
1073

    
1074
  def Exec(self, feedback_fn):
1075
    """Change the parameters of the cluster.
1076

1077
    """
1078
    if self.op.vg_name != self.cfg.GetVGName():
1079
      self.cfg.SetVGName(self.op.vg_name)
1080
    else:
1081
      feedback_fn("Cluster LVM configuration already in desired"
1082
                  " state, not changing")
1083

    
1084

    
1085
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1086
  """Sleep and poll for an instance's disk to sync.
1087

1088
  """
1089
  if not instance.disks:
1090
    return True
1091

    
1092
  if not oneshot:
1093
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1094

    
1095
  node = instance.primary_node
1096

    
1097
  for dev in instance.disks:
1098
    cfgw.SetDiskID(dev, node)
1099

    
1100
  retries = 0
1101
  while True:
1102
    max_time = 0
1103
    done = True
1104
    cumul_degraded = False
1105
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1106
    if not rstats:
1107
      proc.LogWarning("Can't get any data from node %s" % node)
1108
      retries += 1
1109
      if retries >= 10:
1110
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1111
                                 " aborting." % node)
1112
      time.sleep(6)
1113
      continue
1114
    retries = 0
1115
    for i in range(len(rstats)):
1116
      mstat = rstats[i]
1117
      if mstat is None:
1118
        proc.LogWarning("Can't compute data for node %s/%s" %
1119
                        (node, instance.disks[i].iv_name))
1120
        continue
1121
      # we ignore the ldisk parameter
1122
      perc_done, est_time, is_degraded, _ = mstat
1123
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1124
      if perc_done is not None:
1125
        done = False
1126
        if est_time is not None:
1127
          rem_time = "%d estimated seconds remaining" % est_time
1128
          max_time = est_time
1129
        else:
1130
          rem_time = "no time estimate"
1131
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
1132
                     (instance.disks[i].iv_name, perc_done, rem_time))
1133
    if done or oneshot:
1134
      break
1135

    
1136
    time.sleep(min(60, max_time))
1137

    
1138
  if done:
1139
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1140
  return not cumul_degraded
1141

    
1142

    
1143
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1144
  """Check that mirrors are not degraded.
1145

1146
  The ldisk parameter, if True, will change the test from the
1147
  is_degraded attribute (which represents overall non-ok status for
1148
  the device(s)) to the ldisk (representing the local storage status).
1149

1150
  """
1151
  cfgw.SetDiskID(dev, node)
1152
  if ldisk:
1153
    idx = 6
1154
  else:
1155
    idx = 5
1156

    
1157
  result = True
1158
  if on_primary or dev.AssembleOnSecondary():
1159
    rstats = rpc.call_blockdev_find(node, dev)
1160
    if not rstats:
1161
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1162
      result = False
1163
    else:
1164
      result = result and (not rstats[idx])
1165
  if dev.children:
1166
    for child in dev.children:
1167
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1168

    
1169
  return result
1170

    
1171

    
1172
class LUDiagnoseOS(NoHooksLU):
1173
  """Logical unit for OS diagnose/query.
1174

1175
  """
1176
  _OP_REQP = ["output_fields", "names"]
1177

    
1178
  def CheckPrereq(self):
1179
    """Check prerequisites.
1180

1181
    This always succeeds, since this is a pure query LU.
1182

1183
    """
1184
    if self.op.names:
1185
      raise errors.OpPrereqError("Selective OS query not supported")
1186

    
1187
    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1188
    _CheckOutputFields(static=[],
1189
                       dynamic=self.dynamic_fields,
1190
                       selected=self.op.output_fields)
1191

    
1192
  @staticmethod
1193
  def _DiagnoseByOS(node_list, rlist):
1194
    """Remaps a per-node return list into an a per-os per-node dictionary
1195

1196
      Args:
1197
        node_list: a list with the names of all nodes
1198
        rlist: a map with node names as keys and OS objects as values
1199

1200
      Returns:
1201
        map: a map with osnames as keys and as value another map, with
1202
             nodes as
1203
             keys and lists of OS objects as values
1204
             e.g. {"debian-etch": {"node1": [<object>,...],
1205
                                   "node2": [<object>,]}
1206
                  }
1207

1208
    """
1209
    all_os = {}
1210
    for node_name, nr in rlist.iteritems():
1211
      if not nr:
1212
        continue
1213
      for os_obj in nr:
1214
        if os_obj.name not in all_os:
1215
          # build a list of nodes for this os containing empty lists
1216
          # for each node in node_list
1217
          all_os[os_obj.name] = {}
1218
          for nname in node_list:
1219
            all_os[os_obj.name][nname] = []
1220
        all_os[os_obj.name][node_name].append(os_obj)
1221
    return all_os
1222

    
1223
  def Exec(self, feedback_fn):
1224
    """Compute the list of OSes.
1225

1226
    """
1227
    node_list = self.cfg.GetNodeList()
1228
    node_data = rpc.call_os_diagnose(node_list)
1229
    if node_data is False:
1230
      raise errors.OpExecError("Can't gather the list of OSes")
1231
    pol = self._DiagnoseByOS(node_list, node_data)
1232
    output = []
1233
    for os_name, os_data in pol.iteritems():
1234
      row = []
1235
      for field in self.op.output_fields:
1236
        if field == "name":
1237
          val = os_name
1238
        elif field == "valid":
1239
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1240
        elif field == "node_status":
1241
          val = {}
1242
          for node_name, nos_list in os_data.iteritems():
1243
            val[node_name] = [(v.status, v.path) for v in nos_list]
1244
        else:
1245
          raise errors.ParameterError(field)
1246
        row.append(val)
1247
      output.append(row)
1248

    
1249
    return output
1250

    
1251

    
1252
class LURemoveNode(LogicalUnit):
1253
  """Logical unit for removing a node.
1254

1255
  """
1256
  HPATH = "node-remove"
1257
  HTYPE = constants.HTYPE_NODE
1258
  _OP_REQP = ["node_name"]
1259

    
1260
  def BuildHooksEnv(self):
1261
    """Build hooks env.
1262

1263
    This doesn't run on the target node in the pre phase as a failed
1264
    node would then be impossible to remove.
1265

1266
    """
1267
    env = {
1268
      "OP_TARGET": self.op.node_name,
1269
      "NODE_NAME": self.op.node_name,
1270
      }
1271
    all_nodes = self.cfg.GetNodeList()
1272
    all_nodes.remove(self.op.node_name)
1273
    return env, all_nodes, all_nodes
1274

    
1275
  def CheckPrereq(self):
1276
    """Check prerequisites.
1277

1278
    This checks:
1279
     - the node exists in the configuration
1280
     - it does not have primary or secondary instances
1281
     - it's not the master
1282

1283
    Any errors are signalled by raising errors.OpPrereqError.
1284

1285
    """
1286
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1287
    if node is None:
1288
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1289

    
1290
    instance_list = self.cfg.GetInstanceList()
1291

    
1292
    masternode = self.sstore.GetMasterNode()
1293
    if node.name == masternode:
1294
      raise errors.OpPrereqError("Node is the master node,"
1295
                                 " you need to failover first.")
1296

    
1297
    for instance_name in instance_list:
1298
      instance = self.cfg.GetInstanceInfo(instance_name)
1299
      if node.name == instance.primary_node:
1300
        raise errors.OpPrereqError("Instance %s still running on the node,"
1301
                                   " please remove first." % instance_name)
1302
      if node.name in instance.secondary_nodes:
1303
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1304
                                   " please remove first." % instance_name)
1305
    self.op.node_name = node.name
1306
    self.node = node
1307

    
1308
  def Exec(self, feedback_fn):
1309
    """Removes the node from the cluster.
1310

1311
    """
1312
    node = self.node
1313
    logger.Info("stopping the node daemon and removing configs from node %s" %
1314
                node.name)
1315

    
1316
    rpc.call_node_leave_cluster(node.name)
1317

    
1318
    logger.Info("Removing node %s from config" % node.name)
1319

    
1320
    self.cfg.RemoveNode(node.name)
1321
    # Remove the node from the Ganeti Lock Manager
1322
    self.context.glm.remove(locking.LEVEL_NODE, node.name)
1323

    
1324
    utils.RemoveHostFromEtcHosts(node.name)
1325

    
1326

    
1327
class LUQueryNodes(NoHooksLU):
1328
  """Logical unit for querying nodes.
1329

1330
  """
1331
  _OP_REQP = ["output_fields", "names"]
1332

    
1333
  def CheckPrereq(self):
1334
    """Check prerequisites.
1335

1336
    This checks that the fields required are valid output fields.
1337

1338
    """
1339
    self.dynamic_fields = frozenset([
1340
      "dtotal", "dfree",
1341
      "mtotal", "mnode", "mfree",
1342
      "bootid",
1343
      "ctotal",
1344
      ])
1345

    
1346
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1347
                               "pinst_list", "sinst_list",
1348
                               "pip", "sip", "tags"],
1349
                       dynamic=self.dynamic_fields,
1350
                       selected=self.op.output_fields)
1351

    
1352
    self.wanted = _GetWantedNodes(self, self.op.names)
1353

    
1354
  def Exec(self, feedback_fn):
1355
    """Computes the list of nodes and their attributes.
1356

1357
    """
1358
    nodenames = self.wanted
1359
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1360

    
1361
    # begin data gathering
1362

    
1363
    if self.dynamic_fields.intersection(self.op.output_fields):
1364
      live_data = {}
1365
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1366
      for name in nodenames:
1367
        nodeinfo = node_data.get(name, None)
1368
        if nodeinfo:
1369
          live_data[name] = {
1370
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1371
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1372
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1373
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1374
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1375
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1376
            "bootid": nodeinfo['bootid'],
1377
            }
1378
        else:
1379
          live_data[name] = {}
1380
    else:
1381
      live_data = dict.fromkeys(nodenames, {})
1382

    
1383
    node_to_primary = dict([(name, set()) for name in nodenames])
1384
    node_to_secondary = dict([(name, set()) for name in nodenames])
1385

    
1386
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1387
                             "sinst_cnt", "sinst_list"))
1388
    if inst_fields & frozenset(self.op.output_fields):
1389
      instancelist = self.cfg.GetInstanceList()
1390

    
1391
      for instance_name in instancelist:
1392
        inst = self.cfg.GetInstanceInfo(instance_name)
1393
        if inst.primary_node in node_to_primary:
1394
          node_to_primary[inst.primary_node].add(inst.name)
1395
        for secnode in inst.secondary_nodes:
1396
          if secnode in node_to_secondary:
1397
            node_to_secondary[secnode].add(inst.name)
1398

    
1399
    # end data gathering
1400

    
1401
    output = []
1402
    for node in nodelist:
1403
      node_output = []
1404
      for field in self.op.output_fields:
1405
        if field == "name":
1406
          val = node.name
1407
        elif field == "pinst_list":
1408
          val = list(node_to_primary[node.name])
1409
        elif field == "sinst_list":
1410
          val = list(node_to_secondary[node.name])
1411
        elif field == "pinst_cnt":
1412
          val = len(node_to_primary[node.name])
1413
        elif field == "sinst_cnt":
1414
          val = len(node_to_secondary[node.name])
1415
        elif field == "pip":
1416
          val = node.primary_ip
1417
        elif field == "sip":
1418
          val = node.secondary_ip
1419
        elif field == "tags":
1420
          val = list(node.GetTags())
1421
        elif field in self.dynamic_fields:
1422
          val = live_data[node.name].get(field, None)
1423
        else:
1424
          raise errors.ParameterError(field)
1425
        node_output.append(val)
1426
      output.append(node_output)
1427

    
1428
    return output
1429

    
1430

    
1431
class LUQueryNodeVolumes(NoHooksLU):
1432
  """Logical unit for getting volumes on node(s).
1433

1434
  """
1435
  _OP_REQP = ["nodes", "output_fields"]
1436

    
1437
  def CheckPrereq(self):
1438
    """Check prerequisites.
1439

1440
    This checks that the fields required are valid output fields.
1441

1442
    """
1443
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1444

    
1445
    _CheckOutputFields(static=["node"],
1446
                       dynamic=["phys", "vg", "name", "size", "instance"],
1447
                       selected=self.op.output_fields)
1448

    
1449

    
1450
  def Exec(self, feedback_fn):
1451
    """Computes the list of nodes and their attributes.
1452

1453
    """
1454
    nodenames = self.nodes
1455
    volumes = rpc.call_node_volumes(nodenames)
1456

    
1457
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1458
             in self.cfg.GetInstanceList()]
1459

    
1460
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1461

    
1462
    output = []
1463
    for node in nodenames:
1464
      if node not in volumes or not volumes[node]:
1465
        continue
1466

    
1467
      node_vols = volumes[node][:]
1468
      node_vols.sort(key=lambda vol: vol['dev'])
1469

    
1470
      for vol in node_vols:
1471
        node_output = []
1472
        for field in self.op.output_fields:
1473
          if field == "node":
1474
            val = node
1475
          elif field == "phys":
1476
            val = vol['dev']
1477
          elif field == "vg":
1478
            val = vol['vg']
1479
          elif field == "name":
1480
            val = vol['name']
1481
          elif field == "size":
1482
            val = int(float(vol['size']))
1483
          elif field == "instance":
1484
            for inst in ilist:
1485
              if node not in lv_by_node[inst]:
1486
                continue
1487
              if vol['name'] in lv_by_node[inst][node]:
1488
                val = inst.name
1489
                break
1490
            else:
1491
              val = '-'
1492
          else:
1493
            raise errors.ParameterError(field)
1494
          node_output.append(str(val))
1495

    
1496
        output.append(node_output)
1497

    
1498
    return output
1499

    
1500

    
1501
class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # check reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # check connectivity
    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    node_verify_list = [self.sstore.GetMasterNode()]
    node_verify_param = {
      'nodelist': [node],
      # TODO: do a node-net-test as well?
    }

    result = rpc.call_node_verify(node_verify_list, node_verify_param)
    for verifier in node_verify_list:
      if not result[verifier]:
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                 " for remote verification" % verifier)
      if result[verifier]['nodelist']:
        for failed in result[verifier]['nodelist']:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, result[verifier]['nodelist'][failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList()
    if not self.op.readd:
      dist_nodes.append(node)
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = self.sstore.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      result = rpc.call_upload_file([node], fname)
      if not result[node]:
        logger.Error("could not copy file %s to node %s" % (fname, node))

    if not self.op.readd:
      logger.Info("adding node %s to cluster.conf" % node)
      self.cfg.AddNode(new_node)
      # Add the new node to the Ganeti Lock Manager
      self.context.glm.add(locking.LEVEL_NODE, node)


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      }

    return result


class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    a tuple of (disks_ok, device_info), where disks_ok is false if the
    operation failed and device_info is a list of
    (host, instance_visible_name, node_visible_name) triples describing
    the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two-pass mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info


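# Illustrative note (an assumption about typical values, not from the original
# source): a successful call returns something like
#
#   disks_ok, device_info = _AssembleInstanceDisks(instance, cfg)
#   # disks_ok    -> True
#   # device_info -> [(instance.primary_node, "sda",
#   #                  <result of the primary-pass blockdev_assemble call>),
#   #                 ...]
#
# _StartInstanceDisks below uses only the boolean and discards device_info,
# while LUActivateInstanceDisks passes the list back to its caller.

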
def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    if not type(ins_l) is list:
      raise errors.OpExecError("Can't contact node '%s'" %
                               instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError("Instance is running, can't shutdown"
                               " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)


def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are ignored;
  otherwise any shutdown failure makes the function return False.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result


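# Example of the ignore_primary semantics (illustrative only): during a
# failover the source (primary) node may already be dead, so the caller
# passes ignore_primary=True and a failed shutdown on the primary node does
# not flip the return value to False; failures on any other node always do.

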
def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function checks whether a given node has the needed amount of
  free memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                             " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                             " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                             " needed %s MiB, available %s MiB" %
                             (node, reason, requested, free_mem))


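# Worked example (illustrative numbers): with requested=2048 MiB and the node
# reporting memory_free=1024, the call raises
# OpPrereqError("Not enough memory on node ...: needed 2048 MiB, available
# 1024 MiB").  A missing or non-integer 'memory_free' value is rejected the
# same way.  Typical usage, as in LUStartupInstance below:
#
#   _CheckNodeFreeMemory(self.cfg, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        instance.memory)

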
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
                         "starting instance %s" % instance.name,
                         instance.memory)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")


class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                           constants.INSTANCE_REBOOT_HARD,
                           constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      if not rpc.call_instance_reboot(node_current, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      if not rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
      if not rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    if not rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    _ShutdownInstanceDisks(instance, self.cfg)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   instance.primary_node)
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node"  % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.AddInstance(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))

  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
                                                old_file_storage_dir,
                                                new_file_storage_dir)

      if not result:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
                                          "sda", "sdb"):
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.sstore.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logger.Info("removing block devices for instance %s" % instance.name)

    if not _RemoveDisks(instance, self.cfg):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)
    # Remove the instance from the Ganeti Lock Manager
    self.context.glm.remove(locking.LEVEL_INSTANCE, instance.name)


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
                               "admin_state", "admin_ram",
                               "disk_template", "ip", "mac", "bridge",
                               "sda_size", "sdb_size", "vcpus", "tags"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedInstances(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    instance_names = self.wanted
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
                     in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_all_instances_info(nodes)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = []
    for instance in instance_list:
      iout = []
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.status != "down":
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.status != "down":
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          disk = instance.FindDisk(field[:3])
          if disk is None:
            val = None
          else:
            val = disk.size
        elif field == "vcpus":
          val = instance.vcpus
        elif field == "tags":
          val = list(instance.GetTags())
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output


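# Quick-reference summary of the "status" field computed above (derived from
# the code, added for readability):
#
#   primary node unreachable          -> "ERROR_nodedown"
#   running and admin state is up     -> "running"
#   running and admin state is down   -> "ERROR_up"
#   stopped and admin state is up     -> "ERROR_down"
#   stopped and admin state is down   -> "ADMIN_down"

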
class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
                         instance.name, instance.memory)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s do not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

    self.instance = instance

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down" %
                     (instance.name, source_node, source_node))
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logger.Info("Starting instance %s on node %s" %
                  (instance.name, target_node))

      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))


def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
        return False

  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, True, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, False, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _GenerateUniqueNames(cfg, exts):
  """Generate suitable LV names.

  This will generate one unique logical volume name for each of the
  given extensions.

  """
  results = []
  for val in exts:
    new_id = cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results


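# Illustrative example (the exact id format depends on cfg.GenerateUniqueID,
# which is assumed here to return a uuid-like string):
#
#   _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta"])
#   # -> ["<unique-id-1>.sda_data", "<unique-id-2>.sda_meta"]
#
# A fresh id is generated for every extension, so the names do not share a
# common prefix.

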
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
  """Generate a drbd8 device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id = (primary, secondary, port),
                          children = [dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev


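# Resulting device tree (sketch, for readability): a DRBD8 device of the
# requested size on top of two logical volumes, with the
# (primary, secondary, port) triple as its logical id:
#
#   drbd8 <iv_name>, <size> MiB
#    +-- LV names[0]  (data, <size> MiB)
#    +-- LV names[1]  (metadata, 128 MiB)

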
def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz,
                          file_storage_dir, file_driver):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = cfg.GetVGName()
  if template_name == constants.DT_DISKLESS:
    disks = []
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name = "sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name = "sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                         disk_sz, names[0:2], "sda")
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                         swap_sz, names[2:4], "sdb")
    disks = [drbd_sda_dev, drbd_sdb_dev]
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
                                iv_name="sda", logical_id=(file_driver,
                                "%s/sda" % file_storage_dir))
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
                                iv_name="sdb", logical_id=(file_driver,
                                "%s/sdb" % file_storage_dir))
    disks = [file_sda_dev, file_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks


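# Per-template result of _GenerateDiskTemplate (summary of the code above):
#
#   DT_DISKLESS -> []
#   DT_PLAIN    -> two plain LVs ("sda" of disk_sz, "sdb" of swap_sz),
#                  no secondary nodes allowed
#   DT_DRBD8    -> two DRBD8 branches (data LV + 128 MiB metadata LV each),
#                  requires exactly one secondary node
#   DT_FILE     -> two file-backed disks under file_storage_dir

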
def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = rpc.call_file_storage_dir_create(instance.primary_node,
                                              file_storage_dir)

    if not result:
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
      return False

    if not result[0]:
      logger.Error("failed to create directory '%s'" % file_storage_dir)
      return False

  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False

  return True


def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
                                            file_storage_dir):
      logger.Error("could not remove directory '%s'" % file_storage_dir)
      result = False

  return result


def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group

  This is currently hard-coded for the two-drive layout.

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: disk_size + swap_size,
    # 256 MB are added for drbd metadata, 128 MB for each drbd device
    constants.DT_DRBD8: disk_size + swap_size + 256,
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" %  disk_template)

  return req_size_dict[disk_template]


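# Worked example (illustrative numbers): for disk_size=10240 and
# swap_size=4096 the volume group must provide
#
#   DT_PLAIN: 10240 + 4096       = 14336 MiB
#   DT_DRBD8: 10240 + 4096 + 256 = 14592 MiB  (two 128 MiB DRBD metadata LVs)
#
# while DT_DISKLESS and DT_FILE need no space in the volume group (None).

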
class LUCreateInstance(LogicalUnit):
2868
  """Create an instance.
2869

2870
  """
2871
  HPATH = "instance-add"
2872
  HTYPE = constants.HTYPE_INSTANCE
2873
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
2874
              "disk_template", "swap_size", "mode", "start", "vcpus",
2875
              "wait_for_sync", "ip_check", "mac"]
2876

    
2877
  def _RunAllocator(self):
2878
    """Run the allocator based on input opcode.
2879

2880
    """
2881
    disks = [{"size": self.op.disk_size, "mode": "w"},
2882
             {"size": self.op.swap_size, "mode": "w"}]
2883
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
2884
             "bridge": self.op.bridge}]
2885
    ial = IAllocator(self.cfg, self.sstore,
2886
                     mode=constants.IALLOCATOR_MODE_ALLOC,
2887
                     name=self.op.instance_name,
2888
                     disk_template=self.op.disk_template,
2889
                     tags=[],
2890
                     os=self.op.os_type,
2891
                     vcpus=self.op.vcpus,
2892
                     mem_size=self.op.mem_size,
2893
                     disks=disks,
2894
                     nics=nics,
2895
                     )
2896

    
2897
    ial.Run(self.op.iallocator)
2898

    
2899
    if not ial.success:
2900
      raise errors.OpPrereqError("Can't compute nodes using"
2901
                                 " iallocator '%s': %s" % (self.op.iallocator,
2902
                                                           ial.info))
2903
    if len(ial.nodes) != ial.required_nodes:
2904
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
2905
                                 " of nodes (%s), required %s" %
2906
                                 (len(ial.nodes), ial.required_nodes))
2907
    self.op.pnode = ial.nodes[0]
2908
    logger.ToStdout("Selected nodes for the instance: %s" %
2909
                    (", ".join(ial.nodes),))
2910
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
2911
                (self.op.instance_name, self.op.iallocator, ial.nodes))
2912
    if ial.required_nodes == 2:
2913
      self.op.snode = ial.nodes[1]
2914

    
2915
  def BuildHooksEnv(self):
2916
    """Build hooks env.
2917

2918
    This runs on master, primary and secondary nodes of the instance.
2919

2920
    """
2921
    env = {
2922
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2923
      "INSTANCE_DISK_SIZE": self.op.disk_size,
2924
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
2925
      "INSTANCE_ADD_MODE": self.op.mode,
2926
      }
2927
    if self.op.mode == constants.INSTANCE_IMPORT:
2928
      env["INSTANCE_SRC_NODE"] = self.op.src_node
2929
      env["INSTANCE_SRC_PATH"] = self.op.src_path
2930
      env["INSTANCE_SRC_IMAGE"] = self.src_image
2931

    
2932
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2933
      primary_node=self.op.pnode,
2934
      secondary_nodes=self.secondaries,
2935
      status=self.instance_status,
2936
      os_type=self.op.os_type,
2937
      memory=self.op.mem_size,
2938
      vcpus=self.op.vcpus,
2939
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
2940
    ))
2941

    
2942
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2943
          self.secondaries)
2944
    return env, nl, nl
2945

    
2946

    
2947
  def CheckPrereq(self):
2948
    """Check prerequisites.
2949

2950
    """
2951
    # set optional parameters to none if they don't exist
2952
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
2953
                 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
2954
                 "vnc_bind_address"]:
2955
      if not hasattr(self.op, attr):
2956
        setattr(self.op, attr, None)
2957

    
2958
    if self.op.mode not in (constants.INSTANCE_CREATE,
2959
                            constants.INSTANCE_IMPORT):
2960
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2961
                                 self.op.mode)
2962

    
2963
    if (not self.cfg.GetVGName() and
2964
        self.op.disk_template not in constants.DTS_NOT_LVM):
2965
      raise errors.OpPrereqError("Cluster does not support lvm-based"
2966
                                 " instances")
2967

    
2968
    if self.op.mode == constants.INSTANCE_IMPORT:
2969
      src_node = getattr(self.op, "src_node", None)
2970
      src_path = getattr(self.op, "src_path", None)
2971
      if src_node is None or src_path is None:
2972
        raise errors.OpPrereqError("Importing an instance requires source"
2973
                                   " node and path options")
2974
      src_node_full = self.cfg.ExpandNodeName(src_node)
2975
      if src_node_full is None:
2976
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2977
      self.op.src_node = src_node = src_node_full
2978

    
2979
      if not os.path.isabs(src_path):
2980
        raise errors.OpPrereqError("The source path must be absolute")
2981

    
2982
      export_info = rpc.call_export_info(src_node, src_path)
2983

    
2984
      if not export_info:
2985
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
2986

    
2987
      if not export_info.has_section(constants.INISECT_EXP):
2988
        raise errors.ProgrammerError("Corrupted export config")
2989

    
2990
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
2991
      if (int(ei_version) != constants.EXPORT_VERSION):
2992
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2993
                                   (ei_version, constants.EXPORT_VERSION))
2994

    
2995
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2996
        raise errors.OpPrereqError("Can't import instance with more than"
2997
                                   " one data disk")
2998

    
2999
      # FIXME: are the old os-es, disk sizes, etc. useful?
3000
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3001
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3002
                                                         'disk0_dump'))
3003
      self.src_image = diskimage
3004
    else: # INSTANCE_CREATE
3005
      if getattr(self.op, "os_type", None) is None:
3006
        raise errors.OpPrereqError("No guest OS specified")
3007

    
3008
    #### instance parameters check
3009

    
3010
    # disk template and mirror node verification
3011
    if self.op.disk_template not in constants.DISK_TEMPLATES:
3012
      raise errors.OpPrereqError("Invalid disk template name")
3013

    
3014
    # instance name verification
3015
    hostname1 = utils.HostInfo(self.op.instance_name)
3016

    
3017
    self.op.instance_name = instance_name = hostname1.name
3018
    instance_list = self.cfg.GetInstanceList()
3019
    if instance_name in instance_list:
3020
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3021
                                 instance_name)
3022

    
3023
    # ip validity checks
3024
    ip = getattr(self.op, "ip", None)
3025
    if ip is None or ip.lower() == "none":
3026
      inst_ip = None
3027
    elif ip.lower() == "auto":
3028
      inst_ip = hostname1.ip
3029
    else:
3030
      if not utils.IsValidIP(ip):
3031
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
3032
                                   " like a valid IP" % ip)
3033
      inst_ip = ip
3034
    self.inst_ip = self.op.ip = inst_ip
3035

    
3036
    if self.op.start and not self.op.ip_check:
3037
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3038
                                 " adding an instance in start mode")
3039

    
3040
    if self.op.ip_check:
3041
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3042
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3043
                                   (hostname1.ip, instance_name))
3044

    
3045
    # MAC address verification
3046
    if self.op.mac != "auto":
3047
      if not utils.IsValidMac(self.op.mac.lower()):
3048
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
3049
                                   self.op.mac)
3050

    
3051
    # bridge verification
3052
    bridge = getattr(self.op, "bridge", None)
3053
    if bridge is None:
3054
      self.op.bridge = self.cfg.GetDefBridge()
3055
    else:
3056
      self.op.bridge = bridge
3057

    
3058
    # boot order verification
3059
    if self.op.hvm_boot_order is not None:
3060
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3061
        raise errors.OpPrereqError("invalid boot order specified,"
3062
                                   " must be one or more of [acdn]")
3063
    # file storage checks
3064
    if (self.op.file_driver and
3065
        self.op.file_driver not in constants.FILE_DRIVER):
3066
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
3067
                                 self.op.file_driver)
3068

    
3069
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3070
      raise errors.OpPrereqError("File storage directory not a relative"
3071
                                 " path")
3072
    #### allocator run
3073

    
3074
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
3075
      raise errors.OpPrereqError("One and only one of iallocator and primary"
3076
                                 " node must be given")
3077

    
3078
    if self.op.iallocator is not None:
3079
      self._RunAllocator()
3080

    
3081
    #### node related checks
3082

    
3083
    # check primary node
3084
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3085
    if pnode is None:
3086
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
3087
                                 self.op.pnode)
3088
    self.op.pnode = pnode.name
3089
    self.pnode = pnode
3090
    self.secondaries = []
3091

    
3092
    # mirror node verification
3093
    if self.op.disk_template in constants.DTS_NET_MIRROR:
3094
      if getattr(self.op, "snode", None) is None:
3095
        raise errors.OpPrereqError("The networked disk templates need"
3096
                                   " a mirror node")
3097

    
3098
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
3099
      if snode_name is None:
3100
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
3101
                                   self.op.snode)
3102
      elif snode_name == pnode.name:
3103
        raise errors.OpPrereqError("The secondary node cannot be"
3104
                                   " the primary node.")
3105
      self.secondaries.append(snode_name)
3106

    
3107
    req_size = _ComputeDiskSize(self.op.disk_template,
3108
                                self.op.disk_size, self.op.swap_size)
3109

    
3110
    # Check lv size requirements
3111
    if req_size is not None:
3112
      nodenames = [pnode.name] + self.secondaries
3113
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3114
      for node in nodenames:
3115
        info = nodeinfo.get(node, None)
3116
        if not info:
3117
          raise errors.OpPrereqError("Cannot get current information"
3118
                                     " from node '%s'" % node)
3119
        vg_free = info.get('vg_free', None)
3120
        if not isinstance(vg_free, int):
3121
          raise errors.OpPrereqError("Can't compute free disk space on"
3122
                                     " node %s" % node)
3123
        if req_size > info['vg_free']:
3124
          raise errors.OpPrereqError("Not enough disk space on target node %s."
3125
                                     " %d MB available, %d MB required" %
3126
                                     (node, info['vg_free'], req_size))
3127

    
3128
    # os verification
3129
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3130
    if not os_obj:
3131
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
3132
                                 " primary node"  % self.op.os_type)
3133

    
3134
    if self.op.kernel_path == constants.VALUE_NONE:
3135
      raise errors.OpPrereqError("Can't set instance kernel to none")
3136

    
3137

    
3138
    # bridge check on primary node
3139
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3140
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
3141
                                 " destination node '%s'" %
3142
                                 (self.op.bridge, pnode.name))
3143

    
3144
    # memory check on primary node
3145
    if self.op.start:
3146
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3147
                           "creating instance %s" % self.op.instance_name,
3148
                           self.op.mem_size)
3149

    
3150
    # hvm_cdrom_image_path verification
3151
    if self.op.hvm_cdrom_image_path is not None:
3152
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
3153
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
3154
                                   " be an absolute path or None, not %s" %
3155
                                   self.op.hvm_cdrom_image_path)
3156
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
3157
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
3158
                                   " regular file or a symlink pointing to"
3159
                                   " an existing regular file, not %s" %
3160
                                   self.op.hvm_cdrom_image_path)
3161

    
3162
    # vnc_bind_address verification
3163
    if self.op.vnc_bind_address is not None:
3164
      if not utils.IsValidIP(self.op.vnc_bind_address):
3165
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3166
                                   " like a valid IP address" %
3167
                                   self.op.vnc_bind_address)
3168

    
3169
    if self.op.start:
3170
      self.instance_status = 'up'
3171
    else:
3172
      self.instance_status = 'down'
3173

    
3174
  def Exec(self, feedback_fn):
3175
    """Create and add the instance to the cluster.
3176

3177
    """
3178
    instance = self.op.instance_name
3179
    pnode_name = self.pnode.name
3180

    
3181
    if self.op.mac == "auto":
3182
      mac_address = self.cfg.GenerateMAC()
3183
    else:
3184
      mac_address = self.op.mac
3185

    
3186
    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3187
    if self.inst_ip is not None:
3188
      nic.ip = self.inst_ip
3189

    
3190
    ht_kind = self.sstore.GetHypervisorType()
3191
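    # some hypervisor types need a cluster-allocated network port (presumably
    # for the VNC console); for the other types no port is reserved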
    if ht_kind in constants.HTS_REQ_PORT:
3192
      network_port = self.cfg.AllocatePort()
3193
    else:
3194
      network_port = None
3195

    
3196
    if self.op.vnc_bind_address is None:
3197
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3198

    
3199
    # this is needed because os.path.join does not accept None arguments
3200
    if self.op.file_storage_dir is None:
3201
      string_file_storage_dir = ""
3202
    else:
3203
      string_file_storage_dir = self.op.file_storage_dir
3204

    
3205
    # build the full file storage dir path
3206
    file_storage_dir = os.path.normpath(os.path.join(
3207
                                        self.sstore.GetFileStorageDir(),
3208
                                        string_file_storage_dir, instance))
3209

    
3210

    
3211
    disks = _GenerateDiskTemplate(self.cfg,
3212
                                  self.op.disk_template,
3213
                                  instance, pnode_name,
3214
                                  self.secondaries, self.op.disk_size,
3215
                                  self.op.swap_size,
3216
                                  file_storage_dir,
3217
                                  self.op.file_driver)
3218

    
3219
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3220
                            primary_node=pnode_name,
3221
                            memory=self.op.mem_size,
3222
                            vcpus=self.op.vcpus,
3223
                            nics=[nic], disks=disks,
3224
                            disk_template=self.op.disk_template,
3225
                            status=self.instance_status,
3226
                            network_port=network_port,
3227
                            kernel_path=self.op.kernel_path,
3228
                            initrd_path=self.op.initrd_path,
3229
                            hvm_boot_order=self.op.hvm_boot_order,
3230
                            hvm_acpi=self.op.hvm_acpi,
3231
                            hvm_pae=self.op.hvm_pae,
3232
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3233
                            vnc_bind_address=self.op.vnc_bind_address,
3234
                            )
3235

    
3236
    feedback_fn("* creating instance disks...")
3237
    if not _CreateDisks(self.cfg, iobj):
3238
      _RemoveDisks(iobj, self.cfg)
3239
      raise errors.OpExecError("Device creation failed, reverting...")
3240

    
3241
    feedback_fn("adding instance %s to cluster config" % instance)
3242

    
3243
    self.cfg.AddInstance(iobj)
3244
    # Add the new instance to the Ganeti Lock Manager
3245
    self.context.glm.add(locking.LEVEL_INSTANCE, instance)
3246

    
3247
    if self.op.wait_for_sync:
3248
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3249
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3250
      # make sure the disks are not degraded (still sync-ing is ok)
3251
      time.sleep(15)
3252
      feedback_fn("* checking mirrors status")
3253
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3254
    else:
3255
      disk_abort = False
3256

    
3257
    if disk_abort:
3258
      _RemoveDisks(iobj, self.cfg)
3259
      self.cfg.RemoveInstance(iobj.name)
3260
      # Remove the new instance from the Ganeti Lock Manager
3261
      self.context.glm.remove(locking.LEVEL_INSTANCE, iobj.name)
3262
      raise errors.OpExecError("There are some degraded disks for"
3263
                               " this instance")
3264

    
3265
    feedback_fn("creating os for instance %s on node %s" %
3266
                (instance, pnode_name))
3267

    
3268
    if iobj.disk_template != constants.DT_DISKLESS:
3269
      if self.op.mode == constants.INSTANCE_CREATE:
3270
        feedback_fn("* running the instance OS create scripts...")
3271
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3272
          raise errors.OpExecError("could not add os for instance %s"
3273
                                   " on node %s" %
3274
                                   (instance, pnode_name))
3275

    
3276
      elif self.op.mode == constants.INSTANCE_IMPORT:
3277
        feedback_fn("* running the instance OS import scripts...")
3278
        src_node = self.op.src_node
3279
        src_image = self.src_image
3280
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3281
                                                src_node, src_image):
3282
          raise errors.OpExecError("Could not import os for instance"
3283
                                   " %s on node %s" %
3284
                                   (instance, pnode_name))
3285
      else:
3286
        # also checked in the prereq part
3287
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3288
                                     % self.op.mode)
3289

    
3290
    if self.op.start:
3291
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3292
      feedback_fn("* starting instance...")
3293
      if not rpc.call_instance_start(pnode_name, iobj, None):
3294
        raise errors.OpExecError("Could not start instance")
3295

    
3296

    
3297
class LUConnectConsole(NoHooksLU):
3298
  """Connect to an instance's console.
3299

3300
  This is somewhat special in that it returns the command line that
3301
  you need to run on the master node in order to connect to the
3302
  console.
3303

3304
  """
3305
  _OP_REQP = ["instance_name"]
3306
  REQ_BGL = False
3307

    
3308
  def ExpandNames(self):
3309
    self._ExpandAndLockInstance()
3310

    
3311
  def CheckPrereq(self):
3312
    """Check prerequisites.
3313

3314
    This checks that the instance is in the cluster.
3315

3316
    """
3317
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3318
    assert self.instance is not None, \
3319
      "Cannot retrieve locked instance %s" % self.op.instance_name
3320

    
3321
  def Exec(self, feedback_fn):
3322
    """Connect to the console of an instance
3323

3324
    """
3325
    instance = self.instance
3326
    node = instance.primary_node
3327

    
3328
    node_insts = rpc.call_instance_list([node])[node]
3329
    if node_insts is False:
3330
      raise errors.OpExecError("Can't connect to node %s." % node)
3331

    
3332
    if instance.name not in node_insts:
3333
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3334

    
3335
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3336

    
3337
    hyper = hypervisor.GetHypervisor()
3338
    console_cmd = hyper.GetShellCommandForConsole(instance)
3339

    
3340
    # build ssh cmdline
3341
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3342

    
3343

    
3344
class LUReplaceDisks(LogicalUnit):
3345
  """Replace the disks of an instance.
3346

3347
  """
3348
  HPATH = "mirrors-replace"
3349
  HTYPE = constants.HTYPE_INSTANCE
3350
  _OP_REQP = ["instance_name", "mode", "disks"]
3351

    
3352
  def _RunAllocator(self):
3353
    """Compute a new secondary node using an IAllocator.
3354

3355
    """
3356
    ial = IAllocator(self.cfg, self.sstore,
3357
                     mode=constants.IALLOCATOR_MODE_RELOC,
3358
                     name=self.op.instance_name,
3359
                     relocate_from=[self.sec_node])
3360

    
3361
    ial.Run(self.op.iallocator)
3362

    
3363
    if not ial.success:
3364
      raise errors.OpPrereqError("Can't compute nodes using"
3365
                                 " iallocator '%s': %s" % (self.op.iallocator,
3366
                                                           ial.info))
3367
    if len(ial.nodes) != ial.required_nodes:
3368
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3369
                                 " of nodes (%s), required %s" %
3370
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
3371
    self.op.remote_node = ial.nodes[0]
3372
    logger.ToStdout("Selected new secondary for the instance: %s" %
3373
                    self.op.remote_node)
3374

    
3375
  def BuildHooksEnv(self):
3376
    """Build hooks env.
3377

3378
    This runs on the master, the primary and all the secondaries.
3379

3380
    """
3381
    env = {
3382
      "MODE": self.op.mode,
3383
      "NEW_SECONDARY": self.op.remote_node,
3384
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3385
      }
3386
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3387
    nl = [
3388
      self.sstore.GetMasterNode(),
3389
      self.instance.primary_node,
3390
      ]
3391
    if self.op.remote_node is not None:
3392
      nl.append(self.op.remote_node)
3393
    return env, nl, nl
3394

    
3395
  def CheckPrereq(self):
3396
    """Check prerequisites.
3397

3398
    This checks that the instance is in the cluster.
3399

3400
    """
3401
    if not hasattr(self.op, "remote_node"):
3402
      self.op.remote_node = None
3403

    
3404
    instance = self.cfg.GetInstanceInfo(
3405
      self.cfg.ExpandInstanceName(self.op.instance_name))
3406
    if instance is None:
3407
      raise errors.OpPrereqError("Instance '%s' not known" %
3408
                                 self.op.instance_name)
3409
    self.instance = instance
3410
    self.op.instance_name = instance.name
3411

    
3412
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3413
      raise errors.OpPrereqError("Instance's disk layout is not"
3414
                                 " network mirrored.")
3415

    
3416
    if len(instance.secondary_nodes) != 1:
3417
      raise errors.OpPrereqError("The instance has a strange layout,"
3418
                                 " expected one secondary but found %d" %
3419
                                 len(instance.secondary_nodes))
3420

    
3421
    self.sec_node = instance.secondary_nodes[0]
3422

    
3423
    ia_name = getattr(self.op, "iallocator", None)
3424
    if ia_name is not None:
3425
      if self.op.remote_node is not None:
3426
        raise errors.OpPrereqError("Give either the iallocator or the new"
3427
                                   " secondary, not both")
3428
      self.op.remote_node = self._RunAllocator()
3429

    
3430
    remote_node = self.op.remote_node
3431
    if remote_node is not None:
3432
      remote_node = self.cfg.ExpandNodeName(remote_node)
3433
      if remote_node is None:
3434
        raise errors.OpPrereqError("Node '%s' not known" %
3435
                                   self.op.remote_node)
3436
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3437
    else:
3438
      self.remote_node_info = None
3439
    if remote_node == instance.primary_node:
3440
      raise errors.OpPrereqError("The specified node is the primary node of"
3441
                                 " the instance.")
3442
    elif remote_node == self.sec_node:
3443
      if self.op.mode == constants.REPLACE_DISK_SEC:
3444
        # this is for DRBD8, where we can't execute the same mode of
3445
        # replacement as for drbd7 (no different port allocated)
3446
        raise errors.OpPrereqError("Same secondary given, cannot execute"
3447
                                   " replacement")
3448
    if instance.disk_template == constants.DT_DRBD8:
3449
      if (self.op.mode == constants.REPLACE_DISK_ALL and
3450
          remote_node is not None):
3451
        # switch to replace secondary mode
3452
        self.op.mode = constants.REPLACE_DISK_SEC
3453

    
3454
      if self.op.mode == constants.REPLACE_DISK_ALL:
3455
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3456
                                   " secondary disk replacement, not"
3457
                                   " both at once")
3458
      elif self.op.mode == constants.REPLACE_DISK_PRI:
3459
        if remote_node is not None:
3460
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3461
                                     " the secondary while doing a primary"
3462
                                     " node disk replacement")
3463
        self.tgt_node = instance.primary_node
3464
        self.oth_node = instance.secondary_nodes[0]
3465
      elif self.op.mode == constants.REPLACE_DISK_SEC:
3466
        self.new_node = remote_node # this can be None, in which case
3467
                                    # we don't change the secondary
3468
        self.tgt_node = instance.secondary_nodes[0]
3469
        self.oth_node = instance.primary_node
3470
      else:
3471
        raise errors.ProgrammerError("Unhandled disk replace mode")
3472

    
3473
    for name in self.op.disks:
3474
      if instance.FindDisk(name) is None:
3475
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3476
                                   (name, instance.name))
3477
    self.op.remote_node = remote_node
3478

    
3479
  def _ExecD8DiskOnly(self, feedback_fn):
3480
    """Replace a disk on the primary or secondary for dbrd8.
3481

3482
    The algorithm for replace is quite complicated:
3483
      - for each disk to be replaced:
3484
        - create new LVs on the target node with unique names
3485
        - detach old LVs from the drbd device
3486
        - rename old LVs to name_replaced.<time_t>
3487
        - rename new LVs to old LVs
3488
        - attach the new LVs (with the old names now) to the drbd device
3489
      - wait for sync across all devices
3490
      - for each modified disk:
3491
        - remove old LVs (which have the name name_replaced.<time_t>)
3492

3493
    Failures are not very well handled.
3494

3495
    """
3496
    steps_total = 6
3497
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3498
    instance = self.instance
3499
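    # iv_names maps each disk's iv_name to (drbd device, old LVs, new LVs)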
    iv_names = {}
3500
    vgname = self.cfg.GetVGName()
3501
    # start of work
3502
    cfg = self.cfg
3503
    tgt_node = self.tgt_node
3504
    oth_node = self.oth_node
3505

    
3506
    # Step: check device activation
3507
    self.proc.LogStep(1, steps_total, "check device existence")
3508
    info("checking volume groups")
3509
    my_vg = cfg.GetVGName()
3510
    results = rpc.call_vg_list([oth_node, tgt_node])
3511
    if not results:
3512
      raise errors.OpExecError("Can't list volume groups on the nodes")
3513
    for node in oth_node, tgt_node:
3514
      res = results.get(node, False)
3515
      if not res or my_vg not in res:
3516
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3517
                                 (my_vg, node))
3518
    for dev in instance.disks:
3519
      if dev.iv_name not in self.op.disks:
3520
        continue
3521
      for node in tgt_node, oth_node:
3522
        info("checking %s on %s" % (dev.iv_name, node))
3523
        cfg.SetDiskID(dev, node)
3524
        if not rpc.call_blockdev_find(node, dev):
3525
          raise errors.OpExecError("Can't find device %s on node %s" %
3526
                                   (dev.iv_name, node))
3527

    
3528
    # Step: check other node consistency
3529
    self.proc.LogStep(2, steps_total, "check peer consistency")
3530
    for dev in instance.disks:
3531
      if dev.iv_name not in self.op.disks:
3532
        continue
3533
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3534
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3535
                                   oth_node==instance.primary_node):
3536
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3537
                                 " to replace disks on this node (%s)" %
3538
                                 (oth_node, tgt_node))
3539

    
3540
    # Step: create new storage
3541
    self.proc.LogStep(3, steps_total, "allocate new storage")
3542
    for dev in instance.disks:
3543
      if not dev.iv_name in self.op.disks:
3544
        continue
3545
      size = dev.size
3546
      cfg.SetDiskID(dev, tgt_node)
3547
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3548
      names = _GenerateUniqueNames(cfg, lv_names)
3549
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3550
                             logical_id=(vgname, names[0]))
3551
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3552
                             logical_id=(vgname, names[1]))
3553
      new_lvs = [lv_data, lv_meta]
3554
      old_lvs = dev.children
3555
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3556
      info("creating new local storage on %s for %s" %
3557
           (tgt_node, dev.iv_name))
3558
      # since we *always* want to create this LV, we use the
3559
      # _Create...OnPrimary (which forces the creation), even if we
3560
      # are talking about the secondary node
3561
      for new_lv in new_lvs:
3562
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3563
                                        _GetInstanceInfoText(instance)):
3564
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3565
                                   " node '%s'" %
3566
                                   (new_lv.logical_id[1], tgt_node))
3567

    
3568
    # Step: for each lv, detach+rename*2+attach
3569
    self.proc.LogStep(4, steps_total, "change drbd configuration")
3570
    for dev, old_lvs, new_lvs in iv_names.itervalues():
3571
      info("detaching %s drbd from local storage" % dev.iv_name)
3572
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3573
        raise errors.OpExecError("Can't detach drbd from local storage on node"
3574
                                 " %s for device %s" % (tgt_node, dev.iv_name))
3575
      #dev.children = []
3576
      #cfg.Update(instance)
3577

    
3578
      # ok, we created the new LVs, so now we know we have the needed
3579
      # storage; as such, we proceed on the target node to rename
3580
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3581
      # using the assumption that logical_id == physical_id (which in
3582
      # turn is the unique_id on that node)
3583

    
3584
      # FIXME(iustin): use a better name for the replaced LVs
3585
      temp_suffix = int(time.time())
3586
      ren_fn = lambda d, suff: (d.physical_id[0],
3587
                                d.physical_id[1] + "_replaced-%s" % suff)
3588
      # build the rename list based on what LVs exist on the node
3589
      rlist = []
3590
      for to_ren in old_lvs:
3591
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3592
        if find_res is not None: # device exists
3593
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3594

    
3595
      info("renaming the old LVs on the target node")
3596
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3597
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3598
      # now we rename the new LVs to the old LVs
3599
      info("renaming the new LVs on the target node")
3600
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3601
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3602
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3603

    
3604
      for old, new in zip(old_lvs, new_lvs):
3605
        new.logical_id = old.logical_id
3606
        cfg.SetDiskID(new, tgt_node)
3607

    
3608
      for disk in old_lvs:
3609
        disk.logical_id = ren_fn(disk, temp_suffix)
3610
        cfg.SetDiskID(disk, tgt_node)
3611

    
3612
      # now that the new lvs have the old name, we can add them to the device
3613
      info("adding new mirror component on %s" % tgt_node)
3614
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3615
        for new_lv in new_lvs:
3616
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
3617
            warning("Can't rollback device %s", hint="manually cleanup unused"
3618
                    " logical volumes")
3619
        raise errors.OpExecError("Can't add local storage to drbd")
3620

    
3621
      dev.children = new_lvs
3622
      cfg.Update(instance)
3623

    
3624
    # Step: wait for sync
3625

    
3626
    # this can fail as the old devices are degraded and _WaitForSync
3627
    # does a combined result over all disks, so we don't check its
3628
    # return value
3629
    self.proc.LogStep(5, steps_total, "sync devices")
3630
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3631

    
3632
    # so check manually all the devices
3633
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3634
      cfg.SetDiskID(dev, instance.primary_node)
3635
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3636
      if is_degr:
3637
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3638

    
3639
    # Step: remove old storage
3640
    self.proc.LogStep(6, steps_total, "removing old storage")
3641
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3642
      info("remove logical volumes for %s" % name)
3643
      for lv in old_lvs:
3644
        cfg.SetDiskID(lv, tgt_node)
3645
        if not rpc.call_blockdev_remove(tgt_node, lv):
3646
          warning("Can't remove old LV", hint="manually remove unused LVs")
3647
          continue
3648

    
3649
  def _ExecD8Secondary(self, feedback_fn):
3650
    """Replace the secondary node for drbd8.
3651

3652
    The algorithm for replace is quite complicated:
3653
      - for all disks of the instance:
3654
        - create new LVs on the new node with same names
3655
        - shutdown the drbd device on the old secondary
3656
        - disconnect the drbd network on the primary
3657
        - create the drbd device on the new secondary
3658
        - network attach the drbd on the primary, using an artifice:
3659
          the drbd code for Attach() will connect to the network if it
3660
          finds a device which is connected to the good local disks but
3661
          not network enabled
3662
      - wait for sync across all devices
3663
      - remove all disks from the old secondary
3664

3665
    Failures are not very well handled.
3666

3667
    """
3668
    steps_total = 6
3669
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3670
    instance = self.instance
3671
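    # iv_names maps each disk's iv_name to (drbd device, current LV children)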
    iv_names = {}
3672
    vgname = self.cfg.GetVGName()
3673
    # start of work
3674
    cfg = self.cfg
3675
    old_node = self.tgt_node
3676
    new_node = self.new_node
3677
    pri_node = instance.primary_node
3678

    
3679
    # Step: check device activation
3680
    self.proc.LogStep(1, steps_total, "check device existence")
3681
    info("checking volume groups")
3682
    my_vg = cfg.GetVGName()
3683
    results = rpc.call_vg_list([pri_node, new_node])
3684
    if not results:
3685
      raise errors.OpExecError("Can't list volume groups on the nodes")
3686
    for node in pri_node, new_node:
3687
      res = results.get(node, False)
3688
      if not res or my_vg not in res:
3689
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3690
                                 (my_vg, node))
3691
    for dev in instance.disks:
3692
      if dev.iv_name not in self.op.disks:
3693
        continue
3694
      info("checking %s on %s" % (dev.iv_name, pri_node))
3695
      cfg.SetDiskID(dev, pri_node)
3696
      if not rpc.call_blockdev_find(pri_node, dev):
3697
        raise errors.OpExecError("Can't find device %s on node %s" %
3698
                                 (dev.iv_name, pri_node))
3699

    
3700
    # Step: check other node consistency
3701
    self.proc.LogStep(2, steps_total, "check peer consistency")
3702
    for dev in instance.disks:
3703
      if dev.iv_name not in self.op.disks:
3704
        continue
3705
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3706
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3707
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
3708
                                 " unsafe to replace the secondary" %
3709
                                 pri_node)
3710

    
3711
    # Step: create new storage
3712
    self.proc.LogStep(3, steps_total, "allocate new storage")
3713
    for dev in instance.disks:
3714
      size = dev.size
3715
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3716
      # since we *always* want to create this LV, we use the
3717
      # _Create...OnPrimary (which forces the creation), even if we
3718
      # are talking about the secondary node
3719
      for new_lv in dev.children:
3720
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3721
                                        _GetInstanceInfoText(instance)):
3722
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3723
                                   " node '%s'" %
3724
                                   (new_lv.logical_id[1], new_node))
3725

    
3726
      iv_names[dev.iv_name] = (dev, dev.children)
3727

    
3728
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
3729
    for dev in instance.disks:
3730
      size = dev.size
3731
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3732
      # create new devices on new_node
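      # note: only the peer node changes in the logical_id; the last element
      # (presumably the drbd network port) is carried over from the old device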
3733
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3734
                              logical_id=(pri_node, new_node,
3735
                                          dev.logical_id[2]),
3736
                              children=dev.children)
3737
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3738
                                        new_drbd, False,
3739
                                      _GetInstanceInfoText(instance)):
3740
        raise errors.OpExecError("Failed to create new DRBD on"
3741
                                 " node '%s'" % new_node)
3742

    
3743
    for dev in instance.disks:
3744
      # we have new devices, shutdown the drbd on the old secondary
3745
      info("shutting down drbd for %s on old node" % dev.iv_name)
3746
      cfg.SetDiskID(dev, old_node)
3747
      if not rpc.call_blockdev_shutdown(old_node, dev):
3748
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3749
                hint="Please cleanup this device manually as soon as possible")
3750

    
3751
    info("detaching primary drbds from the network (=> standalone)")
3752
    done = 0
3753
    for dev in instance.disks:
3754
      cfg.SetDiskID(dev, pri_node)
3755
      # set the physical (unique in bdev terms) id to None, meaning
3756
      # detach from network
3757
      dev.physical_id = (None,) * len(dev.physical_id)
3758
      # and 'find' the device, which will 'fix' it to match the
3759
      # standalone state
3760
      if rpc.call_blockdev_find(pri_node, dev):
3761
        done += 1
3762
      else:
3763
        warning("Failed to detach drbd %s from network, unusual case" %
3764
                dev.iv_name)
3765

    
3766
    if not done:
3767
      # no detaches succeeded (very unlikely)
3768
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
3769

    
3770
    # if we managed to detach at least one, we update all the disks of
3771
    # the instance to point to the new secondary
3772
    info("updating instance configuration")
3773
    for dev in instance.disks:
3774
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3775
      cfg.SetDiskID(dev, pri_node)
3776
    cfg.Update(instance)
3777

    
3778
    # and now perform the drbd attach
3779
    info("attaching primary drbds to new secondary (standalone => connected)")
3780
    failures = []
3781
    for dev in instance.disks:
3782
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3783
      # since the attach is smart, it's enough to 'find' the device,
3784
      # it will automatically activate the network, if the physical_id
3785
      # is correct
3786
      cfg.SetDiskID(dev, pri_node)
3787
      if not rpc.call_blockdev_find(pri_node, dev):
3788
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3789
                "please do a gnt-instance info to see the status of disks")
3790

    
3791
    # this can fail as the old devices are degraded and _WaitForSync
3792
    # does a combined result over all disks, so we don't check its
3793
    # return value
3794
    self.proc.LogStep(5, steps_total, "sync devices")
3795
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3796

    
3797
    # so check manually all the devices
3798
    for name, (dev, old_lvs) in iv_names.iteritems():
3799
      cfg.SetDiskID(dev, pri_node)
3800
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3801
      if is_degr:
3802
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3803

    
3804
    self.proc.LogStep(6, steps_total, "removing old storage")
3805
    for name, (dev, old_lvs) in iv_names.iteritems():
3806
      info("remove logical volumes for %s" % name)
3807
      for lv in old_lvs:
3808
        cfg.SetDiskID(lv, old_node)
3809
        if not rpc.call_blockdev_remove(old_node, lv):
3810
          warning("Can't remove LV on old secondary",
3811
                  hint="Cleanup stale volumes by hand")
3812

    
3813
  def Exec(self, feedback_fn):
3814
    """Execute disk replacement.
3815

3816
    This dispatches the disk replacement to the appropriate handler.
3817

3818
    """
3819
    instance = self.instance
3820

    
3821
    # Activate the instance disks if we're replacing them on a down instance
3822
    if instance.status == "down":
3823
      op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
3824
      self.proc.ChainOpCode(op)
3825

    
3826
    if instance.disk_template == constants.DT_DRBD8:
3827
      if self.op.remote_node is None:
3828
        fn = self._ExecD8DiskOnly
3829
      else:
3830
        fn = self._ExecD8Secondary
3831
    else:
3832
      raise errors.ProgrammerError("Unhandled disk replacement case")
3833

    
3834
    ret = fn(feedback_fn)
3835

    
3836
    # Deactivate the instance disks if we're replacing them on a down instance
3837
    if instance.status == "down":
3838
      op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
3839
      self.proc.ChainOpCode(op)
3840

    
3841
    return ret
3842

    
3843

    
3844
class LUGrowDisk(LogicalUnit):
3845
  """Grow a disk of an instance.
3846

3847
  """
3848
  HPATH = "disk-grow"
3849
  HTYPE = constants.HTYPE_INSTANCE
3850
  _OP_REQP = ["instance_name", "disk", "amount"]
3851

    
3852
  def BuildHooksEnv(self):
3853
    """Build hooks env.
3854

3855
    This runs on the master, the primary and all the secondaries.
3856

3857
    """
3858
    env = {
3859
      "DISK": self.op.disk,
3860
      "AMOUNT": self.op.amount,
3861
      }
3862
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3863
    nl = [
3864
      self.sstore.GetMasterNode(),
3865
      self.instance.primary_node,
3866
      ]
3867
    return env, nl, nl
3868

    
3869
  def CheckPrereq(self):
3870
    """Check prerequisites.
3871

3872
    This checks that the instance is in the cluster.
3873

3874
    """
3875
    instance = self.cfg.GetInstanceInfo(
3876
      self.cfg.ExpandInstanceName(self.op.instance_name))
3877
    if instance is None:
3878
      raise errors.OpPrereqError("Instance '%s' not known" %
3879
                                 self.op.instance_name)
3880
    self.instance = instance
3881
    self.op.instance_name = instance.name
3882

    
3883
    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
3884
      raise errors.OpPrereqError("Instance's disk layout does not support"
3885
                                 " growing.")
3886

    
3887
    if instance.FindDisk(self.op.disk) is None:
3888
      raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3889
                                 (self.op.disk, instance.name))
3890

    
3891
    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
3892
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3893
    for node in nodenames:
3894
      info = nodeinfo.get(node, None)
3895
      if not info:
3896
        raise errors.OpPrereqError("Cannot get current information"
3897
                                   " from node '%s'" % node)
3898
      vg_free = info.get('vg_free', None)
3899
      if not isinstance(vg_free, int):
3900
        raise errors.OpPrereqError("Can't compute free disk space on"
3901
                                   " node %s" % node)
3902
      if self.op.amount > info['vg_free']:
3903
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
3904
                                   " %d MiB available, %d MiB required" %
3905
                                   (node, info['vg_free'], self.op.amount))
3906

    
3907
  def Exec(self, feedback_fn):
3908
    """Execute disk grow.
3909

3910
    """
3911
    instance = self.instance
3912
    disk = instance.FindDisk(self.op.disk)
3913
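    # grow the device on the secondaries first and on the primary last; for
    # mirrored (drbd) disks this presumably avoids the primary temporarily
    # having a larger backing device than its peers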
    for node in (instance.secondary_nodes + (instance.primary_node,)):
3914
      self.cfg.SetDiskID(disk, node)
3915
      result = rpc.call_blockdev_grow(node, disk, self.op.amount)
3916
      if not result or not isinstance(result, tuple) or len(result) != 2:
3917
        raise errors.OpExecError("grow request failed to node %s" % node)
3918
      elif not result[0]:
3919
        raise errors.OpExecError("grow request failed to node %s: %s" %
3920
                                 (node, result[1]))
3921
    disk.RecordGrow(self.op.amount)
3922
    self.cfg.Update(instance)
3923
    return
3924

    
3925

    
3926
class LUQueryInstanceData(NoHooksLU):
3927
  """Query runtime instance data.
3928

3929
  """
3930
  _OP_REQP = ["instances"]
3931

    
3932
  def CheckPrereq(self):
3933
    """Check prerequisites.
3934

3935
    This only checks the optional instance list against the existing names.
3936

3937
    """
3938
    if not isinstance(self.op.instances, list):
3939
      raise errors.OpPrereqError("Invalid argument type 'instances'")
3940
    if self.op.instances:
3941
      self.wanted_instances = []
3942
      names = self.op.instances
3943
      for name in names:
3944
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3945
        if instance is None:
3946
          raise errors.OpPrereqError("No such instance name '%s'" % name)
3947
        self.wanted_instances.append(instance)
3948
    else:
3949
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3950
                               in self.cfg.GetInstanceList()]
3951
    return
3952

    
3953

    
3954
  def _ComputeDiskStatus(self, instance, snode, dev):
3955
    """Compute block device status.
3956

3957
    """
3958
    self.cfg.SetDiskID(dev, instance.primary_node)
3959
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3960
    if dev.dev_type in constants.LDS_DRBD:
3961
      # we change the snode then (otherwise we use the one passed in)
3962
      if dev.logical_id[0] == instance.primary_node:
3963
        snode = dev.logical_id[1]
3964
      else:
3965
        snode = dev.logical_id[0]
3966

    
3967
    if snode:
3968
      self.cfg.SetDiskID(dev, snode)
3969
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
3970
    else:
3971
      dev_sstatus = None
3972

    
3973
    if dev.children:
3974
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
3975
                      for child in dev.children]
3976
    else:
3977
      dev_children = []
3978

    
3979
    data = {
3980
      "iv_name": dev.iv_name,
3981
      "dev_type": dev.dev_type,
3982
      "logical_id": dev.logical_id,
3983
      "physical_id": dev.physical_id,
3984
      "pstatus": dev_pstatus,
3985
      "sstatus": dev_sstatus,
3986
      "children": dev_children,
3987
      }
3988

    
3989
    return data
3990

    
3991
  def Exec(self, feedback_fn):
3992
    """Gather and return data"""
3993
    result = {}
3994
    for instance in self.wanted_instances:
3995
      remote_info = rpc.call_instance_info(instance.primary_node,
3996
                                                instance.name)
3997
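      # any state reported by the hypervisor means the instance is running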
      if remote_info and "state" in remote_info:
3998
        remote_state = "up"
3999
      else:
4000
        remote_state = "down"
4001
      if instance.status == "down":
4002
        config_state = "down"
4003
      else:
4004
        config_state = "up"
4005

    
4006
      disks = [self._ComputeDiskStatus(instance, None, device)
4007
               for device in instance.disks]
4008

    
4009
      idict = {
4010
        "name": instance.name,
4011
        "config_state": config_state,
4012
        "run_state": remote_state,
4013
        "pnode": instance.primary_node,
4014
        "snodes": instance.secondary_nodes,
4015
        "os": instance.os,
4016
        "memory": instance.memory,
4017
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4018
        "disks": disks,
4019
        "vcpus": instance.vcpus,
4020
        }
4021

    
4022
      htkind = self.sstore.GetHypervisorType()
4023
      if htkind == constants.HT_XEN_PVM30:
4024
        idict["kernel_path"] = instance.kernel_path
4025
        idict["initrd_path"] = instance.initrd_path
4026

    
4027
      if htkind == constants.HT_XEN_HVM31:
4028
        idict["hvm_boot_order"] = instance.hvm_boot_order
4029
        idict["hvm_acpi"] = instance.hvm_acpi
4030
        idict["hvm_pae"] = instance.hvm_pae
4031
        idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4032

    
4033
      if htkind in constants.HTS_REQ_PORT:
4034
        idict["vnc_bind_address"] = instance.vnc_bind_address
4035
        idict["network_port"] = instance.network_port
4036

    
4037
      result[instance.name] = idict
4038

    
4039
    return result
4040

    
4041

    
4042
class LUSetInstanceParams(LogicalUnit):
4043
  """Modifies an instances's parameters.
4044

4045
  """
4046
  HPATH = "instance-modify"
4047
  HTYPE = constants.HTYPE_INSTANCE
4048
  _OP_REQP = ["instance_name"]
4049
  REQ_BGL = False
4050

    
4051
  def ExpandNames(self):
4052
    self._ExpandAndLockInstance()
4053

    
4054
  def BuildHooksEnv(self):
4055
    """Build hooks env.
4056

4057
    This runs on the master, primary and secondaries.
4058

4059
    """
4060
    args = dict()
4061
    if self.mem:
4062
      args['memory'] = self.mem
4063
    if self.vcpus:
4064
      args['vcpus'] = self.vcpus
4065
    if self.do_ip or self.do_bridge or self.mac:
4066
      if self.do_ip:
4067
        ip = self.ip
4068
      else:
4069
        ip = self.instance.nics[0].ip
4070
      if self.bridge:
4071
        bridge = self.bridge
4072
      else:
4073
        bridge = self.instance.nics[0].bridge
4074
      if self.mac:
4075
        mac = self.mac
4076
      else:
4077
        mac = self.instance.nics[0].mac
4078
      args['nics'] = [(ip, bridge, mac)]
4079
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4080
    nl = [self.sstore.GetMasterNode(),
4081
          self.instance.primary_node] + list(self.instance.secondary_nodes)
4082
    return env, nl, nl
4083

    
4084
  def CheckPrereq(self):
4085
    """Check prerequisites.
4086

4087
    This only checks the instance list against the existing names.
4088

4089
    """
4090
    # FIXME: all the parameters could be checked before, in ExpandNames, or in
4091
    # a separate CheckArguments function, if we implement one, so the operation
4092
    # can be aborted without waiting for any lock, should it have an error...
4093
    self.mem = getattr(self.op, "mem", None)
4094
    self.vcpus = getattr(self.op, "vcpus", None)
4095
    self.ip = getattr(self.op, "ip", None)
4096
    self.mac = getattr(self.op, "mac", None)
4097
    self.bridge = getattr(self.op, "bridge", None)
4098
    self.kernel_path = getattr(self.op, "kernel_path", None)
4099
    self.initrd_path = getattr(self.op, "initrd_path", None)
4100
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4101
    self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
4102
    self.hvm_pae = getattr(self.op, "hvm_pae", None)
4103
    self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
4104
    self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
4105
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4106
                 self.kernel_path, self.initrd_path, self.hvm_boot_order,
4107
                 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
4108
                 self.vnc_bind_address]
4109
    if all_parms.count(None) == len(all_parms):
4110
      raise errors.OpPrereqError("No changes submitted")
4111
    if self.mem is not None:
4112
      try:
4113
        self.mem = int(self.mem)
4114
      except ValueError, err:
4115
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4116
    if self.vcpus is not None:
4117
      try:
4118
        self.vcpus = int(self.vcpus)
4119
      except ValueError, err:
4120
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4121
    if self.ip is not None:
4122
      self.do_ip = True
4123
      if self.ip.lower() == "none":
4124
        self.ip = None
4125
      else:
4126
        if not utils.IsValidIP(self.ip):
4127
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4128
    else:
4129
      self.do_ip = False
4130
    self.do_bridge = (self.bridge is not None)
4131
    if self.mac is not None:
4132
      if self.cfg.IsMacInUse(self.mac):
4133
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4134
                                   self.mac)
4135
      if not utils.IsValidMac(self.mac):
4136
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4137

    
4138
    if self.kernel_path is not None:
      self.do_kernel_path = True
      if self.kernel_path == constants.VALUE_NONE:
        raise errors.OpPrereqError("Can't set instance to no kernel")

      if self.kernel_path != constants.VALUE_DEFAULT:
        if not os.path.isabs(self.kernel_path):
          raise errors.OpPrereqError("The kernel path must be an absolute"
                                     " filename")
    else:
      self.do_kernel_path = False

    if self.initrd_path is not None:
      self.do_initrd_path = True
      if self.initrd_path not in (constants.VALUE_NONE,
                                  constants.VALUE_DEFAULT):
        if not os.path.isabs(self.initrd_path):
          raise errors.OpPrereqError("The initrd path must be an absolute"
                                     " filename")
    else:
      self.do_initrd_path = False

    # boot order verification
    if self.hvm_boot_order is not None:
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
        if len(self.hvm_boot_order.strip("acdn")) != 0:
          raise errors.OpPrereqError("invalid boot order specified,"
                                     " must be one or more of [acdn]"
                                     " or 'default'")

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.do_kernel_path:
      instance.kernel_path = self.kernel_path
      result.append(("kernel_path", self.kernel_path))
    if self.do_initrd_path:
      instance.initrd_path = self.initrd_path
      result.append(("initrd_path", self.initrd_path))
    if self.hvm_boot_order:
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
        instance.hvm_boot_order = None
      else:
        instance.hvm_boot_order = self.hvm_boot_order
      result.append(("hvm_boot_order", self.hvm_boot_order))
    if self.hvm_acpi:
      instance.hvm_acpi = self.hvm_acpi
      result.append(("hvm_acpi", self.hvm_acpi))
    if self.hvm_pae:
      instance.hvm_pae = self.hvm_pae
      result.append(("hvm_pae", self.hvm_pae))
    if self.hvm_cdrom_image_path:
      instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
      result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
    if self.vnc_bind_address:
      instance.vnc_bind_address = self.vnc_bind_address
      result.append(("vnc_bind_address", self.vnc_bind_address))

    self.cfg.Update(instance)

    return result
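

# A sketch of the change list returned by LUSetInstanceParams.Exec above, as it
# would look if only the memory and the VNC bind address had been modified
# (values assumed for illustration):
#
#   [("mem", 512), ("vnc_bind_address", "0.0.0.0")]

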
class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)
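

# LUQueryExports.Exec above returns the per-node export lists, and
# LUExportInstance below both creates such exports and prunes older ones.
# A sketch of the returned structure (node and instance names assumed for
# illustration):
#
#   {"node1.example.com": ["inst1.example.com"], "node2.example.com": []}

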
class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        if not rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(instance, self.cfg)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
        logger.Error("could not export block device %s from node %s to node %s"
                     % (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from node %s" %
                     (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))
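

# Summary of the export flow above: optionally shut the instance down, snapshot
# its "sda" disk, restart the instance if it was running, copy the snapshot to
# the target node, remove the snapshot, finalize the export on the target node,
# and finally prune any older exports of the same instance from the remaining
# nodes.

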
class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.
    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    op = opcodes.OpQueryExports(nodes=[])
    exportlist = self.proc.ChainOpCode(op)
    found = False
    for node in exportlist:
      if instance_name in exportlist[node]:
        found = True
        if not rpc.call_export_remove(node, instance_name):
          logger.Error("could not remove export for instance %s"
                       " on node %s" % (instance_name, node))

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")
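

# TagsLU.CheckPrereq below resolves self.target from (op.kind, op.name),
# roughly as follows (names assumed for illustration):
#
#   TAG_CLUSTER                       -> cfg.GetClusterInfo()
#   TAG_NODE,     "node1.example.com" -> cfg.GetNodeInfo(expanded name)
#   TAG_INSTANCE, "inst1.example.com" -> cfg.GetInstanceInfo(expanded name)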
class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())
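

# LUSearchTags below returns a list of (path, tag) pairs, where the path
# identifies the tagged object; for example (tag values assumed):
#
#   [("/cluster", "production"), ("/instances/inst1.example.com", "web")]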
class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")
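

# A sketch of exercising LUTestDelay below, assuming the matching opcode class
# is opcodes.OpTestDelay (field values are illustrative only):
#
#   opcodes.OpTestDelay(duration=5.0, on_master=True,
#                       on_nodes=["node1.example.com"])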
class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))
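

# A sketch of driving the IAllocator class below in allocation mode (argument
# values are assumed for illustration; LUTestAllocator.Exec further down shows
# the real call sites):
#
#   ial = IAllocator(cfg, sstore,
#                    mode=constants.IALLOCATOR_MODE_ALLOC,
#                    name="inst1.example.com",
#                    mem_size=512,
#                    disks=[{"size": 1024, "mode": "w"},
#                           {"size": 128, "mode": "w"}],
#                    disk_template="drbd",  # template name assumed
#                    os="debian-etch", tags=[], vcpus=1,
#                    nics=[{"mac": "auto", "ip": None, "bridge": "xen-br0"}])
#   ial.Run("dumb_allocator")  # allocator script name assumed
#   if ial.success:
#     target_nodes = ial.nodes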
class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg/sstore that are needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, cfg, sstore, mode, name, **kwargs):
    self.cfg = cfg
    self.sstore = sstore
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.cfg
    # cluster data
    data = {
      "version": 1,
      "cluster_name": self.sstore.GetClusterName(),
      "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      # we don't have job IDs
      }

    i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()
    node_data = rpc.call_node_info(node_list, cfg.GetVGName())
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      if nname not in node_data or not isinstance(node_data[nname], dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname]
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free', 'cpu_total']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += iinfo.memory
          if iinfo.status == "up":
            i_p_up_mem += iinfo.memory

      # compute memory used by instances
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": iinfo.vcpus,
        "memory": iinfo.memory,
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data
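
  # The dictionary built above (self.in_data) is later extended with a
  # "request" entry by the two methods below; its overall shape is roughly
  # (values assumed for illustration):
  #
  #   {"version": 1,
  #    "cluster_name": "cluster.example.com",
  #    "cluster_tags": [],
  #    "hypervisor_type": "xen-3.0",
  #    "nodes": {"node1.example.com": {"total_memory": 4096, ...}},
  #    "instances": {"inst1.example.com": {"memory": 512, ...}}}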
  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This, in combination with _ComputeClusterData, will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template,
                                  self.disks[0]["size"], self.disks[1]["size"])

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request
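
  # The "request" entry depends on the mode: _AddNewInstance above emits a
  # "type": "allocate" request carrying the full instance specification, while
  # _AddRelocateInstance below emits roughly (values assumed for illustration):
  #
  #   {"type": "relocate", "name": "inst1.example.com",
  #    "disk_space_total": 1152, "required_nodes": 1,
  #    "relocate_from": ["node2.example.com"]}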
  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This, in combination with _ComputeClusterData, will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1

    disk_space = _ComputeDiskSize(instance.disk_template,
                                  instance.disks[0].size,
                                  instance.disks[1].size)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
    """Run an instance allocator and return the results.

    """
    data = self.in_text

    result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)

    if not isinstance(result, tuple) or len(result) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" %
                               (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and, if successful, save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict
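

# For reference, a well-formed reply from an allocator script, as checked by
# IAllocator._ValidateResult above, looks roughly like this (values assumed
# for illustration):
#
#   {"success": True,
#    "info": "allocation successful",
#    "nodes": ["node2.example.com", "node3.example.com"]}

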
class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    of the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       )
    else:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result
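

# A sketch of an opcode exercising LUTestAllocator above, assuming the matching
# opcode class is opcodes.OpTestAllocator (field values are illustrative only):
#
#   opcodes.OpTestAllocator(direction=constants.IALLOCATOR_DIR_IN,
#                           mode=constants.IALLOCATOR_MODE_RELOC,
#                           name="inst1.example.com")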