1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33

    
34
from ganeti import rpc
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import locking
41
from ganeti import config
42
from ganeti import constants
43
from ganeti import objects
44
from ganeti import opcodes
45
from ganeti import ssconf
46
from ganeti import serializer
47

    
48

    
49
class LogicalUnit(object):
50
  """Logical Unit base class.
51

52
  Subclasses must follow these rules:
53
    - implement CheckPrereq which also fills in the opcode instance
54
      with all the fields (even if as None)
55
    - implement Exec
56
    - implement BuildHooksEnv
57
    - redefine HPATH and HTYPE
58
    - optionally redefine their run requirements:
59
        REQ_MASTER: the LU needs to run on the master node
60
        REQ_WSSTORE: the LU needs a writable SimpleStore
61
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
62

63
  Note that all commands require root permissions.
64

65
  """
66
  HPATH = None
67
  HTYPE = None
68
  _OP_REQP = []
69
  REQ_MASTER = True
70
  REQ_WSSTORE = False
71
  REQ_BGL = True
72

    
73
  def __init__(self, processor, op, context, sstore):
74
    """Constructor for LogicalUnit.
75

76
    This needs to be overridden in derived classes in order to check op
77
    validity.
78

79
    """
80
    self.proc = processor
81
    self.op = op
82
    self.cfg = context.cfg
83
    self.sstore = sstore
84
    self.context = context
85
    self.__ssh = None
86

    
87
    for attr_name in self._OP_REQP:
88
      attr_val = getattr(op, attr_name, None)
89
      if attr_val is None:
90
        raise errors.OpPrereqError("Required parameter '%s' missing" %
91
                                   attr_name)
92

    
93
    if not self.cfg.IsCluster():
94
      raise errors.OpPrereqError("Cluster not initialized yet,"
95
                                 " use 'gnt-cluster init' first.")
96
    if self.REQ_MASTER:
97
      master = sstore.GetMasterNode()
98
      if master != utils.HostInfo().name:
99
        raise errors.OpPrereqError("Commands must be run on the master"
100
                                   " node %s" % master)
101

    
102
  def __GetSSH(self):
103
    """Returns the SshRunner object
104

105
    """
106
    if not self.__ssh:
107
      self.__ssh = ssh.SshRunner(self.sstore)
108
    return self.__ssh
109

    
110
  ssh = property(fget=__GetSSH)
111

    
112
  def CheckPrereq(self):
113
    """Check prerequisites for this LU.
114

115
    This method should check that the prerequisites for the execution
116
    of this LU are fulfilled. It can do internode communication, but
117
    it should be idempotent - no cluster or system changes are
118
    allowed.
119

120
    The method should raise errors.OpPrereqError in case something is
121
    not fulfilled. Its return value is ignored.
122

123
    This method should also update all the parameters of the opcode to
124
    their canonical form; e.g. a short node name must be fully
125
    expanded after this method has successfully completed (so that
126
    hooks, logging, etc. work correctly).
127

128
    """
129
    raise NotImplementedError
130

    
131
  def Exec(self, feedback_fn):
132
    """Execute the LU.
133

134
    This method should implement the actual work. It should raise
135
    errors.OpExecError for failures that are somewhat dealt with in
136
    code, or expected.
137

138
    """
139
    raise NotImplementedError
140

    
141
  def BuildHooksEnv(self):
142
    """Build hooks environment for this LU.
143

144
    This method should return a three-element tuple consisting of: a dict
145
    containing the environment that will be used for running the
146
    specific hook for this LU, a list of node names on which the hook
147
    should run before the execution, and a list of node names on which
148
    the hook should run after the execution.
149

150
    The keys of the dict must not be prefixed with 'GANETI_' as this will
151
    be handled in the hooks runner. Also note additional keys will be
152
    added by the hooks runner. If the LU doesn't define any
153
    environment, an empty dict (and not None) should be returned.
154

155
    If there are no nodes to return, use an empty list (and not None).
156

157
    Note that if the HPATH for a LU class is None, this function will
158
    not be called.
159

160
    """
161
    raise NotImplementedError
162

    
163
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
164
    """Notify the LU about the results of its hooks.
165

166
    This method is called every time a hooks phase is executed, and notifies
167
    the Logical Unit about the hooks' result. The LU can then use it to alter
168
    its result based on the hooks.  By default the method does nothing and the
169
    previous result is passed back unchanged but any LU can define it if it
170
    wants to use the local cluster hook-scripts somehow.
171

172
    Args:
173
      phase: the hooks phase that has just been run
174
      hook_results: the results of the multi-node hooks rpc call
175
      feedback_fn: function to send feedback back to the caller
176
      lu_result: the previous result this LU had, or None in the PRE phase.
177

178
    """
179
    return lu_result
180

    
181

    
182
class NoHooksLU(LogicalUnit):
183
  """Simple LU which runs no hooks.
184

185
  This LU is intended as a parent for other LogicalUnits which will
186
  run no hooks, in order to reduce duplicate code.
187

188
  """
189
  HPATH = None
190
  HTYPE = None
191

    
192

    
193
def _GetWantedNodes(lu, nodes):
194
  """Returns list of checked and expanded node names.
195

196
  Args:
197
    nodes: List of nodes (strings) or None for all
198

199
  """
200
  if not isinstance(nodes, list):
201
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
202

    
203
  if nodes:
204
    wanted = []
205

    
206
    for name in nodes:
207
      node = lu.cfg.ExpandNodeName(name)
208
      if node is None:
209
        raise errors.OpPrereqError("No such node name '%s'" % name)
210
      wanted.append(node)
211

    
212
  else:
213
    wanted = lu.cfg.GetNodeList()
214
  return utils.NiceSort(wanted)
215

    
216

    
217
def _GetWantedInstances(lu, instances):
218
  """Returns list of checked and expanded instance names.
219

220
  Args:
221
    instances: List of instances (strings) or None for all
222

223
  """
224
  if not isinstance(instances, list):
225
    raise errors.OpPrereqError("Invalid argument type 'instances'")
226

    
227
  if instances:
228
    wanted = []
229

    
230
    for name in instances:
231
      instance = lu.cfg.ExpandInstanceName(name)
232
      if instance is None:
233
        raise errors.OpPrereqError("No such instance name '%s'" % name)
234
      wanted.append(instance)
235

    
236
  else:
237
    wanted = lu.cfg.GetInstanceList()
238
  return utils.NiceSort(wanted)
239

    
240

    
241
def _CheckOutputFields(static, dynamic, selected):
242
  """Checks whether all selected fields are valid.
243

244
  Args:
245
    static: Static fields
246
    dynamic: Dynamic fields
247

248
  """
249
  static_fields = frozenset(static)
250
  dynamic_fields = frozenset(dynamic)
251

    
252
  all_fields = static_fields | dynamic_fields
253

    
254
  if not all_fields.issuperset(selected):
255
    raise errors.OpPrereqError("Unknown output fields selected: %s"
256
                               % ",".join(frozenset(selected).
257
                                          difference(all_fields)))
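# Typical usage (illustrative, mirroring the query LUs further below):
#   _CheckOutputFields(static=["name", "pinst_cnt"],
#                      dynamic=["mtotal", "mfree"],
#                      selected=self.op.output_fields)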
258

    
259

    
260
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
261
                          memory, vcpus, nics):
262
  """Builds instance related env variables for hooks from single variables.
263

264
  Args:
265
    secondary_nodes: List of secondary nodes as strings
266
  """
267
  env = {
268
    "OP_TARGET": name,
269
    "INSTANCE_NAME": name,
270
    "INSTANCE_PRIMARY": primary_node,
271
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
272
    "INSTANCE_OS_TYPE": os_type,
273
    "INSTANCE_STATUS": status,
274
    "INSTANCE_MEMORY": memory,
275
    "INSTANCE_VCPUS": vcpus,
276
  }
277

    
278
  if nics:
279
    nic_count = len(nics)
280
    for idx, (ip, bridge, mac) in enumerate(nics):
281
      if ip is None:
282
        ip = ""
283
      env["INSTANCE_NIC%d_IP" % idx] = ip
284
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
285
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
286
  else:
287
    nic_count = 0
288

    
289
  env["INSTANCE_NIC_COUNT"] = nic_count
290

    
291
  return env
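  # Illustrative note (not part of the original code): for an instance with
  # a single NIC the resulting dict contains, among others, INSTANCE_NAME,
  # INSTANCE_PRIMARY, INSTANCE_NIC_COUNT and the INSTANCE_NIC0_* keys.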
292

    
293

    
294
def _BuildInstanceHookEnvByObject(instance, override=None):
295
  """Builds instance related env variables for hooks from an object.
296

297
  Args:
298
    instance: objects.Instance object of instance
299
    override: dict of values to override
300
  """
301
  args = {
302
    'name': instance.name,
303
    'primary_node': instance.primary_node,
304
    'secondary_nodes': instance.secondary_nodes,
305
    'os_type': instance.os,
306
    'status': instance.status,
307
    'memory': instance.memory,
308
    'vcpus': instance.vcpus,
309
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
310
  }
311
  if override:
312
    args.update(override)
313
  return _BuildInstanceHookEnv(**args)
314

    
315

    
316
def _CheckInstanceBridgesExist(instance):
317
  """Check that the brigdes needed by an instance exist.
318

319
  """
320
  # check bridges existence
321
  brlist = [nic.bridge for nic in instance.nics]
322
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
323
    raise errors.OpPrereqError("one or more target bridges %s does not"
324
                               " exist on destination node '%s'" %
325
                               (brlist, instance.primary_node))
326

    
327

    
328
class LUDestroyCluster(NoHooksLU):
329
  """Logical unit for destroying the cluster.
330

331
  """
332
  _OP_REQP = []
333

    
334
  def CheckPrereq(self):
335
    """Check prerequisites.
336

337
    This checks whether the cluster is empty.
338

339
    Any errors are signalled by raising errors.OpPrereqError.
340

341
    """
342
    master = self.sstore.GetMasterNode()
343

    
344
    nodelist = self.cfg.GetNodeList()
345
    if len(nodelist) != 1 or nodelist[0] != master:
346
      raise errors.OpPrereqError("There are still %d node(s) in"
347
                                 " this cluster." % (len(nodelist) - 1))
348
    instancelist = self.cfg.GetInstanceList()
349
    if instancelist:
350
      raise errors.OpPrereqError("There are still %d instance(s) in"
351
                                 " this cluster." % len(instancelist))
352

    
353
  def Exec(self, feedback_fn):
354
    """Destroys the cluster.
355

356
    """
357
    master = self.sstore.GetMasterNode()
358
    if not rpc.call_node_stop_master(master):
359
      raise errors.OpExecError("Could not disable the master role")
360
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
361
    utils.CreateBackup(priv_key)
362
    utils.CreateBackup(pub_key)
363
    rpc.call_node_leave_cluster(master)
364

    
365

    
366
class LUVerifyCluster(LogicalUnit):
367
  """Verifies the cluster status.
368

369
  """
370
  HPATH = "cluster-verify"
371
  HTYPE = constants.HTYPE_CLUSTER
372
  _OP_REQP = ["skip_checks"]
373

    
374
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
375
                  remote_version, feedback_fn):
376
    """Run multiple tests against a node.
377

378
    Test list:
379
      - compares ganeti version
380
      - checks vg existence and size > 20G
381
      - checks config file checksum
382
      - checks ssh to other nodes
383

384
    Args:
385
      node: name of the node to check
386
      file_list: required list of files
387
      local_cksum: dictionary of local files and their checksums
388

389
    """
390
    # compares ganeti version
391
    local_version = constants.PROTOCOL_VERSION
392
    if not remote_version:
393
      feedback_fn("  - ERROR: connection to %s failed" % (node))
394
      return True
395

    
396
    if local_version != remote_version:
397
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
398
                      (local_version, node, remote_version))
399
      return True
400

    
401
    # checks vg existence and size > 20G
402

    
403
    bad = False
404
    if not vglist:
405
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
406
                      (node,))
407
      bad = True
408
    else:
409
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
410
                                            constants.MIN_VG_SIZE)
411
      if vgstatus:
412
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
413
        bad = True
414

    
415
    # checks config file checksum
416
    # checks ssh to any
417

    
418
    if 'filelist' not in node_result:
419
      bad = True
420
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
421
    else:
422
      remote_cksum = node_result['filelist']
423
      for file_name in file_list:
424
        if file_name not in remote_cksum:
425
          bad = True
426
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
427
        elif remote_cksum[file_name] != local_cksum[file_name]:
428
          bad = True
429
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
430

    
431
    if 'nodelist' not in node_result:
432
      bad = True
433
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
434
    else:
435
      if node_result['nodelist']:
436
        bad = True
437
        for node in node_result['nodelist']:
438
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
439
                          (node, node_result['nodelist'][node]))
440
    if 'node-net-test' not in node_result:
441
      bad = True
442
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
443
    else:
444
      if node_result['node-net-test']:
445
        bad = True
446
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
447
        for node in nlist:
448
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
449
                          (node, node_result['node-net-test'][node]))
450

    
451
    hyp_result = node_result.get('hypervisor', None)
452
    if hyp_result is not None:
453
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
454
    return bad
455

    
456
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
457
                      node_instance, feedback_fn):
458
    """Verify an instance.
459

460
    This function checks to see if the required block devices are
461
    available on the instance's node.
462

463
    """
464
    bad = False
465

    
466
    node_current = instanceconfig.primary_node
467

    
468
    node_vol_should = {}
469
    instanceconfig.MapLVsByNode(node_vol_should)
470

    
471
    for node in node_vol_should:
472
      for volume in node_vol_should[node]:
473
        if node not in node_vol_is or volume not in node_vol_is[node]:
474
          feedback_fn("  - ERROR: volume %s missing on node %s" %
475
                          (volume, node))
476
          bad = True
477

    
478
    if not instanceconfig.status == 'down':
479
      if (node_current not in node_instance or
480
          not instance in node_instance[node_current]):
481
        feedback_fn("  - ERROR: instance %s not running on node %s" %
482
                        (instance, node_current))
483
        bad = True
484

    
485
    for node in node_instance:
486
      if (not node == node_current):
487
        if instance in node_instance[node]:
488
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
489
                          (instance, node))
490
          bad = True
491

    
492
    return bad
493

    
494
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
495
    """Verify if there are any unknown volumes in the cluster.
496

497
    The .os, .swap and backup volumes are ignored. All other volumes are
498
    reported as unknown.
499

500
    """
501
    bad = False
502

    
503
    for node in node_vol_is:
504
      for volume in node_vol_is[node]:
505
        if node not in node_vol_should or volume not in node_vol_should[node]:
506
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
507
                      (volume, node))
508
          bad = True
509
    return bad
510

    
511
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
512
    """Verify the list of running instances.
513

514
    This checks what instances are running but unknown to the cluster.
515

516
    """
517
    bad = False
518
    for node in node_instance:
519
      for runninginstance in node_instance[node]:
520
        if runninginstance not in instancelist:
521
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
522
                          (runninginstance, node))
523
          bad = True
524
    return bad
525

    
526
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
527
    """Verify N+1 Memory Resilience.
528

529
    Check that if one single node dies we can still start all the instances it
530
    was primary for.
531

532
    """
533
    bad = False
534

    
535
    for node, nodeinfo in node_info.iteritems():
536
      # This code checks that every node which is now listed as secondary has
537
      # enough memory to host all instances it is supposed to should a single
538
      # other node in the cluster fail.
539
      # FIXME: not ready for failover to an arbitrary node
540
      # FIXME: does not support file-backed instances
541
      # WARNING: we currently take into account down instances as well as up
542
      # ones, considering that even if they're down someone might want to start
543
      # them even in the event of a node failure.
544
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
545
        needed_mem = 0
546
        for instance in instances:
547
          needed_mem += instance_cfg[instance].memory
548
        if nodeinfo['mfree'] < needed_mem:
549
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
550
                      " failovers should node %s fail" % (node, prinode))
551
          bad = True
552
    return bad
553

    
554
  def CheckPrereq(self):
555
    """Check prerequisites.
556

557
    Transform the list of checks we're going to skip into a set and check that
558
    all its members are valid.
559

560
    """
561
    self.skip_set = frozenset(self.op.skip_checks)
562
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
563
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
564

    
565
  def BuildHooksEnv(self):
566
    """Build hooks env.
567

568
    Cluster-Verify hooks just run in the post phase and their failure causes
569
    the output to be logged in the verify output and the verification to fail.
570

571
    """
572
    all_nodes = self.cfg.GetNodeList()
573
    # TODO: populate the environment with useful information for verify hooks
574
    env = {}
575
    return env, [], all_nodes
576

    
577
  def Exec(self, feedback_fn):
578
    """Verify integrity of cluster, performing various test on nodes.
579

580
    """
581
    bad = False
582
    feedback_fn("* Verifying global settings")
583
    for msg in self.cfg.VerifyConfig():
584
      feedback_fn("  - ERROR: %s" % msg)
585

    
586
    vg_name = self.cfg.GetVGName()
587
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
588
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
589
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
590
    i_non_redundant = [] # Non redundant instances
591
    node_volume = {}
592
    node_instance = {}
593
    node_info = {}
594
    instance_cfg = {}
595

    
596
    # FIXME: verify OS list
597
    # do local checksums
598
    file_names = list(self.sstore.GetFileList())
599
    file_names.append(constants.SSL_CERT_FILE)
600
    file_names.append(constants.CLUSTER_CONF_FILE)
601
    local_checksums = utils.FingerprintFiles(file_names)
602

    
603
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
604
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
605
    all_instanceinfo = rpc.call_instance_list(nodelist)
606
    all_vglist = rpc.call_vg_list(nodelist)
607
    node_verify_param = {
608
      'filelist': file_names,
609
      'nodelist': nodelist,
610
      'hypervisor': None,
611
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
612
                        for node in nodeinfo]
613
      }
614
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
615
    all_rversion = rpc.call_version(nodelist)
616
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
617

    
618
    for node in nodelist:
619
      feedback_fn("* Verifying node %s" % node)
620
      result = self._VerifyNode(node, file_names, local_checksums,
621
                                all_vglist[node], all_nvinfo[node],
622
                                all_rversion[node], feedback_fn)
623
      bad = bad or result
624

    
625
      # node_volume
626
      volumeinfo = all_volumeinfo[node]
627

    
628
      if isinstance(volumeinfo, basestring):
629
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
630
                    (node, volumeinfo[-400:].encode('string_escape')))
631
        bad = True
632
        node_volume[node] = {}
633
      elif not isinstance(volumeinfo, dict):
634
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
635
        bad = True
636
        continue
637
      else:
638
        node_volume[node] = volumeinfo
639

    
640
      # node_instance
641
      nodeinstance = all_instanceinfo[node]
642
      if type(nodeinstance) != list:
643
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
644
        bad = True
645
        continue
646

    
647
      node_instance[node] = nodeinstance
648

    
649
      # node_info
650
      nodeinfo = all_ninfo[node]
651
      if not isinstance(nodeinfo, dict):
652
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
653
        bad = True
654
        continue
655

    
656
      try:
657
        node_info[node] = {
658
          "mfree": int(nodeinfo['memory_free']),
659
          "dfree": int(nodeinfo['vg_free']),
660
          "pinst": [],
661
          "sinst": [],
662
          # dictionary holding all instances this node is secondary for,
663
          # grouped by their primary node. Each key is a cluster node, and each
664
          # value is a list of instances which have the key as primary and the
665
          # current node as secondary.  This is handy to calculate N+1 memory
666
          # availability if you can only failover from a primary to its
667
          # secondary.
668
          "sinst-by-pnode": {},
669
        }
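        # Illustrative example, not from the original source: after the
        # instance loop below fills these fields in, a value such as
        # node_info["nodeC"]["sinst-by-pnode"] == {"nodeA": ["inst1"],
        # "nodeB": ["inst2", "inst3"]} means nodeC is secondary for those
        # instances, grouped by their primary node.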
670
      except ValueError:
671
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
672
        bad = True
673
        continue
674

    
675
    node_vol_should = {}
676

    
677
    for instance in instancelist:
678
      feedback_fn("* Verifying instance %s" % instance)
679
      inst_config = self.cfg.GetInstanceInfo(instance)
680
      result =  self._VerifyInstance(instance, inst_config, node_volume,
681
                                     node_instance, feedback_fn)
682
      bad = bad or result
683

    
684
      inst_config.MapLVsByNode(node_vol_should)
685

    
686
      instance_cfg[instance] = inst_config
687

    
688
      pnode = inst_config.primary_node
689
      if pnode in node_info:
690
        node_info[pnode]['pinst'].append(instance)
691
      else:
692
        feedback_fn("  - ERROR: instance %s, connection to primary node"
693
                    " %s failed" % (instance, pnode))
694
        bad = True
695

    
696
      # If the instance is non-redundant we cannot survive losing its primary
697
      # node, so we are not N+1 compliant. On the other hand we have no disk
698
      # templates with more than one secondary so that situation is not well
699
      # supported either.
700
      # FIXME: does not support file-backed instances
701
      if len(inst_config.secondary_nodes) == 0:
702
        i_non_redundant.append(instance)
703
      elif len(inst_config.secondary_nodes) > 1:
704
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
705
                    % instance)
706

    
707
      for snode in inst_config.secondary_nodes:
708
        if snode in node_info:
709
          node_info[snode]['sinst'].append(instance)
710
          if pnode not in node_info[snode]['sinst-by-pnode']:
711
            node_info[snode]['sinst-by-pnode'][pnode] = []
712
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
713
        else:
714
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
715
                      " %s failed" % (instance, snode))
716

    
717
    feedback_fn("* Verifying orphan volumes")
718
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
719
                                       feedback_fn)
720
    bad = bad or result
721

    
722
    feedback_fn("* Verifying remaining instances")
723
    result = self._VerifyOrphanInstances(instancelist, node_instance,
724
                                         feedback_fn)
725
    bad = bad or result
726

    
727
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
728
      feedback_fn("* Verifying N+1 Memory redundancy")
729
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
730
      bad = bad or result
731

    
732
    feedback_fn("* Other Notes")
733
    if i_non_redundant:
734
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
735
                  % len(i_non_redundant))
736

    
737
    return int(bad)
738

    
739
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
740
    """Analize the post-hooks' result, handle it, and send some
741
    nicely-formatted feedback back to the user.
742

743
    Args:
744
      phase: the hooks phase that has just been run
745
      hooks_results: the results of the multi-node hooks rpc call
746
      feedback_fn: function to send feedback back to the caller
747
      lu_result: previous Exec result
748

749
    """
750
    # We only really run POST phase hooks, and are only interested in their results
751
    if phase == constants.HOOKS_PHASE_POST:
752
      # Used to change hooks' output to proper indentation
753
      indent_re = re.compile('^', re.M)
754
      feedback_fn("* Hooks Results")
755
      if not hooks_results:
756
        feedback_fn("  - ERROR: general communication failure")
757
        lu_result = 1
758
      else:
759
        for node_name in hooks_results:
760
          show_node_header = True
761
          res = hooks_results[node_name]
762
          if res is False or not isinstance(res, list):
763
            feedback_fn("    Communication failure")
764
            lu_result = 1
765
            continue
766
          for script, hkr, output in res:
767
            if hkr == constants.HKR_FAIL:
768
              # The node header is only shown once, if there are
769
              # failing hooks on that node
770
              if show_node_header:
771
                feedback_fn("  Node %s:" % node_name)
772
                show_node_header = False
773
              feedback_fn("    ERROR: Script %s failed, output:" % script)
774
              output = indent_re.sub('      ', output)
775
              feedback_fn("%s" % output)
776
              lu_result = 1
777

    
778
      return lu_result
779

    
780

    
781
class LUVerifyDisks(NoHooksLU):
782
  """Verifies the cluster disks status.
783

784
  """
785
  _OP_REQP = []
786

    
787
  def CheckPrereq(self):
788
    """Check prerequisites.
789

790
    This has no prerequisites.
791

792
    """
793
    pass
794

    
795
  def Exec(self, feedback_fn):
796
    """Verify integrity of cluster disks.
797

798
    """
799
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
800

    
801
    vg_name = self.cfg.GetVGName()
802
    nodes = utils.NiceSort(self.cfg.GetNodeList())
803
    instances = [self.cfg.GetInstanceInfo(name)
804
                 for name in self.cfg.GetInstanceList()]
805

    
806
    nv_dict = {}
807
    for inst in instances:
808
      inst_lvs = {}
809
      if (inst.status != "up" or
810
          inst.disk_template not in constants.DTS_NET_MIRROR):
811
        continue
812
      inst.MapLVsByNode(inst_lvs)
813
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
814
      for node, vol_list in inst_lvs.iteritems():
815
        for vol in vol_list:
816
          nv_dict[(node, vol)] = inst
817

    
818
    if not nv_dict:
819
      return result
820

    
821
    node_lvs = rpc.call_volume_list(nodes, vg_name)
822

    
823
    to_act = set()
824
    for node in nodes:
825
      # node_volume
826
      lvs = node_lvs[node]
827

    
828
      if isinstance(lvs, basestring):
829
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
830
        res_nlvm[node] = lvs
831
      elif not isinstance(lvs, dict):
832
        logger.Info("connection to node %s failed or invalid data returned" %
833
                    (node,))
834
        res_nodes.append(node)
835
        continue
836

    
837
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
838
        inst = nv_dict.pop((node, lv_name), None)
839
        if (not lv_online and inst is not None
840
            and inst.name not in res_instances):
841
          res_instances.append(inst.name)
842

    
843
    # any leftover items in nv_dict are missing LVs, let's arrange the
844
    # data better
845
    for key, inst in nv_dict.iteritems():
846
      if inst.name not in res_missing:
847
        res_missing[inst.name] = []
848
      res_missing[inst.name].append(key)
849

    
850
    return result
851

    
852

    
853
class LURenameCluster(LogicalUnit):
854
  """Rename the cluster.
855

856
  """
857
  HPATH = "cluster-rename"
858
  HTYPE = constants.HTYPE_CLUSTER
859
  _OP_REQP = ["name"]
860
  REQ_WSSTORE = True
861

    
862
  def BuildHooksEnv(self):
863
    """Build hooks env.
864

865
    """
866
    env = {
867
      "OP_TARGET": self.sstore.GetClusterName(),
868
      "NEW_NAME": self.op.name,
869
      }
870
    mn = self.sstore.GetMasterNode()
871
    return env, [mn], [mn]
872

    
873
  def CheckPrereq(self):
874
    """Verify that the passed name is a valid one.
875

876
    """
877
    hostname = utils.HostInfo(self.op.name)
878

    
879
    new_name = hostname.name
880
    self.ip = new_ip = hostname.ip
881
    old_name = self.sstore.GetClusterName()
882
    old_ip = self.sstore.GetMasterIP()
883
    if new_name == old_name and new_ip == old_ip:
884
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
885
                                 " cluster has changed")
886
    if new_ip != old_ip:
887
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
888
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
889
                                   " reachable on the network. Aborting." %
890
                                   new_ip)
891

    
892
    self.op.name = new_name
893

    
894
  def Exec(self, feedback_fn):
895
    """Rename the cluster.
896

897
    """
898
    clustername = self.op.name
899
    ip = self.ip
900
    ss = self.sstore
901

    
902
    # shutdown the master IP
903
    master = ss.GetMasterNode()
904
    if not rpc.call_node_stop_master(master):
905
      raise errors.OpExecError("Could not disable the master role")
906

    
907
    try:
908
      # modify the sstore
909
      ss.SetKey(ss.SS_MASTER_IP, ip)
910
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
911

    
912
      # Distribute updated ss config to all nodes
913
      myself = self.cfg.GetNodeInfo(master)
914
      dist_nodes = self.cfg.GetNodeList()
915
      if myself.name in dist_nodes:
916
        dist_nodes.remove(myself.name)
917

    
918
      logger.Debug("Copying updated ssconf data to all nodes")
919
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
920
        fname = ss.KeyToFilename(keyname)
921
        result = rpc.call_upload_file(dist_nodes, fname)
922
        for to_node in dist_nodes:
923
          if not result[to_node]:
924
            logger.Error("copy of file %s to node %s failed" %
925
                         (fname, to_node))
926
    finally:
927
      if not rpc.call_node_start_master(master):
928
        logger.Error("Could not re-enable the master role on the master,"
929
                     " please restart manually.")
930

    
931

    
932
def _RecursiveCheckIfLVMBased(disk):
933
  """Check if the given disk or its children are lvm-based.
934

935
  Args:
936
    disk: ganeti.objects.Disk object
937

938
  Returns:
939
    boolean indicating whether an LD_LV dev_type was found or not
940

941
  """
942
  if disk.children:
943
    for chdisk in disk.children:
944
      if _RecursiveCheckIfLVMBased(chdisk):
945
        return True
946
  return disk.dev_type == constants.LD_LV
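# Illustrative usage (not from the original source), mirroring the check in
# LUSetClusterParams.CheckPrereq below:
#   for disk in inst.disks:
#     if _RecursiveCheckIfLVMBased(disk):
#       raise errors.OpPrereqError("instance %s uses lvm-based storage" %
#                                  inst.name)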
947

    
948

    
949
class LUSetClusterParams(LogicalUnit):
950
  """Change the parameters of the cluster.
951

952
  """
953
  HPATH = "cluster-modify"
954
  HTYPE = constants.HTYPE_CLUSTER
955
  _OP_REQP = []
956

    
957
  def BuildHooksEnv(self):
958
    """Build hooks env.
959

960
    """
961
    env = {
962
      "OP_TARGET": self.sstore.GetClusterName(),
963
      "NEW_VG_NAME": self.op.vg_name,
964
      }
965
    mn = self.sstore.GetMasterNode()
966
    return env, [mn], [mn]
967

    
968
  def CheckPrereq(self):
969
    """Check prerequisites.
970

971
    This checks whether the given params don't conflict and
972
    if the given volume group is valid.
973

974
    """
975
    if not self.op.vg_name:
976
      instances = [self.cfg.GetInstanceInfo(name)
977
                   for name in self.cfg.GetInstanceList()]
978
      for inst in instances:
979
        for disk in inst.disks:
980
          if _RecursiveCheckIfLVMBased(disk):
981
            raise errors.OpPrereqError("Cannot disable lvm storage while"
982
                                       " lvm-based instances exist")
983

    
984
    # if vg_name not None, checks given volume group on all nodes
985
    if self.op.vg_name:
986
      node_list = self.cfg.GetNodeList()
987
      vglist = rpc.call_vg_list(node_list)
988
      for node in node_list:
989
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
990
                                              constants.MIN_VG_SIZE)
991
        if vgstatus:
992
          raise errors.OpPrereqError("Error on node '%s': %s" %
993
                                     (node, vgstatus))
994

    
995
  def Exec(self, feedback_fn):
996
    """Change the parameters of the cluster.
997

998
    """
999
    if self.op.vg_name != self.cfg.GetVGName():
1000
      self.cfg.SetVGName(self.op.vg_name)
1001
    else:
1002
      feedback_fn("Cluster LVM configuration already in desired"
1003
                  " state, not changing")
1004

    
1005

    
1006
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1007
  """Sleep and poll for an instance's disk to sync.
1008

1009
  """
1010
  if not instance.disks:
1011
    return True
1012

    
1013
  if not oneshot:
1014
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1015

    
1016
  node = instance.primary_node
1017

    
1018
  for dev in instance.disks:
1019
    cfgw.SetDiskID(dev, node)
1020

    
1021
  retries = 0
1022
  while True:
1023
    max_time = 0
1024
    done = True
1025
    cumul_degraded = False
1026
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1027
    if not rstats:
1028
      proc.LogWarning("Can't get any data from node %s" % node)
1029
      retries += 1
1030
      if retries >= 10:
1031
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1032
                                 " aborting." % node)
1033
      time.sleep(6)
1034
      continue
1035
    retries = 0
1036
    for i in range(len(rstats)):
1037
      mstat = rstats[i]
1038
      if mstat is None:
1039
        proc.LogWarning("Can't compute data for node %s/%s" %
1040
                        (node, instance.disks[i].iv_name))
1041
        continue
1042
      # we ignore the ldisk parameter
1043
      perc_done, est_time, is_degraded, _ = mstat
1044
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1045
      if perc_done is not None:
1046
        done = False
1047
        if est_time is not None:
1048
          rem_time = "%d estimated seconds remaining" % est_time
1049
          max_time = est_time
1050
        else:
1051
          rem_time = "no time estimate"
1052
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
1053
                     (instance.disks[i].iv_name, perc_done, rem_time))
1054
    if done or oneshot:
1055
      break
1056

    
1057
    if unlock:
1058
      #utils.Unlock('cmd')
1059
      pass
1060
    try:
1061
      time.sleep(min(60, max_time))
1062
    finally:
1063
      if unlock:
1064
        #utils.Lock('cmd')
1065
        pass
1066

    
1067
  if done:
1068
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1069
  return not cumul_degraded
1070

    
1071

    
1072
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1073
  """Check that mirrors are not degraded.
1074

1075
  The ldisk parameter, if True, will change the test from the
1076
  is_degraded attribute (which represents overall non-ok status for
1077
  the device(s)) to the ldisk (representing the local storage status).
1078

1079
  """
1080
  cfgw.SetDiskID(dev, node)
1081
  if ldisk:
1082
    idx = 6  # index of the ldisk (local disk) status in the remote result
1083
  else:
1084
    idx = 5  # index of the overall is_degraded flag in the remote result
1085

    
1086
  result = True
1087
  if on_primary or dev.AssembleOnSecondary():
1088
    rstats = rpc.call_blockdev_find(node, dev)
1089
    if not rstats:
1090
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1091
      result = False
1092
    else:
1093
      result = result and (not rstats[idx])
1094
  if dev.children:
1095
    for child in dev.children:
1096
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1097

    
1098
  return result
1099

    
1100

    
1101
class LUDiagnoseOS(NoHooksLU):
1102
  """Logical unit for OS diagnose/query.
1103

1104
  """
1105
  _OP_REQP = ["output_fields", "names"]
1106

    
1107
  def CheckPrereq(self):
1108
    """Check prerequisites.
1109

1110
    This always succeeds, since this is a pure query LU.
1111

1112
    """
1113
    if self.op.names:
1114
      raise errors.OpPrereqError("Selective OS query not supported")
1115

    
1116
    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1117
    _CheckOutputFields(static=[],
1118
                       dynamic=self.dynamic_fields,
1119
                       selected=self.op.output_fields)
1120

    
1121
  @staticmethod
1122
  def _DiagnoseByOS(node_list, rlist):
1123
    """Remaps a per-node return list into an a per-os per-node dictionary
1124

1125
      Args:
1126
        node_list: a list with the names of all nodes
1127
        rlist: a map with node names as keys and OS objects as values
1128

1129
      Returns:
1130
        map: a map with osnames as keys and as value another map, with
1131
             nodes as
1132
             keys and list of OS objects as values
1133
             e.g. {"debian-etch": {"node1": [<object>,...],
1134
                                   "node2": [<object>,]}
1135
                  }
1136

1137
    """
1138
    all_os = {}
1139
    for node_name, nr in rlist.iteritems():
1140
      if not nr:
1141
        continue
1142
      for os_obj in nr:
1143
        if os_obj.name not in all_os:
1144
          # build a list of nodes for this os containing empty lists
1145
          # for each node in node_list
1146
          all_os[os_obj.name] = {}
1147
          for nname in node_list:
1148
            all_os[os_obj.name][nname] = []
1149
        all_os[os_obj.name][node_name].append(os_obj)
1150
    return all_os
1151

    
1152
  def Exec(self, feedback_fn):
1153
    """Compute the list of OSes.
1154

1155
    """
1156
    node_list = self.cfg.GetNodeList()
1157
    node_data = rpc.call_os_diagnose(node_list)
1158
    if node_data == False:
1159
      raise errors.OpExecError("Can't gather the list of OSes")
1160
    pol = self._DiagnoseByOS(node_list, node_data)
1161
    output = []
1162
    for os_name, os_data in pol.iteritems():
1163
      row = []
1164
      for field in self.op.output_fields:
1165
        if field == "name":
1166
          val = os_name
1167
        elif field == "valid":
1168
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1169
        elif field == "node_status":
1170
          val = {}
1171
          for node_name, nos_list in os_data.iteritems():
1172
            val[node_name] = [(v.status, v.path) for v in nos_list]
1173
        else:
1174
          raise errors.ParameterError(field)
1175
        row.append(val)
1176
      output.append(row)
1177

    
1178
    return output
1179

    
1180

    
1181
class LURemoveNode(LogicalUnit):
1182
  """Logical unit for removing a node.
1183

1184
  """
1185
  HPATH = "node-remove"
1186
  HTYPE = constants.HTYPE_NODE
1187
  _OP_REQP = ["node_name"]
1188

    
1189
  def BuildHooksEnv(self):
1190
    """Build hooks env.
1191

1192
    This doesn't run on the target node in the pre phase as a failed
1193
    node would then be impossible to remove.
1194

1195
    """
1196
    env = {
1197
      "OP_TARGET": self.op.node_name,
1198
      "NODE_NAME": self.op.node_name,
1199
      }
1200
    all_nodes = self.cfg.GetNodeList()
1201
    all_nodes.remove(self.op.node_name)
1202
    return env, all_nodes, all_nodes
1203

    
1204
  def CheckPrereq(self):
1205
    """Check prerequisites.
1206

1207
    This checks:
1208
     - the node exists in the configuration
1209
     - it does not have primary or secondary instances
1210
     - it's not the master
1211

1212
    Any errors are signalled by raising errors.OpPrereqError.
1213

1214
    """
1215
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1216
    if node is None:
1217
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1218

    
1219
    instance_list = self.cfg.GetInstanceList()
1220

    
1221
    masternode = self.sstore.GetMasterNode()
1222
    if node.name == masternode:
1223
      raise errors.OpPrereqError("Node is the master node,"
1224
                                 " you need to failover first.")
1225

    
1226
    for instance_name in instance_list:
1227
      instance = self.cfg.GetInstanceInfo(instance_name)
1228
      if node.name == instance.primary_node:
1229
        raise errors.OpPrereqError("Instance %s still running on the node,"
1230
                                   " please remove first." % instance_name)
1231
      if node.name in instance.secondary_nodes:
1232
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1233
                                   " please remove first." % instance_name)
1234
    self.op.node_name = node.name
1235
    self.node = node
1236

    
1237
  def Exec(self, feedback_fn):
1238
    """Removes the node from the cluster.
1239

1240
    """
1241
    node = self.node
1242
    logger.Info("stopping the node daemon and removing configs from node %s" %
1243
                node.name)
1244

    
1245
    rpc.call_node_leave_cluster(node.name)
1246

    
1247
    logger.Info("Removing node %s from config" % node.name)
1248

    
1249
    self.cfg.RemoveNode(node.name)
1250
    # Remove the node from the Ganeti Lock Manager
1251
    self.context.glm.remove(locking.LEVEL_NODE, node.name)
1252

    
1253
    utils.RemoveHostFromEtcHosts(node.name)
1254

    
1255

    
1256
class LUQueryNodes(NoHooksLU):
1257
  """Logical unit for querying nodes.
1258

1259
  """
1260
  _OP_REQP = ["output_fields", "names"]
1261

    
1262
  def CheckPrereq(self):
1263
    """Check prerequisites.
1264

1265
    This checks that the fields required are valid output fields.
1266

1267
    """
1268
    self.dynamic_fields = frozenset([
1269
      "dtotal", "dfree",
1270
      "mtotal", "mnode", "mfree",
1271
      "bootid",
1272
      "ctotal",
1273
      ])
1274

    
1275
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1276
                               "pinst_list", "sinst_list",
1277
                               "pip", "sip", "tags"],
1278
                       dynamic=self.dynamic_fields,
1279
                       selected=self.op.output_fields)
1280

    
1281
    self.wanted = _GetWantedNodes(self, self.op.names)
1282

    
1283
  def Exec(self, feedback_fn):
1284
    """Computes the list of nodes and their attributes.
1285

1286
    """
1287
    nodenames = self.wanted
1288
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1289

    
1290
    # begin data gathering
1291

    
1292
    if self.dynamic_fields.intersection(self.op.output_fields):
1293
      live_data = {}
1294
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1295
      for name in nodenames:
1296
        nodeinfo = node_data.get(name, None)
1297
        if nodeinfo:
1298
          live_data[name] = {
1299
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1300
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1301
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1302
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1303
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1304
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1305
            "bootid": nodeinfo['bootid'],
1306
            }
1307
        else:
1308
          live_data[name] = {}
1309
    else:
1310
      live_data = dict.fromkeys(nodenames, {})
1311

    
1312
    node_to_primary = dict([(name, set()) for name in nodenames])
1313
    node_to_secondary = dict([(name, set()) for name in nodenames])
1314

    
1315
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1316
                             "sinst_cnt", "sinst_list"))
1317
    if inst_fields & frozenset(self.op.output_fields):
1318
      instancelist = self.cfg.GetInstanceList()
1319

    
1320
      for instance_name in instancelist:
1321
        inst = self.cfg.GetInstanceInfo(instance_name)
1322
        if inst.primary_node in node_to_primary:
1323
          node_to_primary[inst.primary_node].add(inst.name)
1324
        for secnode in inst.secondary_nodes:
1325
          if secnode in node_to_secondary:
1326
            node_to_secondary[secnode].add(inst.name)
1327

    
1328
    # end data gathering
1329

    
1330
    output = []
1331
    for node in nodelist:
1332
      node_output = []
1333
      for field in self.op.output_fields:
1334
        if field == "name":
1335
          val = node.name
1336
        elif field == "pinst_list":
1337
          val = list(node_to_primary[node.name])
1338
        elif field == "sinst_list":
1339
          val = list(node_to_secondary[node.name])
1340
        elif field == "pinst_cnt":
1341
          val = len(node_to_primary[node.name])
1342
        elif field == "sinst_cnt":
1343
          val = len(node_to_secondary[node.name])
1344
        elif field == "pip":
1345
          val = node.primary_ip
1346
        elif field == "sip":
1347
          val = node.secondary_ip
1348
        elif field == "tags":
1349
          val = list(node.GetTags())
1350
        elif field in self.dynamic_fields:
1351
          val = live_data[node.name].get(field, None)
1352
        else:
1353
          raise errors.ParameterError(field)
1354
        node_output.append(val)
1355
      output.append(node_output)
1356

    
1357
    return output
1358

    
1359

    
1360
class LUQueryNodeVolumes(NoHooksLU):
1361
  """Logical unit for getting volumes on node(s).
1362

1363
  """
1364
  _OP_REQP = ["nodes", "output_fields"]
1365

    
1366
  def CheckPrereq(self):
1367
    """Check prerequisites.
1368

1369
    This checks that the fields required are valid output fields.
1370

1371
    """
1372
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1373

    
1374
    _CheckOutputFields(static=["node"],
1375
                       dynamic=["phys", "vg", "name", "size", "instance"],
1376
                       selected=self.op.output_fields)
1377

    
1378

    
1379
  def Exec(self, feedback_fn):
1380
    """Computes the list of nodes and their attributes.
1381

1382
    """
1383
    nodenames = self.nodes
1384
    volumes = rpc.call_node_volumes(nodenames)
1385

    
1386
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1387
             in self.cfg.GetInstanceList()]
1388

    
1389
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1390

    
1391
    output = []
1392
    for node in nodenames:
1393
      if node not in volumes or not volumes[node]:
1394
        continue
1395

    
1396
      node_vols = volumes[node][:]
1397
      node_vols.sort(key=lambda vol: vol['dev'])
1398

    
1399
      for vol in node_vols:
1400
        node_output = []
1401
        for field in self.op.output_fields:
1402
          if field == "node":
1403
            val = node
1404
          elif field == "phys":
1405
            val = vol['dev']
1406
          elif field == "vg":
1407
            val = vol['vg']
1408
          elif field == "name":
1409
            val = vol['name']
1410
          elif field == "size":
1411
            val = int(float(vol['size']))
1412
          elif field == "instance":
1413
            for inst in ilist:
1414
              if node not in lv_by_node[inst]:
1415
                continue
1416
              if vol['name'] in lv_by_node[inst][node]:
1417
                val = inst.name
1418
                break
1419
            else:
1420
              val = '-'
1421
          else:
1422
            raise errors.ParameterError(field)
1423
          node_output.append(str(val))
1424

    
1425
        output.append(node_output)
1426

    
1427
    return output
1428

    
1429

    
1430
class LUAddNode(LogicalUnit):
1431
  """Logical unit for adding node to the cluster.
1432

1433
  """
1434
  HPATH = "node-add"
1435
  HTYPE = constants.HTYPE_NODE
1436
  _OP_REQP = ["node_name"]
1437

    
1438
  def BuildHooksEnv(self):
1439
    """Build hooks env.
1440

1441
    This will run on all nodes before, and on all nodes + the new node after.
1442

1443
    """
1444
    env = {
1445
      "OP_TARGET": self.op.node_name,
1446
      "NODE_NAME": self.op.node_name,
1447
      "NODE_PIP": self.op.primary_ip,
1448
      "NODE_SIP": self.op.secondary_ip,
1449
      }
1450
    nodes_0 = self.cfg.GetNodeList()
1451
    nodes_1 = nodes_0 + [self.op.node_name, ]
1452
    return env, nodes_0, nodes_1
1453

    
1454
  def CheckPrereq(self):
1455
    """Check prerequisites.
1456

1457
    This checks:
1458
     - the new node is not already in the config
1459
     - it is resolvable
1460
     - its parameters (single/dual homed) match the cluster
1461

1462
    Any errors are signalled by raising errors.OpPrereqError.
1463

1464
    """
1465
    node_name = self.op.node_name
1466
    cfg = self.cfg
1467

    
1468
    dns_data = utils.HostInfo(node_name)
1469

    
1470
    node = dns_data.name
1471
    primary_ip = self.op.primary_ip = dns_data.ip
1472
    secondary_ip = getattr(self.op, "secondary_ip", None)
1473
    if secondary_ip is None:
1474
      secondary_ip = primary_ip
1475
    if not utils.IsValidIP(secondary_ip):
1476
      raise errors.OpPrereqError("Invalid secondary IP given")
1477
    self.op.secondary_ip = secondary_ip
1478

    
1479
    node_list = cfg.GetNodeList()
1480
    if not self.op.readd and node in node_list:
1481
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1482
                                 node)
1483
    elif self.op.readd and node not in node_list:
1484
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1485

    
1486
    for existing_node_name in node_list:
1487
      existing_node = cfg.GetNodeInfo(existing_node_name)
1488

    
1489
      if self.op.readd and node == existing_node_name:
1490
        if (existing_node.primary_ip != primary_ip or
1491
            existing_node.secondary_ip != secondary_ip):
1492
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1493
                                     " address configuration as before")
1494
        continue
1495

    
1496
      if (existing_node.primary_ip == primary_ip or
1497
          existing_node.secondary_ip == primary_ip or
1498
          existing_node.primary_ip == secondary_ip or
1499
          existing_node.secondary_ip == secondary_ip):
1500
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1501
                                   " existing node %s" % existing_node.name)
1502

    
1503
    # check that the type of the node (single versus dual homed) is the
1504
    # same as for the master
1505
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1506
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1507
    newbie_singlehomed = secondary_ip == primary_ip
1508
    if master_singlehomed != newbie_singlehomed:
1509
      if master_singlehomed:
1510
        raise errors.OpPrereqError("The master has no private ip but the"
1511
                                   " new node has one")
1512
      else:
1513
        raise errors.OpPrereqError("The master has a private ip but the"
1514
                                   " new node doesn't have one")
1515

    
1516
    # checks reachability
1517
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1518
      raise errors.OpPrereqError("Node not reachable by ping")
1519

    
1520
    if not newbie_singlehomed:
1521
      # check reachability from my secondary ip to newbie's secondary ip
1522
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1523
                           source=myself.secondary_ip):
1524
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1525
                                   " based ping to noded port")
1526

    
1527
    self.new_node = objects.Node(name=node,
1528
                                 primary_ip=primary_ip,
1529
                                 secondary_ip=secondary_ip)
1530

    
1531
  def Exec(self, feedback_fn):
1532
    """Adds the new node to the cluster.
1533

1534
    """
1535
    new_node = self.new_node
1536
    node = new_node.name
1537

    
1538
    # check connectivity
1539
    result = rpc.call_version([node])[node]
1540
    if result:
1541
      if constants.PROTOCOL_VERSION == result:
1542
        logger.Info("communication to node %s fine, sw version %s match" %
1543
                    (node, result))
1544
      else:
1545
        raise errors.OpExecError("Version mismatch master version %s,"
1546
                                 " node version %s" %
1547
                                 (constants.PROTOCOL_VERSION, result))
1548
    else:
1549
      raise errors.OpExecError("Cannot get version from the new node")
1550

    
1551
    # setup ssh on node
1552
    logger.Info("copy ssh key to node %s" % node)
1553
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1554
    keyarray = []
1555
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1556
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1557
                priv_key, pub_key]
1558

    
1559
    for i in keyfiles:
1560
      f = open(i, 'r')
1561
      try:
1562
        keyarray.append(f.read())
1563
      finally:
1564
        f.close()
1565

    
1566
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1567
                               keyarray[3], keyarray[4], keyarray[5])
1568

    
1569
    if not result:
1570
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1571

    
1572
    # Add node to our /etc/hosts, and add key to known_hosts
1573
    utils.AddHostToEtcHosts(new_node.name)
1574

    
1575
    if new_node.secondary_ip != new_node.primary_ip:
1576
      if not rpc.call_node_tcp_ping(new_node.name,
1577
                                    constants.LOCALHOST_IP_ADDRESS,
1578
                                    new_node.secondary_ip,
1579
                                    constants.DEFAULT_NODED_PORT,
1580
                                    10, False):
1581
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1582
                                 " you gave (%s). Please fix and re-run this"
1583
                                 " command." % new_node.secondary_ip)
1584

    
1585
    node_verify_list = [self.sstore.GetMasterNode()]
1586
    node_verify_param = {
1587
      'nodelist': [node],
1588
      # TODO: do a node-net-test as well?
1589
    }
1590

    
1591
    result = rpc.call_node_verify(node_verify_list, node_verify_param)
1592
    for verifier in node_verify_list:
1593
      if not result[verifier]:
1594
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
1595
                                 " for remote verification" % verifier)
1596
      if result[verifier]['nodelist']:
1597
        for failed in result[verifier]['nodelist']:
1598
          feedback_fn("ssh/hostname verification failed %s -> %s" %
1599
                      (verifier, result[verifier]['nodelist'][failed]))
1600
        raise errors.OpExecError("ssh/hostname verification failed.")
1601

    
1602
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1603
    # including the node just added
1604
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1605
    dist_nodes = self.cfg.GetNodeList()
1606
    if not self.op.readd:
1607
      dist_nodes.append(node)
1608
    if myself.name in dist_nodes:
1609
      dist_nodes.remove(myself.name)
1610

    
1611
    logger.Debug("Copying hosts and known_hosts to all nodes")
1612
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1613
      result = rpc.call_upload_file(dist_nodes, fname)
1614
      for to_node in dist_nodes:
1615
        if not result[to_node]:
1616
          logger.Error("copy of file %s to node %s failed" %
1617
                       (fname, to_node))
1618

    
1619
    to_copy = self.sstore.GetFileList()
1620
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1621
      to_copy.append(constants.VNC_PASSWORD_FILE)
1622
    for fname in to_copy:
1623
      result = rpc.call_upload_file([node], fname)
1624
      if not result[node]:
1625
        logger.Error("could not copy file %s to node %s" % (fname, node))
1626

    
1627
    if not self.op.readd:
1628
      logger.Info("adding node %s to cluster.conf" % node)
1629
      self.cfg.AddNode(new_node)
1630
      # Add the new node to the Ganeti Lock Manager
1631
      self.context.glm.add(locking.LEVEL_NODE, node)
1632

    
1633

    
1634
class LUMasterFailover(LogicalUnit):
1635
  """Failover the master node to the current node.
1636

1637
  This is a special LU in that it must run on a non-master node.
1638

1639
  """
1640
  HPATH = "master-failover"
1641
  HTYPE = constants.HTYPE_CLUSTER
1642
  REQ_MASTER = False
1643
  REQ_WSSTORE = True
1644
  _OP_REQP = []
1645

    
1646
  def BuildHooksEnv(self):
1647
    """Build hooks env.
1648

1649
    This will run on the new master only in the pre phase, and on all
1650
    the nodes in the post phase.
1651

1652
    """
1653
    env = {
1654
      "OP_TARGET": self.new_master,
1655
      "NEW_MASTER": self.new_master,
1656
      "OLD_MASTER": self.old_master,
1657
      }
1658
    return env, [self.new_master], self.cfg.GetNodeList()
1659

    
1660
  def CheckPrereq(self):
1661
    """Check prerequisites.
1662

1663
    This checks that we are not already the master.
1664

1665
    """
1666
    self.new_master = utils.HostInfo().name
1667
    self.old_master = self.sstore.GetMasterNode()
1668

    
1669
    if self.old_master == self.new_master:
1670
      raise errors.OpPrereqError("This commands must be run on the node"
1671
                                 " where you want the new master to be."
1672
                                 " %s is already the master" %
1673
                                 self.old_master)
1674

    
1675
  def Exec(self, feedback_fn):
1676
    """Failover the master node.
1677

1678
    This command, when run on a non-master node, will cause the current
1679
    master to cease being master, and the non-master to become new
1680
    master.
1681

1682
    """
1683
    #TODO: do not rely on gethostname returning the FQDN
1684
    logger.Info("setting master to %s, old master: %s" %
1685
                (self.new_master, self.old_master))
1686

    
1687
    if not rpc.call_node_stop_master(self.old_master):
1688
      logger.Error("could disable the master role on the old master"
1689
                   " %s, please disable manually" % self.old_master)
1690

    
1691
    ss = self.sstore
1692
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1693
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
1694
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
1695
      logger.Error("could not distribute the new simple store master file"
1696
                   " to the other nodes, please check.")
1697

    
1698
    if not rpc.call_node_start_master(self.new_master):
1699
      logger.Error("could not start the master role on the new master"
1700
                   " %s, please check" % self.new_master)
1701
      feedback_fn("Error in activating the master IP on the new master,"
1702
                  " please fix manually.")
1703

    
1704

    
1705

    
1706
class LUQueryClusterInfo(NoHooksLU):
1707
  """Query cluster configuration.
1708

1709
  """
1710
  _OP_REQP = []
1711
  REQ_MASTER = False
1712

    
1713
  def CheckPrereq(self):
1714
    """No prerequsites needed for this LU.
1715

1716
    """
1717
    pass
1718

    
1719
  def Exec(self, feedback_fn):
1720
    """Return cluster config.
1721

1722
    """
1723
    result = {
1724
      "name": self.sstore.GetClusterName(),
1725
      "software_version": constants.RELEASE_VERSION,
1726
      "protocol_version": constants.PROTOCOL_VERSION,
1727
      "config_version": constants.CONFIG_VERSION,
1728
      "os_api_version": constants.OS_API_VERSION,
1729
      "export_version": constants.EXPORT_VERSION,
1730
      "master": self.sstore.GetMasterNode(),
1731
      "architecture": (platform.architecture()[0], platform.machine()),
1732
      "hypervisor_type": self.sstore.GetHypervisorType(),
1733
      }
1734

    
1735
    return result
1736

    
1737

    
1738
class LUDumpClusterConfig(NoHooksLU):
1739
  """Return a text-representation of the cluster-config.
1740

1741
  """
1742
  _OP_REQP = []
1743

    
1744
  def CheckPrereq(self):
1745
    """No prerequisites.
1746

1747
    """
1748
    pass
1749

    
1750
  def Exec(self, feedback_fn):
1751
    """Dump a representation of the cluster config to the standard output.
1752

1753
    """
1754
    return self.cfg.DumpConfig()
1755

    
1756

    
1757
class LUActivateInstanceDisks(NoHooksLU):
1758
  """Bring up an instance's disks.
1759

1760
  """
1761
  _OP_REQP = ["instance_name"]
1762

    
1763
  def CheckPrereq(self):
1764
    """Check prerequisites.
1765

1766
    This checks that the instance is in the cluster.
1767

1768
    """
1769
    instance = self.cfg.GetInstanceInfo(
1770
      self.cfg.ExpandInstanceName(self.op.instance_name))
1771
    if instance is None:
1772
      raise errors.OpPrereqError("Instance '%s' not known" %
1773
                                 self.op.instance_name)
1774
    self.instance = instance
1775

    
1776

    
1777
  def Exec(self, feedback_fn):
1778
    """Activate the disks.
1779

1780
    """
1781
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1782
    if not disks_ok:
1783
      raise errors.OpExecError("Cannot activate block devices")
1784

    
1785
    return disks_info
1786

    
1787

    
1788
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1789
  """Prepare the block devices for an instance.
1790

1791
  This sets up the block devices on all nodes.
1792

1793
  Args:
1794
    instance: a ganeti.objects.Instance object
1795
    ignore_secondaries: if true, errors on secondary nodes won't result
1796
                        in an error return from the function
1797

1798
  Returns:
1799
    a tuple (disks_ok, device_info): disks_ok is False if the operation
1800
    failed on any non-ignored node, and device_info is a list of
1801
    (host, instance_visible_name, node_visible_name) tuples mapping node
    devices to instance devices
1802
  """
1803
  device_info = []
1804
  disks_ok = True
1805
  iname = instance.name
1806
  # With the two-pass mechanism we try to reduce the window of
1807
  # opportunity for the race condition of switching DRBD to primary
1808
  # before handshaking occurred, but we do not eliminate it
1809

    
1810
  # The proper fix would be to wait (with some limits) until the
1811
  # connection has been made and drbd transitions from WFConnection
1812
  # into any other network-connected state (Connected, SyncTarget,
1813
  # SyncSource, etc.)
1814

    
1815
  # 1st pass, assemble on all nodes in secondary mode
1816
  for inst_disk in instance.disks:
1817
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1818
      cfg.SetDiskID(node_disk, node)
1819
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1820
      if not result:
1821
        logger.Error("could not prepare block device %s on node %s"
1822
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1823
        if not ignore_secondaries:
1824
          disks_ok = False
1825

    
1826
  # FIXME: race condition on drbd migration to primary
1827

    
1828
  # 2nd pass, do only the primary node
1829
  for inst_disk in instance.disks:
1830
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1831
      if node != instance.primary_node:
1832
        continue
1833
      cfg.SetDiskID(node_disk, node)
1834
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1835
      if not result:
1836
        logger.Error("could not prepare block device %s on node %s"
1837
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1838
        disks_ok = False
1839
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
1840

    
1841
  # leave the disks configured for the primary node
1842
  # this is a workaround that would be fixed better by
1843
  # improving the logical/physical id handling
1844
  for disk in instance.disks:
1845
    cfg.SetDiskID(disk, instance.primary_node)
1846

    
1847
  return disks_ok, device_info
1848
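# Illustrative usage sketch (mirrors LUActivateInstanceDisks above); the node
# name and device path in the comment are hypothetical:
#
#   disks_ok, device_info = _AssembleInstanceDisks(instance, cfg)
#   if not disks_ok:
#     raise errors.OpExecError("Cannot activate block devices")
#   # device_info entries are (primary_node, iv_name, node_visible_name),
#   # e.g. ("node1.example.com", "sda", "/dev/drbd0")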

    
1849

    
1850
def _StartInstanceDisks(cfg, instance, force):
1851
  """Start the disks of an instance.
1852

1853
  """
1854
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1855
                                           ignore_secondaries=force)
1856
  if not disks_ok:
1857
    _ShutdownInstanceDisks(instance, cfg)
1858
    if force is not None and not force:
1859
      logger.Error("If the message above refers to a secondary node,"
1860
                   " you can retry the operation using '--force'.")
1861
    raise errors.OpExecError("Disk consistency error")
1862

    
1863

    
1864
class LUDeactivateInstanceDisks(NoHooksLU):
1865
  """Shutdown an instance's disks.
1866

1867
  """
1868
  _OP_REQP = ["instance_name"]
1869

    
1870
  def CheckPrereq(self):
1871
    """Check prerequisites.
1872

1873
    This checks that the instance is in the cluster.
1874

1875
    """
1876
    instance = self.cfg.GetInstanceInfo(
1877
      self.cfg.ExpandInstanceName(self.op.instance_name))
1878
    if instance is None:
1879
      raise errors.OpPrereqError("Instance '%s' not known" %
1880
                                 self.op.instance_name)
1881
    self.instance = instance
1882

    
1883
  def Exec(self, feedback_fn):
1884
    """Deactivate the disks
1885

1886
    """
1887
    instance = self.instance
1888
    ins_l = rpc.call_instance_list([instance.primary_node])
1889
    ins_l = ins_l[instance.primary_node]
1890
    if not isinstance(ins_l, list):
1891
      raise errors.OpExecError("Can't contact node '%s'" %
1892
                               instance.primary_node)
1893

    
1894
    if self.instance.name in ins_l:
1895
      raise errors.OpExecError("Instance is running, can't shutdown"
1896
                               " block devices.")
1897

    
1898
    _ShutdownInstanceDisks(instance, self.cfg)
1899

    
1900

    
1901
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1902
  """Shutdown block devices of an instance.
1903

1904
  This does the shutdown on all nodes of the instance.
1905

1906
  If ignore_primary is true, errors on the primary node are ignored;
1907
  otherwise they, like errors on any other node, make this return False.
1908

1909
  """
1910
  result = True
1911
  for disk in instance.disks:
1912
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1913
      cfg.SetDiskID(top_disk, node)
1914
      if not rpc.call_blockdev_shutdown(node, top_disk):
1915
        logger.Error("could not shutdown block device %s on node %s" %
1916
                     (disk.iv_name, node))
1917
        if not ignore_primary or node != instance.primary_node:
1918
          result = False
1919
  return result
1920

    
1921

    
1922
def _CheckNodeFreeMemory(cfg, node, reason, requested):
1923
  """Checks if a node has enough free memory.
1924

1925
  This function checks if a given node has the needed amount of free
1926
  memory. In case the node has less memory or we cannot get the
1927
  information from the node, this function raises an OpPrereqError
1928
  exception.
1929

1930
  Args:
1931
    - cfg: a ConfigWriter instance
1932
    - node: the node name
1933
    - reason: string to use in the error message
1934
    - requested: the amount of memory in MiB
1935

1936
  """
1937
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
1938
  if not nodeinfo or not isinstance(nodeinfo, dict):
1939
    raise errors.OpPrereqError("Could not contact node %s for resource"
1940
                             " information" % (node,))
1941

    
1942
  free_mem = nodeinfo[node].get('memory_free')
1943
  if not isinstance(free_mem, int):
1944
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
1945
                             " was '%s'" % (node, free_mem))
1946
  if requested > free_mem:
1947
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
1948
                             " needed %s MiB, available %s MiB" %
1949
                             (node, reason, requested, free_mem))
1950
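# Example call (this is how LUStartupInstance.CheckPrereq below uses it); the
# function raises OpPrereqError itself, so callers simply invoke it:
#
#   _CheckNodeFreeMemory(self.cfg, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        instance.memory)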

    
1951

    
1952
class LUStartupInstance(LogicalUnit):
1953
  """Starts an instance.
1954

1955
  """
1956
  HPATH = "instance-start"
1957
  HTYPE = constants.HTYPE_INSTANCE
1958
  _OP_REQP = ["instance_name", "force"]
1959

    
1960
  def BuildHooksEnv(self):
1961
    """Build hooks env.
1962

1963
    This runs on master, primary and secondary nodes of the instance.
1964

1965
    """
1966
    env = {
1967
      "FORCE": self.op.force,
1968
      }
1969
    env.update(_BuildInstanceHookEnvByObject(self.instance))
1970
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1971
          list(self.instance.secondary_nodes))
1972
    return env, nl, nl
1973

    
1974
  def CheckPrereq(self):
1975
    """Check prerequisites.
1976

1977
    This checks that the instance is in the cluster.
1978

1979
    """
1980
    instance = self.cfg.GetInstanceInfo(
1981
      self.cfg.ExpandInstanceName(self.op.instance_name))
1982
    if instance is None:
1983
      raise errors.OpPrereqError("Instance '%s' not known" %
1984
                                 self.op.instance_name)
1985

    
1986
    # check bridge existence
1987
    _CheckInstanceBridgesExist(instance)
1988

    
1989
    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
1990
                         "starting instance %s" % instance.name,
1991
                         instance.memory)
1992

    
1993
    self.instance = instance
1994
    self.op.instance_name = instance.name
1995

    
1996
  def Exec(self, feedback_fn):
1997
    """Start the instance.
1998

1999
    """
2000
    instance = self.instance
2001
    force = self.op.force
2002
    extra_args = getattr(self.op, "extra_args", "")
2003

    
2004
    self.cfg.MarkInstanceUp(instance.name)
2005

    
2006
    node_current = instance.primary_node
2007

    
2008
    _StartInstanceDisks(self.cfg, instance, force)
2009

    
2010
    if not rpc.call_instance_start(node_current, instance, extra_args):
2011
      _ShutdownInstanceDisks(instance, self.cfg)
2012
      raise errors.OpExecError("Could not start instance")
2013

    
2014

    
2015
class LURebootInstance(LogicalUnit):
2016
  """Reboot an instance.
2017

2018
  """
2019
  HPATH = "instance-reboot"
2020
  HTYPE = constants.HTYPE_INSTANCE
2021
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2022

    
2023
  def BuildHooksEnv(self):
2024
    """Build hooks env.
2025

2026
    This runs on master, primary and secondary nodes of the instance.
2027

2028
    """
2029
    env = {
2030
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2031
      }
2032
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2033
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2034
          list(self.instance.secondary_nodes))
2035
    return env, nl, nl
2036

    
2037
  def CheckPrereq(self):
2038
    """Check prerequisites.
2039

2040
    This checks that the instance is in the cluster.
2041

2042
    """
2043
    instance = self.cfg.GetInstanceInfo(
2044
      self.cfg.ExpandInstanceName(self.op.instance_name))
2045
    if instance is None:
2046
      raise errors.OpPrereqError("Instance '%s' not known" %
2047
                                 self.op.instance_name)
2048

    
2049
    # check bridge existence
2050
    _CheckInstanceBridgesExist(instance)
2051

    
2052
    self.instance = instance
2053
    self.op.instance_name = instance.name
2054

    
2055
  def Exec(self, feedback_fn):
2056
    """Reboot the instance.
2057

2058
    """
2059
    instance = self.instance
2060
    ignore_secondaries = self.op.ignore_secondaries
2061
    reboot_type = self.op.reboot_type
2062
    extra_args = getattr(self.op, "extra_args", "")
2063

    
2064
    node_current = instance.primary_node
2065

    
2066
    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2067
                           constants.INSTANCE_REBOOT_HARD,
2068
                           constants.INSTANCE_REBOOT_FULL]:
2069
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2070
                                  (constants.INSTANCE_REBOOT_SOFT,
2071
                                   constants.INSTANCE_REBOOT_HARD,
2072
                                   constants.INSTANCE_REBOOT_FULL))
2073

    
2074
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2075
                       constants.INSTANCE_REBOOT_HARD]:
2076
      if not rpc.call_instance_reboot(node_current, instance,
2077
                                      reboot_type, extra_args):
2078
        raise errors.OpExecError("Could not reboot instance")
2079
    else:
2080
      if not rpc.call_instance_shutdown(node_current, instance):
2081
        raise errors.OpExecError("could not shutdown instance for full reboot")
2082
      _ShutdownInstanceDisks(instance, self.cfg)
2083
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2084
      if not rpc.call_instance_start(node_current, instance, extra_args):
2085
        _ShutdownInstanceDisks(instance, self.cfg)
2086
        raise errors.OpExecError("Could not start instance for full reboot")
2087

    
2088
    self.cfg.MarkInstanceUp(instance.name)
2089

    
2090

    
2091
class LUShutdownInstance(LogicalUnit):
2092
  """Shutdown an instance.
2093

2094
  """
2095
  HPATH = "instance-stop"
2096
  HTYPE = constants.HTYPE_INSTANCE
2097
  _OP_REQP = ["instance_name"]
2098

    
2099
  def BuildHooksEnv(self):
2100
    """Build hooks env.
2101

2102
    This runs on master, primary and secondary nodes of the instance.
2103

2104
    """
2105
    env = _BuildInstanceHookEnvByObject(self.instance)
2106
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2107
          list(self.instance.secondary_nodes))
2108
    return env, nl, nl
2109

    
2110
  def CheckPrereq(self):
2111
    """Check prerequisites.
2112

2113
    This checks that the instance is in the cluster.
2114

2115
    """
2116
    instance = self.cfg.GetInstanceInfo(
2117
      self.cfg.ExpandInstanceName(self.op.instance_name))
2118
    if instance is None:
2119
      raise errors.OpPrereqError("Instance '%s' not known" %
2120
                                 self.op.instance_name)
2121
    self.instance = instance
2122

    
2123
  def Exec(self, feedback_fn):
2124
    """Shutdown the instance.
2125

2126
    """
2127
    instance = self.instance
2128
    node_current = instance.primary_node
2129
    self.cfg.MarkInstanceDown(instance.name)
2130
    if not rpc.call_instance_shutdown(node_current, instance):
2131
      logger.Error("could not shutdown instance")
2132

    
2133
    _ShutdownInstanceDisks(instance, self.cfg)
2134

    
2135

    
2136
class LUReinstallInstance(LogicalUnit):
2137
  """Reinstall an instance.
2138

2139
  """
2140
  HPATH = "instance-reinstall"
2141
  HTYPE = constants.HTYPE_INSTANCE
2142
  _OP_REQP = ["instance_name"]
2143

    
2144
  def BuildHooksEnv(self):
2145
    """Build hooks env.
2146

2147
    This runs on master, primary and secondary nodes of the instance.
2148

2149
    """
2150
    env = _BuildInstanceHookEnvByObject(self.instance)
2151
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2152
          list(self.instance.secondary_nodes))
2153
    return env, nl, nl
2154

    
2155
  def CheckPrereq(self):
2156
    """Check prerequisites.
2157

2158
    This checks that the instance is in the cluster and is not running.
2159

2160
    """
2161
    instance = self.cfg.GetInstanceInfo(
2162
      self.cfg.ExpandInstanceName(self.op.instance_name))
2163
    if instance is None:
2164
      raise errors.OpPrereqError("Instance '%s' not known" %
2165
                                 self.op.instance_name)
2166
    if instance.disk_template == constants.DT_DISKLESS:
2167
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2168
                                 self.op.instance_name)
2169
    if instance.status != "down":
2170
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2171
                                 self.op.instance_name)
2172
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2173
    if remote_info:
2174
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2175
                                 (self.op.instance_name,
2176
                                  instance.primary_node))
2177

    
2178
    self.op.os_type = getattr(self.op, "os_type", None)
2179
    if self.op.os_type is not None:
2180
      # OS verification
2181
      pnode = self.cfg.GetNodeInfo(
2182
        self.cfg.ExpandNodeName(instance.primary_node))
2183
      if pnode is None:
2184
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2185
                                   self.op.pnode)
2186
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2187
      if not os_obj:
2188
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2189
                                   " primary node"  % self.op.os_type)
2190

    
2191
    self.instance = instance
2192

    
2193
  def Exec(self, feedback_fn):
2194
    """Reinstall the instance.
2195

2196
    """
2197
    inst = self.instance
2198

    
2199
    if self.op.os_type is not None:
2200
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2201
      inst.os = self.op.os_type
2202
      self.cfg.AddInstance(inst)
2203

    
2204
    _StartInstanceDisks(self.cfg, inst, None)
2205
    try:
2206
      feedback_fn("Running the instance OS create scripts...")
2207
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2208
        raise errors.OpExecError("Could not install OS for instance %s"
2209
                                 " on node %s" %
2210
                                 (inst.name, inst.primary_node))
2211
    finally:
2212
      _ShutdownInstanceDisks(inst, self.cfg)
2213

    
2214

    
2215
class LURenameInstance(LogicalUnit):
2216
  """Rename an instance.
2217

2218
  """
2219
  HPATH = "instance-rename"
2220
  HTYPE = constants.HTYPE_INSTANCE
2221
  _OP_REQP = ["instance_name", "new_name"]
2222

    
2223
  def BuildHooksEnv(self):
2224
    """Build hooks env.
2225

2226
    This runs on master, primary and secondary nodes of the instance.
2227

2228
    """
2229
    env = _BuildInstanceHookEnvByObject(self.instance)
2230
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2231
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2232
          list(self.instance.secondary_nodes))
2233
    return env, nl, nl
2234

    
2235
  def CheckPrereq(self):
2236
    """Check prerequisites.
2237

2238
    This checks that the instance is in the cluster and is not running.
2239

2240
    """
2241
    instance = self.cfg.GetInstanceInfo(
2242
      self.cfg.ExpandInstanceName(self.op.instance_name))
2243
    if instance is None:
2244
      raise errors.OpPrereqError("Instance '%s' not known" %
2245
                                 self.op.instance_name)
2246
    if instance.status != "down":
2247
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2248
                                 self.op.instance_name)
2249
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2250
    if remote_info:
2251
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2252
                                 (self.op.instance_name,
2253
                                  instance.primary_node))
2254
    self.instance = instance
2255

    
2256
    # new name verification
2257
    name_info = utils.HostInfo(self.op.new_name)
2258

    
2259
    self.op.new_name = new_name = name_info.name
2260
    instance_list = self.cfg.GetInstanceList()
2261
    if new_name in instance_list:
2262
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2263
                                 new_name)
2264

    
2265
    if not getattr(self.op, "ignore_ip", False):
2266
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2267
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2268
                                   (name_info.ip, new_name))
2269

    
2270

    
2271
  def Exec(self, feedback_fn):
2272
    """Reinstall the instance.
2273

2274
    """
2275
    inst = self.instance
2276
    old_name = inst.name
2277

    
2278
    if inst.disk_template == constants.DT_FILE:
2279
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2280

    
2281
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2282

    
2283
    # re-read the instance from the configuration after rename
2284
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2285

    
2286
    if inst.disk_template == constants.DT_FILE:
2287
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2288
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
2289
                                                old_file_storage_dir,
2290
                                                new_file_storage_dir)
2291

    
2292
      if not result:
2293
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2294
                                 " directory '%s' to '%s' (but the instance"
2295
                                 " has been renamed in Ganeti)" % (
2296
                                 inst.primary_node, old_file_storage_dir,
2297
                                 new_file_storage_dir))
2298

    
2299
      if not result[0]:
2300
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2301
                                 " (but the instance has been renamed in"
2302
                                 " Ganeti)" % (old_file_storage_dir,
2303
                                               new_file_storage_dir))
2304

    
2305
    _StartInstanceDisks(self.cfg, inst, None)
2306
    try:
2307
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2308
                                          "sda", "sdb"):
2309
        msg = ("Could run OS rename script for instance %s on node %s (but the"
2310
               " instance has been renamed in Ganeti)" %
2311
               (inst.name, inst.primary_node))
2312
        logger.Error(msg)
2313
    finally:
2314
      _ShutdownInstanceDisks(inst, self.cfg)
2315

    
2316

    
2317
class LURemoveInstance(LogicalUnit):
2318
  """Remove an instance.
2319

2320
  """
2321
  HPATH = "instance-remove"
2322
  HTYPE = constants.HTYPE_INSTANCE
2323
  _OP_REQP = ["instance_name", "ignore_failures"]
2324

    
2325
  def BuildHooksEnv(self):
2326
    """Build hooks env.
2327

2328
    This runs on master, primary and secondary nodes of the instance.
2329

2330
    """
2331
    env = _BuildInstanceHookEnvByObject(self.instance)
2332
    nl = [self.sstore.GetMasterNode()]
2333
    return env, nl, nl
2334

    
2335
  def CheckPrereq(self):
2336
    """Check prerequisites.
2337

2338
    This checks that the instance is in the cluster.
2339

2340
    """
2341
    instance = self.cfg.GetInstanceInfo(
2342
      self.cfg.ExpandInstanceName(self.op.instance_name))
2343
    if instance is None:
2344
      raise errors.OpPrereqError("Instance '%s' not known" %
2345
                                 self.op.instance_name)
2346
    self.instance = instance
2347

    
2348
  def Exec(self, feedback_fn):
2349
    """Remove the instance.
2350

2351
    """
2352
    instance = self.instance
2353
    logger.Info("shutting down instance %s on node %s" %
2354
                (instance.name, instance.primary_node))
2355

    
2356
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2357
      if self.op.ignore_failures:
2358
        feedback_fn("Warning: can't shutdown instance")
2359
      else:
2360
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2361
                                 (instance.name, instance.primary_node))
2362

    
2363
    logger.Info("removing block devices for instance %s" % instance.name)
2364

    
2365
    if not _RemoveDisks(instance, self.cfg):
2366
      if self.op.ignore_failures:
2367
        feedback_fn("Warning: can't remove instance's disks")
2368
      else:
2369
        raise errors.OpExecError("Can't remove instance's disks")
2370

    
2371
    logger.Info("removing instance %s out of cluster config" % instance.name)
2372

    
2373
    self.cfg.RemoveInstance(instance.name)
2374
    # Remove the new instance from the Ganeti Lock Manager
2375
    self.context.glm.remove(locking.LEVEL_INSTANCE, instance.name)
2376

    
2377

    
2378
class LUQueryInstances(NoHooksLU):
2379
  """Logical unit for querying instances.
2380

2381
  """
2382
  _OP_REQP = ["output_fields", "names"]
2383

    
2384
  def CheckPrereq(self):
2385
    """Check prerequisites.
2386

2387
    This checks that the fields required are valid output fields.
2388

2389
    """
2390
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2391
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2392
                               "admin_state", "admin_ram",
2393
                               "disk_template", "ip", "mac", "bridge",
2394
                               "sda_size", "sdb_size", "vcpus", "tags"],
2395
                       dynamic=self.dynamic_fields,
2396
                       selected=self.op.output_fields)
2397

    
2398
    self.wanted = _GetWantedInstances(self, self.op.names)
2399

    
2400
  def Exec(self, feedback_fn):
2401
    """Computes the list of nodes and their attributes.
2402

2403
    """
2404
    instance_names = self.wanted
2405
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2406
                     in instance_names]
2407

    
2408
    # begin data gathering
2409

    
2410
    nodes = frozenset([inst.primary_node for inst in instance_list])
2411

    
2412
    bad_nodes = []
2413
    if self.dynamic_fields.intersection(self.op.output_fields):
2414
      live_data = {}
2415
      node_data = rpc.call_all_instances_info(nodes)
2416
      for name in nodes:
2417
        result = node_data[name]
2418
        if result:
2419
          live_data.update(result)
2420
        elif result == False:
2421
          bad_nodes.append(name)
2422
        # else no instance is alive
2423
    else:
2424
      live_data = dict([(name, {}) for name in instance_names])
2425

    
2426
    # end data gathering
2427

    
2428
    output = []
2429
    for instance in instance_list:
2430
      iout = []
2431
      for field in self.op.output_fields:
2432
        if field == "name":
2433
          val = instance.name
2434
        elif field == "os":
2435
          val = instance.os
2436
        elif field == "pnode":
2437
          val = instance.primary_node
2438
        elif field == "snodes":
2439
          val = list(instance.secondary_nodes)
2440
        elif field == "admin_state":
2441
          val = (instance.status != "down")
2442
        elif field == "oper_state":
2443
          if instance.primary_node in bad_nodes:
2444
            val = None
2445
          else:
2446
            val = bool(live_data.get(instance.name))
2447
        elif field == "status":
2448
          if instance.primary_node in bad_nodes:
2449
            val = "ERROR_nodedown"
2450
          else:
2451
            running = bool(live_data.get(instance.name))
2452
            if running:
2453
              if instance.status != "down":
2454
                val = "running"
2455
              else:
2456
                val = "ERROR_up"
2457
            else:
2458
              if instance.status != "down":
2459
                val = "ERROR_down"
2460
              else:
2461
                val = "ADMIN_down"
2462
        elif field == "admin_ram":
2463
          val = instance.memory
2464
        elif field == "oper_ram":
2465
          if instance.primary_node in bad_nodes:
2466
            val = None
2467
          elif instance.name in live_data:
2468
            val = live_data[instance.name].get("memory", "?")
2469
          else:
2470
            val = "-"
2471
        elif field == "disk_template":
2472
          val = instance.disk_template
2473
        elif field == "ip":
2474
          val = instance.nics[0].ip
2475
        elif field == "bridge":
2476
          val = instance.nics[0].bridge
2477
        elif field == "mac":
2478
          val = instance.nics[0].mac
2479
        elif field == "sda_size" or field == "sdb_size":
2480
          disk = instance.FindDisk(field[:3])
2481
          if disk is None:
2482
            val = None
2483
          else:
2484
            val = disk.size
2485
        elif field == "vcpus":
2486
          val = instance.vcpus
2487
        elif field == "tags":
2488
          val = list(instance.GetTags())
2489
        else:
2490
          raise errors.ParameterError(field)
2491
        iout.append(val)
2492
      output.append(iout)
2493

    
2494
    return output
2495

    
2496

    
2497
class LUFailoverInstance(LogicalUnit):
2498
  """Failover an instance.
2499

2500
  """
2501
  HPATH = "instance-failover"
2502
  HTYPE = constants.HTYPE_INSTANCE
2503
  _OP_REQP = ["instance_name", "ignore_consistency"]
2504

    
2505
  def BuildHooksEnv(self):
2506
    """Build hooks env.
2507

2508
    This runs on master, primary and secondary nodes of the instance.
2509

2510
    """
2511
    env = {
2512
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2513
      }
2514
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2515
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2516
    return env, nl, nl
2517

    
2518
  def CheckPrereq(self):
2519
    """Check prerequisites.
2520

2521
    This checks that the instance is in the cluster.
2522

2523
    """
2524
    instance = self.cfg.GetInstanceInfo(
2525
      self.cfg.ExpandInstanceName(self.op.instance_name))
2526
    if instance is None:
2527
      raise errors.OpPrereqError("Instance '%s' not known" %
2528
                                 self.op.instance_name)
2529

    
2530
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2531
      raise errors.OpPrereqError("Instance's disk layout is not"
2532
                                 " network mirrored, cannot failover.")
2533

    
2534
    secondary_nodes = instance.secondary_nodes
2535
    if not secondary_nodes:
2536
      raise errors.ProgrammerError("no secondary node but using "
2537
                                   "a mirrored disk template")
2538

    
2539
    target_node = secondary_nodes[0]
2540
    # check memory requirements on the secondary node
2541
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2542
                         instance.name, instance.memory)
2543

    
2544
    # check bridge existence
2545
    brlist = [nic.bridge for nic in instance.nics]
2546
    if not rpc.call_bridges_exist(target_node, brlist):
2547
      raise errors.OpPrereqError("One or more target bridges %s does not"
2548
                                 " exist on destination node '%s'" %
2549
                                 (brlist, target_node))
2550

    
2551
    self.instance = instance
2552

    
2553
  def Exec(self, feedback_fn):
2554
    """Failover an instance.
2555

2556
    The failover is done by shutting it down on its present node and
2557
    starting it on the secondary.
2558

2559
    """
2560
    instance = self.instance
2561

    
2562
    source_node = instance.primary_node
2563
    target_node = instance.secondary_nodes[0]
2564

    
2565
    feedback_fn("* checking disk consistency between source and target")
2566
    for dev in instance.disks:
2567
      # for drbd, these are drbd over lvm
2568
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2569
        if instance.status == "up" and not self.op.ignore_consistency:
2570
          raise errors.OpExecError("Disk %s is degraded on target node,"
2571
                                   " aborting failover." % dev.iv_name)
2572

    
2573
    feedback_fn("* shutting down instance on source node")
2574
    logger.Info("Shutting down instance %s on node %s" %
2575
                (instance.name, source_node))
2576

    
2577
    if not rpc.call_instance_shutdown(source_node, instance):
2578
      if self.op.ignore_consistency:
2579
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2580
                     " anyway. Please make sure node %s is down"  %
2581
                     (instance.name, source_node, source_node))
2582
      else:
2583
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2584
                                 (instance.name, source_node))
2585

    
2586
    feedback_fn("* deactivating the instance's disks on source node")
2587
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2588
      raise errors.OpExecError("Can't shut down the instance's disks.")
2589

    
2590
    instance.primary_node = target_node
2591
    # distribute new instance config to the other nodes
2592
    self.cfg.Update(instance)
2593

    
2594
    # Only start the instance if it's marked as up
2595
    if instance.status == "up":
2596
      feedback_fn("* activating the instance's disks on target node")
2597
      logger.Info("Starting instance %s on node %s" %
2598
                  (instance.name, target_node))
2599

    
2600
      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2601
                                               ignore_secondaries=True)
2602
      if not disks_ok:
2603
        _ShutdownInstanceDisks(instance, self.cfg)
2604
        raise errors.OpExecError("Can't activate the instance's disks")
2605

    
2606
      feedback_fn("* starting the instance on the target node")
2607
      if not rpc.call_instance_start(target_node, instance, None):
2608
        _ShutdownInstanceDisks(instance, self.cfg)
2609
        raise errors.OpExecError("Could not start instance %s on node %s." %
2610
                                 (instance.name, target_node))
2611

    
2612

    
2613
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2614
  """Create a tree of block devices on the primary node.
2615

2616
  This always creates all devices.
2617

2618
  """
2619
  if device.children:
2620
    for child in device.children:
2621
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2622
        return False
2623

    
2624
  cfg.SetDiskID(device, node)
2625
  new_id = rpc.call_blockdev_create(node, device, device.size,
2626
                                    instance.name, True, info)
2627
  if not new_id:
2628
    return False
2629
  if device.physical_id is None:
2630
    device.physical_id = new_id
2631
  return True
2632

    
2633

    
2634
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2635
  """Create a tree of block devices on a secondary node.
2636

2637
  If this device type has to be created on secondaries, create it and
2638
  all its children.
2639

2640
  If not, just recurse to children keeping the same 'force' value.
2641

2642
  """
2643
  if device.CreateOnSecondary():
2644
    force = True
2645
  if device.children:
2646
    for child in device.children:
2647
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
2648
                                        child, force, info):
2649
        return False
2650

    
2651
  if not force:
2652
    return True
2653
  cfg.SetDiskID(device, node)
2654
  new_id = rpc.call_blockdev_create(node, device, device.size,
2655
                                    instance.name, False, info)
2656
  if not new_id:
2657
    return False
2658
  if device.physical_id is None:
2659
    device.physical_id = new_id
2660
  return True
2661

    
2662

    
2663
def _GenerateUniqueNames(cfg, exts):
2664
  """Generate a suitable LV name.
2665

2666
  This will generate a logical volume name for the given instance.
2667

2668
  """
2669
  results = []
2670
  for val in exts:
2671
    new_id = cfg.GenerateUniqueID()
2672
    results.append("%s%s" % (new_id, val))
2673
  return results
2674
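# Illustrative example: _GenerateUniqueNames(cfg, [".sda", ".sdb"]) (the call
# made by _GenerateDiskTemplate below) returns two names of the form
# "<unique-id>.sda" and "<unique-id>.sdb", with <unique-id> taken from
# cfg.GenerateUniqueID().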

    
2675

    
2676
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2677
  """Generate a drbd8 device complete with its children.
2678

2679
  """
2680
  port = cfg.AllocatePort()
2681
  vgname = cfg.GetVGName()
2682
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2683
                          logical_id=(vgname, names[0]))
2684
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2685
                          logical_id=(vgname, names[1]))
2686
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2687
                          logical_id = (primary, secondary, port),
2688
                          children = [dev_data, dev_meta],
2689
                          iv_name=iv_name)
2690
  return drbd_dev
2691
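# Sketch of the tree built above: the returned LD_DRBD8 disk has logical_id
# (primary, secondary, port) with the port allocated via cfg.AllocatePort(),
# and two LV children: the data volume of the requested size (names[0]) and
# a 128 MiB metadata volume (names[1]).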

    
2692

    
2693
def _GenerateDiskTemplate(cfg, template_name,
2694
                          instance_name, primary_node,
2695
                          secondary_nodes, disk_sz, swap_sz,
2696
                          file_storage_dir, file_driver):
2697
  """Generate the entire disk layout for a given template type.
2698

2699
  """
2700
  #TODO: compute space requirements
2701

    
2702
  vgname = cfg.GetVGName()
2703
  if template_name == constants.DT_DISKLESS:
2704
    disks = []
2705
  elif template_name == constants.DT_PLAIN:
2706
    if len(secondary_nodes) != 0:
2707
      raise errors.ProgrammerError("Wrong template configuration")
2708

    
2709
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2710
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2711
                           logical_id=(vgname, names[0]),
2712
                           iv_name = "sda")
2713
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2714
                           logical_id=(vgname, names[1]),
2715
                           iv_name = "sdb")
2716
    disks = [sda_dev, sdb_dev]
2717
  elif template_name == constants.DT_DRBD8:
2718
    if len(secondary_nodes) != 1:
2719
      raise errors.ProgrammerError("Wrong template configuration")
2720
    remote_node = secondary_nodes[0]
2721
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2722
                                       ".sdb_data", ".sdb_meta"])
2723
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2724
                                         disk_sz, names[0:2], "sda")
2725
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2726
                                         swap_sz, names[2:4], "sdb")
2727
    disks = [drbd_sda_dev, drbd_sdb_dev]
2728
  elif template_name == constants.DT_FILE:
2729
    if len(secondary_nodes) != 0:
2730
      raise errors.ProgrammerError("Wrong template configuration")
2731

    
2732
    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
2733
                                iv_name="sda", logical_id=(file_driver,
2734
                                "%s/sda" % file_storage_dir))
2735
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
2736
                                iv_name="sdb", logical_id=(file_driver,
2737
                                "%s/sdb" % file_storage_dir))
2738
    disks = [file_sda_dev, file_sdb_dev]
2739
  else:
2740
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2741
  return disks
2742
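# Illustrative results: DT_PLAIN yields two LV-backed disks, iv_name "sda"
# (disk_sz) and "sdb" (swap_sz); DT_DRBD8 requires exactly one secondary node
# and wraps each of these in a DRBD8 branch via _GenerateDRBD8Branch above;
# DT_FILE yields file-backed "sda"/"sdb" under file_storage_dir; DT_DISKLESS
# yields an empty list.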

    
2743

    
2744
def _GetInstanceInfoText(instance):
2745
  """Compute that text that should be added to the disk's metadata.
2746

2747
  """
2748
  return "originstname+%s" % instance.name
2749
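# Example: for an instance named "inst1.example.com" (hypothetical name) this
# returns "originstname+inst1.example.com".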

    
2750

    
2751
def _CreateDisks(cfg, instance):
2752
  """Create all disks for an instance.
2753

2754
  This abstracts away some work from AddInstance.
2755

2756
  Args:
2757
    instance: the instance object
2758

2759
  Returns:
2760
    True or False showing the success of the creation process
2761

2762
  """
2763
  info = _GetInstanceInfoText(instance)
2764

    
2765
  if instance.disk_template == constants.DT_FILE:
2766
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2767
    result = rpc.call_file_storage_dir_create(instance.primary_node,
2768
                                              file_storage_dir)
2769

    
2770
    if not result:
2771
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
2772
      return False
2773

    
2774
    if not result[0]:
2775
      logger.Error("failed to create directory '%s'" % file_storage_dir)
2776
      return False
2777

    
2778
  for device in instance.disks:
2779
    logger.Info("creating volume %s for instance %s" %
2780
                (device.iv_name, instance.name))
2781
    #HARDCODE
2782
    for secondary_node in instance.secondary_nodes:
2783
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2784
                                        device, False, info):
2785
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2786
                     (device.iv_name, device, secondary_node))
2787
        return False
2788
    #HARDCODE
2789
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2790
                                    instance, device, info):
2791
      logger.Error("failed to create volume %s on primary!" %
2792
                   device.iv_name)
2793
      return False
2794

    
2795
  return True
2796

    
2797

    
2798
def _RemoveDisks(instance, cfg):
2799
  """Remove all disks for an instance.
2800

2801
  This abstracts away some work from `AddInstance()` and
2802
  `RemoveInstance()`. Note that in case some of the devices couldn't
2803
  be removed, the removal will continue with the other ones (compare
2804
  with `_CreateDisks()`).
2805

2806
  Args:
2807
    instance: the instance object
2808

2809
  Returns:
2810
    True or False showing the success of the removal process
2811

2812
  """
2813
  logger.Info("removing block devices for instance %s" % instance.name)
2814

    
2815
  result = True
2816
  for device in instance.disks:
2817
    for node, disk in device.ComputeNodeTree(instance.primary_node):
2818
      cfg.SetDiskID(disk, node)
2819
      if not rpc.call_blockdev_remove(node, disk):
2820
        logger.Error("could not remove block device %s on node %s,"
2821
                     " continuing anyway" %
2822
                     (device.iv_name, node))
2823
        result = False
2824

    
2825
  if instance.disk_template == constants.DT_FILE:
2826
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2827
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
2828
                                            file_storage_dir):
2829
      logger.Error("could not remove directory '%s'" % file_storage_dir)
2830
      result = False
2831

    
2832
  return result
2833

    
2834

    
2835
def _ComputeDiskSize(disk_template, disk_size, swap_size):
2836
  """Compute disk size requirements in the volume group
2837

2838
  This is currently hard-coded for the two-drive layout.
2839

2840
  """
2841
  # Required free disk space as a function of disk and swap space
2842
  req_size_dict = {
2843
    constants.DT_DISKLESS: None,
2844
    constants.DT_PLAIN: disk_size + swap_size,
2845
    # 256 MB are added for drbd metadata, 128MB for each drbd device
2846
    constants.DT_DRBD8: disk_size + swap_size + 256,
2847
    constants.DT_FILE: None,
2848
  }
2849

    
2850
  if disk_template not in req_size_dict:
2851
    raise errors.ProgrammerError("Disk template '%s' size requirement"
2852
                                 " is unknown" %  disk_template)
2853

    
2854
  return req_size_dict[disk_template]
2855
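# Worked example (sizes in MiB): for DT_DRBD8 with disk_size=10240 and
# swap_size=4096 the volume group needs 10240 + 4096 + 256 = 14592 MiB,
# while DT_DISKLESS and DT_FILE need no volume group space (None).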

    
2856

    
2857
class LUCreateInstance(LogicalUnit):
2858
  """Create an instance.
2859

2860
  """
2861
  HPATH = "instance-add"
2862
  HTYPE = constants.HTYPE_INSTANCE
2863
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
2864
              "disk_template", "swap_size", "mode", "start", "vcpus",
2865
              "wait_for_sync", "ip_check", "mac"]
2866

    
2867
  def _RunAllocator(self):
2868
    """Run the allocator based on input opcode.
2869

2870
    """
2871
    disks = [{"size": self.op.disk_size, "mode": "w"},
2872
             {"size": self.op.swap_size, "mode": "w"}]
2873
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
2874
             "bridge": self.op.bridge}]
2875
    ial = IAllocator(self.cfg, self.sstore,
2876
                     mode=constants.IALLOCATOR_MODE_ALLOC,
2877
                     name=self.op.instance_name,
2878
                     disk_template=self.op.disk_template,
2879
                     tags=[],
2880
                     os=self.op.os_type,
2881
                     vcpus=self.op.vcpus,
2882
                     mem_size=self.op.mem_size,
2883
                     disks=disks,
2884
                     nics=nics,
2885
                     )
2886

    
2887
    ial.Run(self.op.iallocator)
2888

    
2889
    if not ial.success:
2890
      raise errors.OpPrereqError("Can't compute nodes using"
2891
                                 " iallocator '%s': %s" % (self.op.iallocator,
2892
                                                           ial.info))
2893
    if len(ial.nodes) != ial.required_nodes:
2894
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
2895
                                 " of nodes (%s), required %s" %
2896
                                 (len(ial.nodes), ial.required_nodes))
2897
    self.op.pnode = ial.nodes[0]
2898
    logger.ToStdout("Selected nodes for the instance: %s" %
2899
                    (", ".join(ial.nodes),))
2900
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
2901
                (self.op.instance_name, self.op.iallocator, ial.nodes))
2902
    if ial.required_nodes == 2:
2903
      self.op.snode = ial.nodes[1]
2904

    
2905
  def BuildHooksEnv(self):
2906
    """Build hooks env.
2907

2908
    This runs on master, primary and secondary nodes of the instance.
2909

2910
    """
2911
    env = {
2912
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2913
      "INSTANCE_DISK_SIZE": self.op.disk_size,
2914
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
2915
      "INSTANCE_ADD_MODE": self.op.mode,
2916
      }
2917
    if self.op.mode == constants.INSTANCE_IMPORT:
2918
      env["INSTANCE_SRC_NODE"] = self.op.src_node
2919
      env["INSTANCE_SRC_PATH"] = self.op.src_path
2920
      env["INSTANCE_SRC_IMAGE"] = self.src_image
2921

    
2922
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2923
      primary_node=self.op.pnode,
2924
      secondary_nodes=self.secondaries,
2925
      status=self.instance_status,
2926
      os_type=self.op.os_type,
2927
      memory=self.op.mem_size,
2928
      vcpus=self.op.vcpus,
2929
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
2930
    ))
2931

    
2932
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2933
          self.secondaries)
2934
    return env, nl, nl
2935

    
2936

    
2937
  def CheckPrereq(self):
2938
    """Check prerequisites.
2939

2940
    """
2941
    # set optional parameters to none if they don't exist
2942
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
2943
                 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
2944
                 "vnc_bind_address"]:
2945
      if not hasattr(self.op, attr):
2946
        setattr(self.op, attr, None)
2947

    
2948
    if self.op.mode not in (constants.INSTANCE_CREATE,
2949
                            constants.INSTANCE_IMPORT):
2950
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2951
                                 self.op.mode)
2952

    
2953
    if (not self.cfg.GetVGName() and
2954
        self.op.disk_template not in constants.DTS_NOT_LVM):
2955
      raise errors.OpPrereqError("Cluster does not support lvm-based"
2956
                                 " instances")
2957

    
2958
    if self.op.mode == constants.INSTANCE_IMPORT:
2959
      src_node = getattr(self.op, "src_node", None)
2960
      src_path = getattr(self.op, "src_path", None)
2961
      if src_node is None or src_path is None:
2962
        raise errors.OpPrereqError("Importing an instance requires source"
2963
                                   " node and path options")
2964
      src_node_full = self.cfg.ExpandNodeName(src_node)
2965
      if src_node_full is None:
2966
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2967
      self.op.src_node = src_node = src_node_full
2968

    
2969
      if not os.path.isabs(src_path):
2970
        raise errors.OpPrereqError("The source path must be absolute")
2971

    
2972
      export_info = rpc.call_export_info(src_node, src_path)
2973

    
2974
      if not export_info:
2975
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
2976

    
2977
      if not export_info.has_section(constants.INISECT_EXP):
2978
        raise errors.ProgrammerError("Corrupted export config")
2979

    
2980
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
2981
      if (int(ei_version) != constants.EXPORT_VERSION):
2982
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2983
                                   (ei_version, constants.EXPORT_VERSION))
2984

    
2985
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2986
        raise errors.OpPrereqError("Can't import instance with more than"
2987
                                   " one data disk")
2988

    
2989
      # FIXME: are the old os-es, disk sizes, etc. useful?
2990
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2991
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2992
                                                         'disk0_dump'))
2993
      self.src_image = diskimage
2994
    else: # INSTANCE_CREATE
2995
      if getattr(self.op, "os_type", None) is None:
2996
        raise errors.OpPrereqError("No guest OS specified")
2997

    
2998
    #### instance parameters check
2999

    
3000
    # disk template and mirror node verification
3001
    if self.op.disk_template not in constants.DISK_TEMPLATES:
3002
      raise errors.OpPrereqError("Invalid disk template name")
3003

    
3004
    # instance name verification
3005
    hostname1 = utils.HostInfo(self.op.instance_name)
3006

    
3007
    self.op.instance_name = instance_name = hostname1.name
3008
    instance_list = self.cfg.GetInstanceList()
3009
    if instance_name in instance_list:
3010
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3011
                                 instance_name)
3012

    
3013
    # ip validity checks
3014
    ip = getattr(self.op, "ip", None)
3015
    if ip is None or ip.lower() == "none":
3016
      inst_ip = None
3017
    elif ip.lower() == "auto":
3018
      inst_ip = hostname1.ip
3019
    else:
3020
      if not utils.IsValidIP(ip):
3021
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
3022
                                   " like a valid IP" % ip)
3023
      inst_ip = ip
3024
    self.inst_ip = self.op.ip = inst_ip
3025

    
3026
    if self.op.start and not self.op.ip_check:
3027
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3028
                                 " adding an instance in start mode")
3029

    
3030
    if self.op.ip_check:
3031
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3032
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3033
                                   (hostname1.ip, instance_name))
3034

    
3035
    # MAC address verification
3036
    if self.op.mac != "auto":
3037
      if not utils.IsValidMac(self.op.mac.lower()):
3038
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
3039
                                   self.op.mac)
3040

    
3041
    # bridge verification
3042
    bridge = getattr(self.op, "bridge", None)
3043
    if bridge is None:
3044
      self.op.bridge = self.cfg.GetDefBridge()
3045
    else:
3046
      self.op.bridge = bridge
3047

    
3048
    # boot order verification
3049
    if self.op.hvm_boot_order is not None:
3050
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3051
        raise errors.OpPrereqError("invalid boot order specified,"
3052
                                   " must be one or more of [acdn]")
3053
    # file storage checks
3054
    if (self.op.file_driver and
3055
        not self.op.file_driver in constants.FILE_DRIVER):
3056
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
3057
                                 self.op.file_driver)
3058

    
3059
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3060
      raise errors.OpPrereqError("File storage directory must be a relative"
3061
                                 " path")
3062
    #### allocator run
3063

    
3064
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
3065
      raise errors.OpPrereqError("One and only one of iallocator and primary"
3066
                                 " node must be given")
3067

    
3068
    if self.op.iallocator is not None:
3069
      self._RunAllocator()
3070

    
3071
    #### node related checks
3072

    
3073
    # check primary node
3074
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3075
    if pnode is None:
3076
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
3077
                                 self.op.pnode)
3078
    self.op.pnode = pnode.name
3079
    self.pnode = pnode
3080
    self.secondaries = []
3081

    
3082
    # mirror node verification
3083
    if self.op.disk_template in constants.DTS_NET_MIRROR:
3084
      if getattr(self.op, "snode", None) is None:
3085
        raise errors.OpPrereqError("The networked disk templates need"
3086
                                   " a mirror node")
3087

    
3088
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
3089
      if snode_name is None:
3090
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
3091
                                   self.op.snode)
3092
      elif snode_name == pnode.name:
3093
        raise errors.OpPrereqError("The secondary node cannot be"
3094
                                   " the primary node.")
3095
      self.secondaries.append(snode_name)
3096

    
3097
    req_size = _ComputeDiskSize(self.op.disk_template,
3098
                                self.op.disk_size, self.op.swap_size)
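    # req_size may be None for templates without local storage (e.g.
    # diskless); for mirrored templates it is assumed to also include some
    # overhead for the DRBD metadata volumes on top of disk_size + swap_size.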
3099

    
3100
    # Check lv size requirements
3101
    if req_size is not None:
3102
      nodenames = [pnode.name] + self.secondaries
3103
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
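      # nodeinfo is expected to map each node name to a dict of node stats,
      # e.g. {"node1.example.com": {"vg_free": 20480, ...}} with sizes in MB
      # (example values are illustrative); missing or malformed entries are
      # rejected below.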
3104
      for node in nodenames:
3105
        info = nodeinfo.get(node, None)
3106
        if not info:
3107
          raise errors.OpPrereqError("Cannot get current information"
3108
                                     " from node '%s'" % node)
3109
        vg_free = info.get('vg_free', None)
3110
        if not isinstance(vg_free, int):
3111
          raise errors.OpPrereqError("Can't compute free disk space on"
3112
                                     " node %s" % node)
3113
        if req_size > info['vg_free']:
3114
          raise errors.OpPrereqError("Not enough disk space on target node %s."
3115
                                     " %d MB available, %d MB required" %
3116
                                     (node, info['vg_free'], req_size))
3117

    
3118
    # os verification
3119
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3120
    if not os_obj:
3121
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
3122
                                 " primary node"  % self.op.os_type)
3123

    
3124
    if self.op.kernel_path == constants.VALUE_NONE:
3125
      raise errors.OpPrereqError("Can't set instance kernel to none")
3126

    
3127

    
3128
    # bridge check on primary node
3129
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3130
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
3131
                                 " destination node '%s'" %
3132
                                 (self.op.bridge, pnode.name))
3133

    
3134
    # memory check on primary node
3135
    if self.op.start:
3136
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3137
                           "creating instance %s" % self.op.instance_name,
3138
                           self.op.mem_size)
3139

    
3140
    # hvm_cdrom_image_path verification
3141
    if self.op.hvm_cdrom_image_path is not None:
3142
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
3143
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
3144
                                   " be an absolute path or None, not %s" %
3145
                                   self.op.hvm_cdrom_image_path)
3146
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
3147
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
3148
                                   " regular file or a symlink pointing to"
3149
                                   " an existing regular file, not %s" %
3150
                                   self.op.hvm_cdrom_image_path)
3151

    
3152
    # vnc_bind_address verification
3153
    if self.op.vnc_bind_address is not None:
3154
      if not utils.IsValidIP(self.op.vnc_bind_address):
3155
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3156
                                   " like a valid IP address" %
3157
                                   self.op.vnc_bind_address)
3158

    
3159
    if self.op.start:
3160
      self.instance_status = 'up'
3161
    else:
3162
      self.instance_status = 'down'
3163

    
3164
  def Exec(self, feedback_fn):
3165
    """Create and add the instance to the cluster.
3166

3167
    """
3168
    instance = self.op.instance_name
3169
    pnode_name = self.pnode.name
3170

    
3171
    if self.op.mac == "auto":
3172
      mac_address = self.cfg.GenerateMAC()
3173
    else:
3174
      mac_address = self.op.mac
3175

    
3176
    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3177
    if self.inst_ip is not None:
3178
      nic.ip = self.inst_ip
3179

    
3180
    ht_kind = self.sstore.GetHypervisorType()
3181
    if ht_kind in constants.HTS_REQ_PORT:
3182
      network_port = self.cfg.AllocatePort()
3183
    else:
3184
      network_port = None
3185

    
3186
    if self.op.vnc_bind_address is None:
3187
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3188

    
3189
    # this is needed because os.path.join does not accept None arguments
3190
    if self.op.file_storage_dir is None:
3191
      string_file_storage_dir = ""
3192
    else:
3193
      string_file_storage_dir = self.op.file_storage_dir
3194

    
3195
    # build the full file storage dir path
3196
    file_storage_dir = os.path.normpath(os.path.join(
3197
                                        self.sstore.GetFileStorageDir(),
3198
                                        string_file_storage_dir, instance))
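    # e.g. with a cluster file storage dir of /srv/ganeti/file-storage (an
    # illustrative value), a relative dir of "prod" and instance "inst1",
    # this yields /srv/ganeti/file-storage/prod/inst1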
3199

    
3200

    
3201
    disks = _GenerateDiskTemplate(self.cfg,
3202
                                  self.op.disk_template,
3203
                                  instance, pnode_name,
3204
                                  self.secondaries, self.op.disk_size,
3205
                                  self.op.swap_size,
3206
                                  file_storage_dir,
3207
                                  self.op.file_driver)
3208

    
3209
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3210
                            primary_node=pnode_name,
3211
                            memory=self.op.mem_size,
3212
                            vcpus=self.op.vcpus,
3213
                            nics=[nic], disks=disks,
3214
                            disk_template=self.op.disk_template,
3215
                            status=self.instance_status,
3216
                            network_port=network_port,
3217
                            kernel_path=self.op.kernel_path,
3218
                            initrd_path=self.op.initrd_path,
3219
                            hvm_boot_order=self.op.hvm_boot_order,
3220
                            hvm_acpi=self.op.hvm_acpi,
3221
                            hvm_pae=self.op.hvm_pae,
3222
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3223
                            vnc_bind_address=self.op.vnc_bind_address,
3224
                            )
3225

    
3226
    feedback_fn("* creating instance disks...")
3227
    if not _CreateDisks(self.cfg, iobj):
3228
      _RemoveDisks(iobj, self.cfg)
3229
      raise errors.OpExecError("Device creation failed, reverting...")
3230

    
3231
    feedback_fn("adding instance %s to cluster config" % instance)
3232

    
3233
    self.cfg.AddInstance(iobj)
3234
    # Add the new instance to the Ganeti Lock Manager
3235
    self.context.glm.add(locking.LEVEL_INSTANCE, instance)
3236

    
3237
    if self.op.wait_for_sync:
3238
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3239
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3240
      # make sure the disks are not degraded (still sync-ing is ok)
3241
      time.sleep(15)
3242
      feedback_fn("* checking mirrors status")
3243
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3244
    else:
3245
      disk_abort = False
3246

    
3247
    if disk_abort:
3248
      _RemoveDisks(iobj, self.cfg)
3249
      self.cfg.RemoveInstance(iobj.name)
3250
      # Remove the new instance from the Ganeti Lock Manager
3251
      self.context.glm.remove(locking.LEVEL_INSTANCE, iobj.name)
3252
      raise errors.OpExecError("There are some degraded disks for"
3253
                               " this instance")
3254

    
3255
    feedback_fn("creating os for instance %s on node %s" %
3256
                (instance, pnode_name))
3257

    
3258
    if iobj.disk_template != constants.DT_DISKLESS:
3259
      if self.op.mode == constants.INSTANCE_CREATE:
3260
        feedback_fn("* running the instance OS create scripts...")
3261
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3262
          raise errors.OpExecError("could not add os for instance %s"
3263
                                   " on node %s" %
3264
                                   (instance, pnode_name))
3265

    
3266
      elif self.op.mode == constants.INSTANCE_IMPORT:
3267
        feedback_fn("* running the instance OS import scripts...")
3268
        src_node = self.op.src_node
3269
        src_image = self.src_image
3270
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3271
                                                src_node, src_image):
3272
          raise errors.OpExecError("Could not import os for instance"
3273
                                   " %s on node %s" %
3274
                                   (instance, pnode_name))
3275
      else:
3276
        # also checked in the prereq part
3277
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3278
                                     % self.op.mode)
3279

    
3280
    if self.op.start:
3281
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3282
      feedback_fn("* starting instance...")
3283
      if not rpc.call_instance_start(pnode_name, iobj, None):
3284
        raise errors.OpExecError("Could not start instance")
3285

    
3286

    
3287
class LUConnectConsole(NoHooksLU):
3288
  """Connect to an instance's console.
3289

3290
  This is somewhat special in that it returns the command line that
3291
  you need to run on the master node in order to connect to the
3292
  console.
3293

3294
  """
3295
  _OP_REQP = ["instance_name"]
3296

    
3297
  def CheckPrereq(self):
3298
    """Check prerequisites.
3299

3300
    This checks that the instance is in the cluster.
3301

3302
    """
3303
    instance = self.cfg.GetInstanceInfo(
3304
      self.cfg.ExpandInstanceName(self.op.instance_name))
3305
    if instance is None:
3306
      raise errors.OpPrereqError("Instance '%s' not known" %
3307
                                 self.op.instance_name)
3308
    self.instance = instance
3309

    
3310
  def Exec(self, feedback_fn):
3311
    """Connect to the console of an instance
3312

3313
    """
3314
    instance = self.instance
3315
    node = instance.primary_node
3316

    
3317
    node_insts = rpc.call_instance_list([node])[node]
3318
    if node_insts is False:
3319
      raise errors.OpExecError("Can't connect to node %s." % node)
3320

    
3321
    if instance.name not in node_insts:
3322
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3323

    
3324
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3325

    
3326
    hyper = hypervisor.GetHypervisor()
3327
    console_cmd = hyper.GetShellCommandForConsole(instance)
3328

    
3329
    # build ssh cmdline
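    # the result is the command to execute on the master, roughly
    # "ssh -t ... root@<node> '<console_cmd>'"; for Xen the console command
    # is typically of the form "xm console <instance>" (both examples are
    # illustrative, the exact form comes from SshRunner and the hypervisor)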
3330
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3331

    
3332

    
3333
class LUReplaceDisks(LogicalUnit):
3334
  """Replace the disks of an instance.
3335

3336
  """
3337
  HPATH = "mirrors-replace"
3338
  HTYPE = constants.HTYPE_INSTANCE
3339
  _OP_REQP = ["instance_name", "mode", "disks"]
3340

    
3341
  def _RunAllocator(self):
3342
    """Compute a new secondary node using an IAllocator.
3343

3344
    """
3345
    ial = IAllocator(self.cfg, self.sstore,
3346
                     mode=constants.IALLOCATOR_MODE_RELOC,
3347
                     name=self.op.instance_name,
3348
                     relocate_from=[self.sec_node])
3349

    
3350
    ial.Run(self.op.iallocator)
3351

    
3352
    if not ial.success:
3353
      raise errors.OpPrereqError("Can't compute nodes using"
3354
                                 " iallocator '%s': %s" % (self.op.iallocator,
3355
                                                           ial.info))
3356
    if len(ial.nodes) != ial.required_nodes:
3357
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3358
                                 " of nodes (%s), required %s" %
3359
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
3360
    self.op.remote_node = ial.nodes[0]
3361
    logger.ToStdout("Selected new secondary for the instance: %s" %
3362
                    self.op.remote_node)
3363

    
3364
  def BuildHooksEnv(self):
3365
    """Build hooks env.
3366

3367
    This runs on the master, the primary and all the secondaries.
3368

3369
    """
3370
    env = {
3371
      "MODE": self.op.mode,
3372
      "NEW_SECONDARY": self.op.remote_node,
3373
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3374
      }
3375
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3376
    nl = [
3377
      self.sstore.GetMasterNode(),
3378
      self.instance.primary_node,
3379
      ]
3380
    if self.op.remote_node is not None:
3381
      nl.append(self.op.remote_node)
3382
    return env, nl, nl
3383

    
3384
  def CheckPrereq(self):
3385
    """Check prerequisites.
3386

3387
    This checks that the instance is in the cluster.
3388

3389
    """
3390
    if not hasattr(self.op, "remote_node"):
3391
      self.op.remote_node = None
3392

    
3393
    instance = self.cfg.GetInstanceInfo(
3394
      self.cfg.ExpandInstanceName(self.op.instance_name))
3395
    if instance is None:
3396
      raise errors.OpPrereqError("Instance '%s' not known" %
3397
                                 self.op.instance_name)
3398
    self.instance = instance
3399
    self.op.instance_name = instance.name
3400

    
3401
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3402
      raise errors.OpPrereqError("Instance's disk layout is not"
3403
                                 " network mirrored.")
3404

    
3405
    if len(instance.secondary_nodes) != 1:
3406
      raise errors.OpPrereqError("The instance has a strange layout,"
3407
                                 " expected one secondary but found %d" %
3408
                                 len(instance.secondary_nodes))
3409

    
3410
    self.sec_node = instance.secondary_nodes[0]
3411

    
3412
    ia_name = getattr(self.op, "iallocator", None)
3413
    if ia_name is not None:
3414
      if self.op.remote_node is not None:
3415
        raise errors.OpPrereqError("Give either the iallocator or the new"
3416
                                   " secondary, not both")
3417
      self.op.remote_node = self._RunAllocator()
3418

    
3419
    remote_node = self.op.remote_node
3420
    if remote_node is not None:
3421
      remote_node = self.cfg.ExpandNodeName(remote_node)
3422
      if remote_node is None:
3423
        raise errors.OpPrereqError("Node '%s' not known" %
3424
                                   self.op.remote_node)
3425
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3426
    else:
3427
      self.remote_node_info = None
3428
    if remote_node == instance.primary_node:
3429
      raise errors.OpPrereqError("The specified node is the primary node of"
3430
                                 " the instance.")
3431
    elif remote_node == self.sec_node:
3432
      if self.op.mode == constants.REPLACE_DISK_SEC:
3433
        # this is for DRBD8, where we can't execute the same mode of
3434
        # replacement as for drbd7 (no different port allocated)
3435
        raise errors.OpPrereqError("Same secondary given, cannot execute"
3436
                                   " replacement")
3437
    if instance.disk_template == constants.DT_DRBD8:
3438
      if (self.op.mode == constants.REPLACE_DISK_ALL and
3439
          remote_node is not None):
3440
        # switch to replace secondary mode
3441
        self.op.mode = constants.REPLACE_DISK_SEC
3442

    
3443
      if self.op.mode == constants.REPLACE_DISK_ALL:
3444
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3445
                                   " secondary disk replacement, not"
3446
                                   " both at once")
3447
      elif self.op.mode == constants.REPLACE_DISK_PRI:
3448
        if remote_node is not None:
3449
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3450
                                     " the secondary while doing a primary"
3451
                                     " node disk replacement")
3452
        self.tgt_node = instance.primary_node
3453
        self.oth_node = instance.secondary_nodes[0]
3454
      elif self.op.mode == constants.REPLACE_DISK_SEC:
3455
        self.new_node = remote_node # this can be None, in which case
3456
                                    # we don't change the secondary
3457
        self.tgt_node = instance.secondary_nodes[0]
3458
        self.oth_node = instance.primary_node
3459
      else:
3460
        raise errors.ProgrammerError("Unhandled disk replace mode")
3461

    
3462
    for name in self.op.disks:
3463
      if instance.FindDisk(name) is None:
3464
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3465
                                   (name, instance.name))
3466
    self.op.remote_node = remote_node
3467

    
3468
  def _ExecD8DiskOnly(self, feedback_fn):
3469
    """Replace a disk on the primary or secondary for drbd8.
3470

3471
    The algorithm for replace is quite complicated:
3472
      - for each disk to be replaced:
3473
        - create new LVs on the target node with unique names
3474
        - detach old LVs from the drbd device
3475
        - rename old LVs to name_replaced.<time_t>
3476
        - rename new LVs to old LVs
3477
        - attach the new LVs (with the old names now) to the drbd device
3478
      - wait for sync across all devices
3479
      - for each modified disk:
3480
        - remove old LVs (which have the name name_replaced.<time_t>)
3481

3482
    Failures are not very well handled.
3483

3484
    """
3485
    steps_total = 6
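    # Naming illustration for the steps below (assuming a disk named "sda"):
    # the new LVs get freshly generated unique names ending in ".sda_data"
    # and ".sda_meta", while the old LVs are renamed with a
    # "_replaced-<timestamp>" suffix before being removed in the last step.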
3486
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3487
    instance = self.instance
3488
    iv_names = {}
3489
    vgname = self.cfg.GetVGName()
3490
    # start of work
3491
    cfg = self.cfg
3492
    tgt_node = self.tgt_node
3493
    oth_node = self.oth_node
3494

    
3495
    # Step: check device activation
3496
    self.proc.LogStep(1, steps_total, "check device existence")
3497
    info("checking volume groups")
3498
    my_vg = cfg.GetVGName()
3499
    results = rpc.call_vg_list([oth_node, tgt_node])
3500
    if not results:
3501
      raise errors.OpExecError("Can't list volume groups on the nodes")
3502
    for node in oth_node, tgt_node:
3503
      res = results.get(node, False)
3504
      if not res or my_vg not in res:
3505
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3506
                                 (my_vg, node))
3507
    for dev in instance.disks:
3508
      if not dev.iv_name in self.op.disks:
3509
        continue
3510
      for node in tgt_node, oth_node:
3511
        info("checking %s on %s" % (dev.iv_name, node))
3512
        cfg.SetDiskID(dev, node)
3513
        if not rpc.call_blockdev_find(node, dev):
3514
          raise errors.OpExecError("Can't find device %s on node %s" %
3515
                                   (dev.iv_name, node))
3516

    
3517
    # Step: check other node consistency
3518
    self.proc.LogStep(2, steps_total, "check peer consistency")
3519
    for dev in instance.disks:
3520
      if not dev.iv_name in self.op.disks:
3521
        continue
3522
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3523
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3524
                                   oth_node == instance.primary_node):
3525
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3526
                                 " to replace disks on this node (%s)" %
3527
                                 (oth_node, tgt_node))
3528

    
3529
    # Step: create new storage
3530
    self.proc.LogStep(3, steps_total, "allocate new storage")
3531
    for dev in instance.disks:
3532
      if not dev.iv_name in self.op.disks:
3533
        continue
3534
      size = dev.size
3535
      cfg.SetDiskID(dev, tgt_node)
3536
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3537
      names = _GenerateUniqueNames(cfg, lv_names)
3538
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3539
                             logical_id=(vgname, names[0]))
3540
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3541
                             logical_id=(vgname, names[1]))
3542
      new_lvs = [lv_data, lv_meta]
3543
      old_lvs = dev.children
3544
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
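      # e.g. iv_names["sda"] = (<drbd device>, [old data LV, old meta LV],
      #                         [new data LV, new meta LV])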
3545
      info("creating new local storage on %s for %s" %
3546
           (tgt_node, dev.iv_name))
3547
      # since we *always* want to create this LV, we use the
3548
      # _Create...OnPrimary (which forces the creation), even if we
3549
      # are talking about the secondary node
3550
      for new_lv in new_lvs:
3551
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3552
                                        _GetInstanceInfoText(instance)):
3553
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3554
                                   " node '%s'" %
3555
                                   (new_lv.logical_id[1], tgt_node))
3556

    
3557
    # Step: for each lv, detach+rename*2+attach
3558
    self.proc.LogStep(4, steps_total, "change drbd configuration")
3559
    for dev, old_lvs, new_lvs in iv_names.itervalues():
3560
      info("detaching %s drbd from local storage" % dev.iv_name)
3561
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3562
        raise errors.OpExecError("Can't detach drbd from local storage on node"
3563
                                 " %s for device %s" % (tgt_node, dev.iv_name))
3564
      #dev.children = []
3565
      #cfg.Update(instance)
3566

    
3567
      # ok, we created the new LVs, so now we know we have the needed
3568
      # storage; as such, we proceed on the target node to rename
3569
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3570
      # using the assumption that logical_id == physical_id (which in
3571
      # turn is the unique_id on that node)
3572

    
3573
      # FIXME(iustin): use a better name for the replaced LVs
3574
      temp_suffix = int(time.time())
3575
      ren_fn = lambda d, suff: (d.physical_id[0],
3576
                                d.physical_id[1] + "_replaced-%s" % suff)
3577
      # build the rename list based on what LVs exist on the node
3578
      rlist = []
3579
      for to_ren in old_lvs:
3580
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3581
        if find_res is not None: # device exists
3582
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3583

    
3584
      info("renaming the old LVs on the target node")
3585
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3586
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3587
      # now we rename the new LVs to the old LVs
3588
      info("renaming the new LVs on the target node")
3589
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3590
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3591
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3592

    
3593
      for old, new in zip(old_lvs, new_lvs):
3594
        new.logical_id = old.logical_id
3595
        cfg.SetDiskID(new, tgt_node)
3596

    
3597
      for disk in old_lvs:
3598
        disk.logical_id = ren_fn(disk, temp_suffix)
3599
        cfg.SetDiskID(disk, tgt_node)
3600

    
3601
      # now that the new lvs have the old name, we can add them to the device
3602
      info("adding new mirror component on %s" % tgt_node)
3603
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3604
        for new_lv in new_lvs:
3605
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
3606
            warning("Can't rollback device %s" % new_lv.logical_id[1],
                    hint="manually cleanup unused"
3607
                    " logical volumes")
3608
        raise errors.OpExecError("Can't add local storage to drbd")
3609

    
3610
      dev.children = new_lvs
3611
      cfg.Update(instance)
3612

    
3613
    # Step: wait for sync
3614

    
3615
    # this can fail as the old devices are degraded and _WaitForSync
3616
    # does a combined result over all disks, so we don't check its
3617
    # return value
3618
    self.proc.LogStep(5, steps_total, "sync devices")
3619
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3620

    
3621
    # so check manually all the devices
3622
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3623
      cfg.SetDiskID(dev, instance.primary_node)
3624
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3625
      if is_degr:
3626
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3627

    
3628
    # Step: remove old storage
3629
    self.proc.LogStep(6, steps_total, "removing old storage")
3630
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3631
      info("remove logical volumes for %s" % name)
3632
      for lv in old_lvs:
3633
        cfg.SetDiskID(lv, tgt_node)
3634
        if not rpc.call_blockdev_remove(tgt_node, lv):
3635
          warning("Can't remove old LV", hint="manually remove unused LVs")
3636
          continue
3637

    
3638
  def _ExecD8Secondary(self, feedback_fn):
3639
    """Replace the secondary node for drbd8.
3640

3641
    The algorithm for replace is quite complicated:
3642
      - for all disks of the instance:
3643
        - create new LVs on the new node with same names
3644
        - shutdown the drbd device on the old secondary
3645
        - disconnect the drbd network on the primary
3646
        - create the drbd device on the new secondary
3647
        - network attach the drbd on the primary, using an artifice:
3648
          the drbd code for Attach() will connect to the network if it
3649
          finds a device which is connected to the good local disks but
3650
          not network enabled
3651
      - wait for sync across all devices
3652
      - remove all disks from the old secondary
3653

3654
    Failures are not very well handled.
3655

3656
    """
3657
    steps_total = 6
3658
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3659
    instance = self.instance
3660
    iv_names = {}
3661
    vgname = self.cfg.GetVGName()
3662
    # start of work
3663
    cfg = self.cfg
3664
    old_node = self.tgt_node
3665
    new_node = self.new_node
3666
    pri_node = instance.primary_node
3667

    
3668
    # Step: check device activation
3669
    self.proc.LogStep(1, steps_total, "check device existence")
3670
    info("checking volume groups")
3671
    my_vg = cfg.GetVGName()
3672
    results = rpc.call_vg_list([pri_node, new_node])
3673
    if not results:
3674
      raise errors.OpExecError("Can't list volume groups on the nodes")
3675
    for node in pri_node, new_node:
3676
      res = results.get(node, False)
3677
      if not res or my_vg not in res:
3678
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3679
                                 (my_vg, node))
3680
    for dev in instance.disks:
3681
      if not dev.iv_name in self.op.disks:
3682
        continue
3683
      info("checking %s on %s" % (dev.iv_name, pri_node))
3684
      cfg.SetDiskID(dev, pri_node)
3685
      if not rpc.call_blockdev_find(pri_node, dev):
3686
        raise errors.OpExecError("Can't find device %s on node %s" %
3687
                                 (dev.iv_name, pri_node))
3688

    
3689
    # Step: check other node consistency
3690
    self.proc.LogStep(2, steps_total, "check peer consistency")
3691
    for dev in instance.disks:
3692
      if not dev.iv_name in self.op.disks:
3693
        continue
3694
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3695
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3696
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
3697
                                 " unsafe to replace the secondary" %
3698
                                 pri_node)
3699

    
3700
    # Step: create new storage
3701
    self.proc.LogStep(3, steps_total, "allocate new storage")
3702
    for dev in instance.disks:
3703
      size = dev.size
3704
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3705
      # since we *always* want to create this LV, we use the
3706
      # _Create...OnPrimary (which forces the creation), even if we
3707
      # are talking about the secondary node
3708
      for new_lv in dev.children:
3709
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3710
                                        _GetInstanceInfoText(instance)):
3711
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3712
                                   " node '%s'" %
3713
                                   (new_lv.logical_id[1], new_node))
3714

    
3715
      iv_names[dev.iv_name] = (dev, dev.children)
3716

    
3717
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
3718
    for dev in instance.disks:
3719
      size = dev.size
3720
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3721
      # create new devices on new_node
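      # the DRBD8 logical_id is assumed to start with the (primary_node,
      # secondary_node) pair; the remaining element(s), e.g. the DRBD port,
      # are carried over unchanged from the old device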
3722
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3723
                              logical_id=(pri_node, new_node,
3724
                                          dev.logical_id[2]),
3725
                              children=dev.children)
3726
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3727
                                        new_drbd, False,
3728
                                      _GetInstanceInfoText(instance)):
3729
        raise errors.OpExecError("Failed to create new DRBD on"
3730
                                 " node '%s'" % new_node)
3731

    
3732
    for dev in instance.disks:
3733
      # we have new devices, shutdown the drbd on the old secondary
3734
      info("shutting down drbd for %s on old node" % dev.iv_name)
3735
      cfg.SetDiskID(dev, old_node)
3736
      if not rpc.call_blockdev_shutdown(old_node, dev):
3737
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3738
                hint="Please cleanup this device manually as soon as possible")
3739

    
3740
    info("detaching primary drbds from the network (=> standalone)")
3741
    done = 0
3742
    for dev in instance.disks:
3743
      cfg.SetDiskID(dev, pri_node)
3744
      # set the physical (unique in bdev terms) id to None, meaning
3745
      # detach from network
3746
      dev.physical_id = (None,) * len(dev.physical_id)
3747
      # and 'find' the device, which will 'fix' it to match the
3748
      # standalone state
3749
      if rpc.call_blockdev_find(pri_node, dev):
3750
        done += 1
3751
      else:
3752
        warning("Failed to detach drbd %s from network, unusual case" %
3753
                dev.iv_name)
3754

    
3755
    if not done:
3756
      # no detaches succeeded (very unlikely)
3757
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
3758

    
3759
    # if we managed to detach at least one, we update all the disks of
3760
    # the instance to point to the new secondary
3761
    info("updating instance configuration")
3762
    for dev in instance.disks:
3763
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3764
      cfg.SetDiskID(dev, pri_node)
3765
    cfg.Update(instance)
3766

    
3767
    # and now perform the drbd attach
3768
    info("attaching primary drbds to new secondary (standalone => connected)")
3769
    failures = []
3770
    for dev in instance.disks:
3771
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3772
      # since the attach is smart, it's enough to 'find' the device,
3773
      # it will automatically activate the network, if the physical_id
3774
      # is correct
3775
      cfg.SetDiskID(dev, pri_node)
3776
      if not rpc.call_blockdev_find(pri_node, dev):
3777
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3778
                "please do a gnt-instance info to see the status of disks")
3779

    
3780
    # this can fail as the old devices are degraded and _WaitForSync
3781
    # does a combined result over all disks, so we don't check its
3782
    # return value
3783
    self.proc.LogStep(5, steps_total, "sync devices")
3784
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3785

    
3786
    # so check manually all the devices
3787
    for name, (dev, old_lvs) in iv_names.iteritems():
3788
      cfg.SetDiskID(dev, pri_node)
3789
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3790
      if is_degr:
3791
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3792

    
3793
    self.proc.LogStep(6, steps_total, "removing old storage")
3794
    for name, (dev, old_lvs) in iv_names.iteritems():
3795
      info("remove logical volumes for %s" % name)
3796
      for lv in old_lvs:
3797
        cfg.SetDiskID(lv, old_node)
3798
        if not rpc.call_blockdev_remove(old_node, lv):
3799
          warning("Can't remove LV on old secondary",
3800
                  hint="Cleanup stale volumes by hand")
3801

    
3802
  def Exec(self, feedback_fn):
3803
    """Execute disk replacement.
3804

3805
    This dispatches the disk replacement to the appropriate handler.
3806

3807
    """
3808
    instance = self.instance
3809

    
3810
    # Activate the instance disks if we're replacing them on a down instance
3811
    if instance.status == "down":
3812
      op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
3813
      self.proc.ChainOpCode(op)
3814

    
3815
    if instance.disk_template == constants.DT_DRBD8:
3816
      if self.op.remote_node is None:
3817
        fn = self._ExecD8DiskOnly
3818
      else:
3819
        fn = self._ExecD8Secondary
3820
    else:
3821
      raise errors.ProgrammerError("Unhandled disk replacement case")
3822

    
3823
    ret = fn(feedback_fn)
3824

    
3825
    # Deactivate the instance disks if we're replacing them on a down instance
3826
    if instance.status == "down":
3827
      op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
3828
      self.proc.ChainOpCode(op)
3829

    
3830
    return ret
3831

    
3832

    
3833
class LUGrowDisk(LogicalUnit):
3834
  """Grow a disk of an instance.
3835

3836
  """
3837
  HPATH = "disk-grow"
3838
  HTYPE = constants.HTYPE_INSTANCE
3839
  _OP_REQP = ["instance_name", "disk", "amount"]
3840

    
3841
  def BuildHooksEnv(self):
3842
    """Build hooks env.
3843

3844
    This runs on the master, the primary and all the secondaries.
3845

3846
    """
3847
    env = {
3848
      "DISK": self.op.disk,
3849
      "AMOUNT": self.op.amount,
3850
      }
3851
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3852
    nl = [
3853
      self.sstore.GetMasterNode(),
3854
      self.instance.primary_node,
3855
      ]
3856
    return env, nl, nl
3857

    
3858
  def CheckPrereq(self):
3859
    """Check prerequisites.
3860

3861
    This checks that the instance is in the cluster.
3862

3863
    """
3864
    instance = self.cfg.GetInstanceInfo(
3865
      self.cfg.ExpandInstanceName(self.op.instance_name))
3866
    if instance is None:
3867
      raise errors.OpPrereqError("Instance '%s' not known" %
3868
                                 self.op.instance_name)
3869
    self.instance = instance
3870
    self.op.instance_name = instance.name
3871

    
3872
    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
3873
      raise errors.OpPrereqError("Instance's disk layout does not support"
3874
                                 " growing.")
3875

    
3876
    if instance.FindDisk(self.op.disk) is None:
3877
      raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3878
                                 (self.op.disk, instance.name))
3879

    
3880
    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
3881
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3882
    for node in nodenames:
3883
      info = nodeinfo.get(node, None)
3884
      if not info:
3885
        raise errors.OpPrereqError("Cannot get current information"
3886
                                   " from node '%s'" % node)
3887
      vg_free = info.get('vg_free', None)
3888
      if not isinstance(vg_free, int):
3889
        raise errors.OpPrereqError("Can't compute free disk space on"
3890
                                   " node %s" % node)
3891
      if self.op.amount > info['vg_free']:
3892
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
3893
                                   " %d MiB available, %d MiB required" %
3894
                                   (node, info['vg_free'], self.op.amount))
3895

    
3896
  def Exec(self, feedback_fn):
3897
    """Execute disk grow.
3898

3899
    """
3900
    instance = self.instance
3901
    disk = instance.FindDisk(self.op.disk)
3902
    for node in (instance.secondary_nodes + (instance.primary_node,)):
3903
      self.cfg.SetDiskID(disk, node)
3904
      result = rpc.call_blockdev_grow(node, disk, self.op.amount)
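      # the result is expected to be a (success, message) pair, e.g.
      # (True, "") on success or (False, "not enough free space") on
      # failure; the example messages are illustrative only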
3905
      if not result or not isinstance(result, tuple) or len(result) != 2:
3906
        raise errors.OpExecError("grow request failed to node %s" % node)
3907
      elif not result[0]:
3908
        raise errors.OpExecError("grow request failed to node %s: %s" %
3909
                                 (node, result[1]))
3910
    disk.RecordGrow(self.op.amount)
3911
    self.cfg.Update(instance)
3912
    return
3913

    
3914

    
3915
class LUQueryInstanceData(NoHooksLU):
3916
  """Query runtime instance data.
3917

3918
  """
3919
  _OP_REQP = ["instances"]
3920

    
3921
  def CheckPrereq(self):
3922
    """Check prerequisites.
3923

3924
    This only checks the optional instance list against the existing names.
3925

3926
    """
3927
    if not isinstance(self.op.instances, list):
3928
      raise errors.OpPrereqError("Invalid argument type 'instances'")
3929
    if self.op.instances:
3930
      self.wanted_instances = []
3931
      names = self.op.instances
3932
      for name in names:
3933
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3934
        if instance is None:
3935
          raise errors.OpPrereqError("No such instance name '%s'" % name)
3936
        self.wanted_instances.append(instance)
3937
    else:
3938
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3939
                               in self.cfg.GetInstanceList()]
3940
    return
3941

    
3942

    
3943
  def _ComputeDiskStatus(self, instance, snode, dev):
3944
    """Compute block device status.
3945

3946
    """
3947
    self.cfg.SetDiskID(dev, instance.primary_node)
3948
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
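    # dev_pstatus (and dev_sstatus below) are the raw status tuples returned
    # by blockdev_find; their exact layout is assumed from usage elsewhere in
    # this module (e.g. element 5 is read as the "degraded" flag), so treat
    # the field order as an assumption rather than a documented contract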
3949
    if dev.dev_type in constants.LDS_DRBD:
3950
      # we change the snode then (otherwise we use the one passed in)
3951
      if dev.logical_id[0] == instance.primary_node:
3952
        snode = dev.logical_id[1]
3953
      else:
3954
        snode = dev.logical_id[0]
3955

    
3956
    if snode:
3957
      self.cfg.SetDiskID(dev, snode)
3958
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
3959
    else:
3960
      dev_sstatus = None
3961

    
3962
    if dev.children:
3963
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
3964
                      for child in dev.children]
3965
    else:
3966
      dev_children = []
3967

    
3968
    data = {
3969
      "iv_name": dev.iv_name,
3970
      "dev_type": dev.dev_type,
3971
      "logical_id": dev.logical_id,
3972
      "physical_id": dev.physical_id,
3973
      "pstatus": dev_pstatus,
3974
      "sstatus": dev_sstatus,
3975
      "children": dev_children,
3976
      }
3977

    
3978
    return data
3979

    
3980
  def Exec(self, feedback_fn):
3981
    """Gather and return data"""
3982
    result = {}
3983
    for instance in self.wanted_instances:
3984
      remote_info = rpc.call_instance_info(instance.primary_node,
3985
                                                instance.name)
3986
      if remote_info and "state" in remote_info:
3987
        remote_state = "up"
3988
      else:
3989
        remote_state = "down"
3990
      if instance.status == "down":
3991
        config_state = "down"
3992
      else:
3993
        config_state = "up"
3994

    
3995
      disks = [self._ComputeDiskStatus(instance, None, device)
3996
               for device in instance.disks]
3997

    
3998
      idict = {
3999
        "name": instance.name,
4000
        "config_state": config_state,
4001
        "run_state": remote_state,
4002
        "pnode": instance.primary_node,
4003
        "snodes": instance.secondary_nodes,
4004
        "os": instance.os,
4005
        "memory": instance.memory,
4006
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4007
        "disks": disks,
4008
        "vcpus": instance.vcpus,
4009
        }
4010

    
4011
      htkind = self.sstore.GetHypervisorType()
4012
      if htkind == constants.HT_XEN_PVM30:
4013
        idict["kernel_path"] = instance.kernel_path
4014
        idict["initrd_path"] = instance.initrd_path
4015

    
4016
      if htkind == constants.HT_XEN_HVM31:
4017
        idict["hvm_boot_order"] = instance.hvm_boot_order
4018
        idict["hvm_acpi"] = instance.hvm_acpi
4019
        idict["hvm_pae"] = instance.hvm_pae
4020
        idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4021

    
4022
      if htkind in constants.HTS_REQ_PORT:
4023
        idict["vnc_bind_address"] = instance.vnc_bind_address
4024
        idict["network_port"] = instance.network_port
4025

    
4026
      result[instance.name] = idict
4027

    
4028
    return result
4029

    
4030

    
4031
class LUSetInstanceParams(LogicalUnit):
4032
  """Modifies an instance's parameters.
4033

4034
  """
4035
  HPATH = "instance-modify"
4036
  HTYPE = constants.HTYPE_INSTANCE
4037
  _OP_REQP = ["instance_name"]
4038

    
4039
  def BuildHooksEnv(self):
4040
    """Build hooks env.
4041

4042
    This runs on the master, primary and secondaries.
4043

4044
    """
4045
    args = dict()
4046
    if self.mem:
4047
      args['memory'] = self.mem
4048
    if self.vcpus:
4049
      args['vcpus'] = self.vcpus
4050
    if self.do_ip or self.do_bridge or self.mac:
4051
      if self.do_ip:
4052
        ip = self.ip
4053
      else:
4054
        ip = self.instance.nics[0].ip
4055
      if self.bridge:
4056
        bridge = self.bridge
4057
      else:
4058
        bridge = self.instance.nics[0].bridge
4059
      if self.mac:
4060
        mac = self.mac
4061
      else:
4062
        mac = self.instance.nics[0].mac
4063
      args['nics'] = [(ip, bridge, mac)]
4064
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4065
    nl = [self.sstore.GetMasterNode(),
4066
          self.instance.primary_node] + list(self.instance.secondary_nodes)
4067
    return env, nl, nl
4068

    
4069
  def CheckPrereq(self):
4070
    """Check prerequisites.
4071

4072
    This only checks the instance list against the existing names.
4073

4074
    """
4075
    self.mem = getattr(self.op, "mem", None)
4076
    self.vcpus = getattr(self.op, "vcpus", None)
4077
    self.ip = getattr(self.op, "ip", None)
4078
    self.mac = getattr(self.op, "mac", None)
4079
    self.bridge = getattr(self.op, "bridge", None)
4080
    self.kernel_path = getattr(self.op, "kernel_path", None)
4081
    self.initrd_path = getattr(self.op, "initrd_path", None)
4082
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4083
    self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
4084
    self.hvm_pae = getattr(self.op, "hvm_pae", None)
4085
    self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
4086
    self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
4087
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4088
                 self.kernel_path, self.initrd_path, self.hvm_boot_order,
4089
                 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
4090
                 self.vnc_bind_address]
4091
    if all_parms.count(None) == len(all_parms):
4092
      raise errors.OpPrereqError("No changes submitted")
4093
    if self.mem is not None:
4094
      try:
4095
        self.mem = int(self.mem)
4096
      except ValueError, err:
4097
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4098
    if self.vcpus is not None:
4099
      try:
4100
        self.vcpus = int(self.vcpus)
4101
      except ValueError, err:
4102
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4103
    if self.ip is not None:
4104
      self.do_ip = True
4105
      if self.ip.lower() == "none":
4106
        self.ip = None
4107
      else:
4108
        if not utils.IsValidIP(self.ip):
4109
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4110
    else:
4111
      self.do_ip = False
4112
    self.do_bridge = (self.bridge is not None)
4113
    if self.mac is not None:
4114
      if self.cfg.IsMacInUse(self.mac):
4115
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4116
                                   self.mac)
4117
      if not utils.IsValidMac(self.mac):
4118
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4119

    
4120
    if self.kernel_path is not None:
4121
      self.do_kernel_path = True
4122
      if self.kernel_path == constants.VALUE_NONE:
4123
        raise errors.OpPrereqError("Can't set instance to no kernel")
4124

    
4125
      if self.kernel_path != constants.VALUE_DEFAULT:
4126
        if not os.path.isabs(self.kernel_path):
4127
          raise errors.OpPrereqError("The kernel path must be an absolute"
4128
                                    " filename")
4129
    else:
4130
      self.do_kernel_path = False
4131

    
4132
    if self.initrd_path is not None:
4133
      self.do_initrd_path = True
4134
      if self.initrd_path not in (constants.VALUE_NONE,
4135
                                  constants.VALUE_DEFAULT):
4136
        if not os.path.isabs(self.initrd_path):
4137
          raise errors.OpPrereqError("The initrd path must be an absolute"
4138
                                    " filename")
4139
    else:
4140
      self.do_initrd_path = False
4141

    
4142
    # boot order verification
4143
    if self.hvm_boot_order is not None:
4144
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
4145
        if len(self.hvm_boot_order.strip("acdn")) != 0:
4146
          raise errors.OpPrereqError("invalid boot order specified,"
4147
                                     " must be one or more of [acdn]"
4148
                                     " or 'default'")
4149

    
4150
    # hvm_cdrom_image_path verification
4151
    if self.op.hvm_cdrom_image_path is not None:
4152
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
4153
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
4154
                                   " be an absolute path or None, not %s" %
4155
                                   self.op.hvm_cdrom_image_path)
4156
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
4157
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
4158
                                   " regular file or a symlink pointing to"
4159
                                   " an existing regular file, not %s" %
4160
                                   self.op.hvm_cdrom_image_path)
4161

    
4162
    # vnc_bind_address verification
4163
    if self.op.vnc_bind_address is not None:
4164
      if not utils.IsValidIP(self.op.vnc_bind_address):
4165
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
4166
                                   " like a valid IP address" %
4167
                                   self.op.vnc_bind_address)
4168

    
4169
    instance = self.cfg.GetInstanceInfo(
4170
      self.cfg.ExpandInstanceName(self.op.instance_name))
4171
    if instance is None:
4172
      raise errors.OpPrereqError("No such instance name '%s'" %
4173
                                 self.op.instance_name)
4174
    self.op.instance_name = instance.name
4175
    self.instance = instance
4176
    return
4177

    
4178
  def Exec(self, feedback_fn):
4179
    """Modifies an instance.
4180

4181
    All parameters take effect only at the next restart of the instance.
4182

    """
4183
    result = []
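    # result collects (parameter, new_value) pairs describing what changed,
    # e.g. [("mem", 512), ("bridge", "xen-br0")]; the values shown here are
    # illustrative only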
4184
    instance = self.instance
4185
    if self.mem:
4186
      instance.memory = self.mem
4187
      result.append(("mem", self.mem))
4188
    if self.vcpus:
4189
      instance.vcpus = self.vcpus
4190
      result.append(("vcpus", self.vcpus))
4191
    if self.do_ip:
4192
      instance.nics[0].ip = self.ip
4193
      result.append(("ip", self.ip))
4194
    if self.bridge:
4195
      instance.nics[0].bridge = self.bridge
4196
      result.append(("bridge", self.bridge))
4197
    if self.mac:
4198
      instance.nics[0].mac = self.mac
4199
      result.append(("mac", self.mac))
4200
    if self.do_kernel_path:
4201
      instance.kernel_path = self.kernel_path
4202
      result.append(("kernel_path", self.kernel_path))
4203
    if self.do_initrd_path:
4204
      instance.initrd_path = self.initrd_path
4205
      result.append(("initrd_path", self.initrd_path))
4206
    if self.hvm_boot_order:
4207
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
4208
        instance.hvm_boot_order = None
4209
      else:
4210
        instance.hvm_boot_order = self.hvm_boot_order
4211
      result.append(("hvm_boot_order", self.hvm_boot_order))
4212
    if self.hvm_acpi:
4213
      instance.hvm_acpi = self.hvm_acpi
4214
      result.append(("hvm_acpi", self.hvm_acpi))
4215
    if self.hvm_pae:
4216
      instance.hvm_pae = self.hvm_pae
4217
      result.append(("hvm_pae", self.hvm_pae))
4218
    if self.hvm_cdrom_image_path:
4219
      instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
4220
      result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
4221
    if self.vnc_bind_address:
4222
      instance.vnc_bind_address = self.vnc_bind_address
4223
      result.append(("vnc_bind_address", self.vnc_bind_address))
4224

    
4225
    self.cfg.AddInstance(instance)
4226

    
4227
    return result
4228

    
4229

    
4230
class LUQueryExports(NoHooksLU):
4231
  """Query the exports list
4232

4233
  """
4234
  _OP_REQP = []
4235

    
4236
  def CheckPrereq(self):
4237
    """Check that the nodelist contains only existing nodes.
4238

4239
    """
4240
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4241

    
4242
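  # The RPC below returns a mapping of node name to the list of export
  # names found on that node, e.g. {"node1.example.com": ["inst1"]}
  # (names here are illustrative only).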
  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

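  # Exec below performs the export in several steps: optionally shut the
  # instance down, snapshot its disks, restart it if it was running, copy
  # the snapshots to the target node, remove the snapshots from the source
  # node, finalize the export on the target node and finally prune older
  # exports of the same instance from the other nodes.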
  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

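    # Note: only the first disk ("sda") is snapshotted below; any other
    # disks of the instance are not included in the export.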
    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        if not rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(instance, self.cfg)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
        logger.Error("could not export block device %s from node %s to node %s"
                     % (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from node %s" %
                     (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    op = opcodes.OpQueryExports(nodes=[])
    exportlist = self.proc.ChainOpCode(op)
    found = False
    for node in exportlist:
      if instance_name in exportlist[node]:
        found = True
        if not rpc.call_export_remove(node, instance_name):
          logger.Error("could not remove export for instance %s"
                       " on node %s" % (instance_name, node))

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

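  # Exec returns a list of (path, tag) pairs, where path identifies the
  # tagged object ("/cluster", "/instances/<name>" or "/nodes/<name>"),
  # e.g. [("/instances/web1.example.com", "staging")] -- the names are
  # illustrative only.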
  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have a good list of nodes and/or the duration
    is valid.

    """

    if self.op.on_nodes:
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg/sstore that are needed to query the cluster
    - input data (all members of the _ALLO_KEYS or _RELO_KEYS class
      attribute, depending on the mode, are required)
    - four buffer attributes (in_text, in_data, out_text, out_data), that
      represent the input (to the external script) in text and data
      structure format, and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, cfg, sstore, mode, name, **kwargs):
    self.cfg = cfg
    self.sstore = sstore
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

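  # Rough shape of the structure built below and serialized for the
  # external allocator script (see _ComputeClusterData, _AddNewInstance
  # and _AddRelocateInstance); this is a summary of the keys actually set
  # there, not an exhaustive schema:
  #   {
  #     "version": 1,
  #     "cluster_name": ..., "cluster_tags": [...], "hypervisor_type": ...,
  #     "nodes": {node_name: {"total_memory": ..., "free_memory": ...,
  #                           "total_disk": ..., "free_disk": ..., ...}},
  #     "instances": {instance_name: {"memory": ..., "vcpus": ...,
  #                                   "disks": [...], "nics": [...], ...}},
  #     "request": {"type": "allocate" or "relocate", ...},
  #   }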
  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.cfg
    # cluster data
    data = {
      "version": 1,
      "cluster_name": self.sstore.GetClusterName(),
      "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      # we don't have job IDs
      }

    i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()
    node_data = rpc.call_node_info(node_list, cfg.GetVGName())
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      if nname not in node_data or not isinstance(node_data[nname], dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname]
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free', 'cpu_total']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += iinfo.memory
          if iinfo.status == "up":
            i_p_up_mem += iinfo.memory

      # compute memory used by instances
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": iinfo.vcpus,
        "memory": iinfo.memory,
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template,
                                  self.disks[0]["size"], self.disks[1]["size"])

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1

    disk_space = _ComputeDiskSize(instance.disk_template,
                                  instance.disks[0].size,
                                  instance.disks[1].size)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

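  # Run ships self.in_text to the iallocator runner on the master node and
  # expects a (rcode, stdout, stderr, fail) tuple back; stdout carries the
  # allocator's reply, which _ValidateResult then parses.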
  def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
    """Run an instance allocator and return the results.

    """
    result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)

    if not isinstance(result, tuple) or len(result) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" %
                               (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

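  # The allocator reply is expected to be a serialized dict with at least
  # the keys "success", "info" and "nodes", for example (illustrative):
  #   {"success": true, "info": "allocation successful",
  #    "nodes": ["node1.example.com", "node2.example.com"]}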
  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    of the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

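  # With direction IALLOCATOR_DIR_IN the generated allocator input text is
  # returned without running any allocator; with IALLOCATOR_DIR_OUT the
  # named allocator is run and its raw (unvalidated) output is returned.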
  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       )
    else:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result