
root / lib / cmdlib.py @ f64c9de6


#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import ssconf
from ganeti import serializer


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_MASTER: the LU needs to run on the master node
        REQ_WSSTORE: the LU needs a writable SimpleStore
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_MASTER = True
  REQ_WSSTORE = False
  REQ_BGL = True

  def __init__(self, processor, op, context, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.sstore = sstore
    self.context = context
    self.__ssh = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    if not self.cfg.IsCluster():
      raise errors.OpPrereqError("Cluster not initialized yet,"
                                 " use 'gnt-cluster init' first.")
    if self.REQ_MASTER:
      master = sstore.GetMasterNode()
      if master != utils.HostInfo().name:
        raise errors.OpPrereqError("Commands must be run on the master"
                                   " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object.

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.sstore)
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have the 'GANETI_' prefix, as this is
    handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    "No nodes" should be expressed as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged, but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    Args:
      phase: the hooks phase that has just been run
      hook_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: the previous result this LU had, or None in the PRE phase.

    """
    return lu_result
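

# Illustrative sketch (comment only, not part of the original module): a
# minimal LU following the rules in the LogicalUnit docstring would look
# roughly like the commented code below.  The opcode and its "message"
# field are hypothetical.
#
#   class LUEchoMessage(NoHooksLU):
#     """Example LU that only echoes a message."""
#     _OP_REQP = ["message"]
#
#     def CheckPrereq(self):
#       # validate/canonicalize opcode parameters;
#       # raise errors.OpPrereqError on problems
#       self.message = str(self.op.message)
#
#     def Exec(self, feedback_fn):
#       # do the actual work; raise errors.OpExecError on expected failures
#       feedback_fn(self.message)
#       return self.message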


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if nodes:
    wanted = []

    for name in nodes:
      node = lu.cfg.ExpandNodeName(name)
      if node is None:
        raise errors.OpPrereqError("No such node name '%s'" % name)
      wanted.append(node)

  else:
    wanted = lu.cfg.GetNodeList()
  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings) or None for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields
    selected: Fields selected by the caller

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))
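

# Example usage (illustrative comment only, not original code): a query LU
# typically validates its requested output fields like this; the field
# names shown here are just placeholders:
#
#   _CheckOutputFields(static=["name"],
#                      dynamic=["mfree", "dtotal"],
#                      selected=self.op.output_fields)
#
# A selected field not present in either set would raise OpPrereqError
# listing the unknown field(s).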


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings
  """
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env
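

# Worked example (comment only, all values hypothetical): for a call such as
#   _BuildInstanceHookEnv("inst1.example.com", "node1", ["node2"],
#                         "debian-etch", "up", 128, 1,
#                         [("192.0.2.10", "xen-br0", "aa:00:00:11:22:33")])
# the returned dict would be:
#   {"OP_TARGET": "inst1.example.com",
#    "INSTANCE_NAME": "inst1.example.com",
#    "INSTANCE_PRIMARY": "node1",
#    "INSTANCE_SECONDARIES": "node2",
#    "INSTANCE_OS_TYPE": "debian-etch",
#    "INSTANCE_STATUS": "up",
#    "INSTANCE_MEMORY": 128,
#    "INSTANCE_VCPUS": 1,
#    "INSTANCE_NIC0_IP": "192.0.2.10",
#    "INSTANCE_NIC0_BRIDGE": "xen-br0",
#    "INSTANCE_NIC0_HWADDR": "aa:00:00:11:22:33",
#    "INSTANCE_NIC_COUNT": 1}
# The hooks runner later adds the "GANETI_" prefix to each key.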


def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override
  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("one or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.sstore.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    rpc.call_node_leave_cluster(master)


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    # checks ssh to other nodes

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    if 'node-net-test' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result['node-net-test']:
        bad = True
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result['node-net-test'][node]))

    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if not instanceconfig.status == 'down':
      if (node_current not in node_instance or
          not instance in node_instance[node_current]):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          needed_mem += instance_cfg[instance].memory
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase and their failure makes
    the output be logged in the verify output and the verification to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
                        for node in nodeinfo]
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    return int(bad)

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result, handle it, and send some
    nicely-formatted feedback back to the user.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: previous Exec result

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res is False or not isinstance(res, list):
            feedback_fn("    Communication failure")
            lu_result = 1
            continue
          for script, hkr, output in res:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    to_act = set()
    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]
  REQ_WSSTORE = True

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      if not rpc.call_node_start_master(master):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV
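

# Illustrative note (comment only, not original code): for a hypothetical
# mirrored disk whose children are two LD_LV logical volumes, the recursion
# above returns True because a child with dev_type == constants.LD_LV is
# found; for a disk tree containing no LV anywhere it returns False.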


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given parameters are consistent and whether
    the given volume group is valid.

    """
    if not self.op.vg_name:
      instances = [self.cfg.GetInstanceInfo(name)
                   for name in self.cfg.GetInstanceList()]
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    # if vg_name is not None, check the given volume group on all nodes
    if self.op.vg_name:
      node_list = self.cfg.GetNodeList()
      vglist = rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name != self.cfg.GetVGName():
      self.cfg.SetVGName(self.op.vg_name)
    else:
      feedback_fn("Cluster LVM configuration already in desired"
                  " state, not changing")


def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    if unlock:
      #utils.Unlock('cmd')
      pass
    try:
      time.sleep(min(60, max_time))
    finally:
      if unlock:
        #utils.Lock('cmd')
        pass

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
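

# Illustrative note (comment only, not original code): each mstat entry
# above is expected to be a (perc_done, est_time, is_degraded, ldisk)
# tuple.  For example, a hypothetical value of (45.5, 120, True, False)
# would be reported as "45.50% done, 120 estimated seconds remaining" and
# keep the loop polling; a perc_done of None means that device is no
# longer syncing.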


def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  cfgw.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
    _CheckOutputFields(static=[],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary.

      Args:
        node_list: a list with the names of all nodes
        rlist: a map with node names as keys and OS objects as values

      Returns:
        map: a map with osnames as keys and as value another map, with
             nodes as keys and lists of OS objects as values
             e.g. {"debian-etch": {"node1": [<object>,...],
                                   "node2": [<object>,]}
                  }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if not nr:
        continue
      for os_obj in nr:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in node_list:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    rpc.call_node_leave_cluster(node.name)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)
    # Remove the node from the Ganeti Lock Manager
    self.context.glm.remove(locking.LEVEL_NODE, node.name)

    utils.RemoveHostFromEtcHosts(node.name)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset([
      "dtotal", "dfree",
      "mtotal", "mnode", "mfree",
      "bootid",
      "ctotal",
      ])

    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                               "pip", "sip", "tags"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedNodes(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)


  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output
1427

    
1428

    
1429
class LUAddNode(LogicalUnit):
1430
  """Logical unit for adding node to the cluster.
1431

1432
  """
1433
  HPATH = "node-add"
1434
  HTYPE = constants.HTYPE_NODE
1435
  _OP_REQP = ["node_name"]
1436

    
1437
  def BuildHooksEnv(self):
1438
    """Build hooks env.
1439

1440
    This will run on all nodes before, and on all nodes + the new node after.
1441

1442
    """
1443
    env = {
1444
      "OP_TARGET": self.op.node_name,
1445
      "NODE_NAME": self.op.node_name,
1446
      "NODE_PIP": self.op.primary_ip,
1447
      "NODE_SIP": self.op.secondary_ip,
1448
      }
1449
    nodes_0 = self.cfg.GetNodeList()
1450
    nodes_1 = nodes_0 + [self.op.node_name, ]
1451
    return env, nodes_0, nodes_1
1452

    
1453
  def CheckPrereq(self):
1454
    """Check prerequisites.
1455

1456
    This checks:
1457
     - the new node is not already in the config
1458
     - it is resolvable
1459
     - its parameters (single/dual homed) matches the cluster
1460

1461
    Any errors are signalled by raising errors.OpPrereqError.
1462

1463
    """
1464
    node_name = self.op.node_name
1465
    cfg = self.cfg
1466

    
1467
    dns_data = utils.HostInfo(node_name)
1468

    
1469
    node = dns_data.name
1470
    primary_ip = self.op.primary_ip = dns_data.ip
1471
    secondary_ip = getattr(self.op, "secondary_ip", None)
1472
    if secondary_ip is None:
1473
      secondary_ip = primary_ip
1474
    if not utils.IsValidIP(secondary_ip):
1475
      raise errors.OpPrereqError("Invalid secondary IP given")
1476
    self.op.secondary_ip = secondary_ip
1477

    
1478
    node_list = cfg.GetNodeList()
1479
    if not self.op.readd and node in node_list:
1480
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1481
                                 node)
1482
    elif self.op.readd and node not in node_list:
1483
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1484

    
1485
    for existing_node_name in node_list:
1486
      existing_node = cfg.GetNodeInfo(existing_node_name)
1487

    
1488
      if self.op.readd and node == existing_node_name:
1489
        if (existing_node.primary_ip != primary_ip or
1490
            existing_node.secondary_ip != secondary_ip):
1491
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1492
                                     " address configuration as before")
1493
        continue
1494

    
1495
      if (existing_node.primary_ip == primary_ip or
1496
          existing_node.secondary_ip == primary_ip or
1497
          existing_node.primary_ip == secondary_ip or
1498
          existing_node.secondary_ip == secondary_ip):
1499
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1500
                                   " existing node %s" % existing_node.name)
1501

    
1502
    # check that the type of the node (single versus dual homed) is the
1503
    # same as for the master
1504
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1505
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1506
    newbie_singlehomed = secondary_ip == primary_ip
1507
    if master_singlehomed != newbie_singlehomed:
1508
      if master_singlehomed:
1509
        raise errors.OpPrereqError("The master has no private ip but the"
1510
                                   " new node has one")
1511
      else:
1512
        raise errors.OpPrereqError("The master has a private ip but the"
1513
                                   " new node doesn't have one")
1514

    
1515
    # checks reachablity
1516
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1517
      raise errors.OpPrereqError("Node not reachable by ping")
1518

    
1519
    if not newbie_singlehomed:
1520
      # check reachability from my secondary ip to newbie's secondary ip
1521
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1522
                           source=myself.secondary_ip):
1523
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1524
                                   " based ping to noded port")
1525

    
1526
    self.new_node = objects.Node(name=node,
1527
                                 primary_ip=primary_ip,
1528
                                 secondary_ip=secondary_ip)
1529

    
1530
  def Exec(self, feedback_fn):
1531
    """Adds the new node to the cluster.
1532

1533
    """
1534
    new_node = self.new_node
1535
    node = new_node.name
1536

    
1537
    # check connectivity
1538
    result = rpc.call_version([node])[node]
1539
    if result:
1540
      if constants.PROTOCOL_VERSION == result:
1541
        logger.Info("communication to node %s fine, sw version %s match" %
1542
                    (node, result))
1543
      else:
1544
        raise errors.OpExecError("Version mismatch master version %s,"
1545
                                 " node version %s" %
1546
                                 (constants.PROTOCOL_VERSION, result))
1547
    else:
1548
      raise errors.OpExecError("Cannot get version from the new node")
1549

    
1550
    # setup ssh on node
1551
    logger.Info("copy ssh key to node %s" % node)
1552
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1553
    keyarray = []
1554
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1555
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1556
                priv_key, pub_key]
1557

    
1558
    for i in keyfiles:
1559
      f = open(i, 'r')
1560
      try:
1561
        keyarray.append(f.read())
1562
      finally:
1563
        f.close()
1564

    
1565
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1566
                               keyarray[3], keyarray[4], keyarray[5])
1567

    
1568
    if not result:
1569
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1570

    
1571
    # Add node to our /etc/hosts, and add key to known_hosts
1572
    utils.AddHostToEtcHosts(new_node.name)
1573

    
1574
    if new_node.secondary_ip != new_node.primary_ip:
1575
      if not rpc.call_node_tcp_ping(new_node.name,
1576
                                    constants.LOCALHOST_IP_ADDRESS,
1577
                                    new_node.secondary_ip,
1578
                                    constants.DEFAULT_NODED_PORT,
1579
                                    10, False):
1580
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1581
                                 " you gave (%s). Please fix and re-run this"
1582
                                 " command." % new_node.secondary_ip)
1583

    
1584
    node_verify_list = [self.sstore.GetMasterNode()]
1585
    node_verify_param = {
1586
      'nodelist': [node],
1587
      # TODO: do a node-net-test as well?
1588
    }
1589

    
1590
    result = rpc.call_node_verify(node_verify_list, node_verify_param)
1591
    for verifier in node_verify_list:
1592
      if not result[verifier]:
1593
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
1594
                                 " for remote verification" % verifier)
1595
      if result[verifier]['nodelist']:
1596
        for failed in result[verifier]['nodelist']:
1597
          feedback_fn("ssh/hostname verification failed %s -> %s" %
1598
                      (verifier, result[verifier]['nodelist'][failed]))
1599
        raise errors.OpExecError("ssh/hostname verification failed.")
1600

    
1601
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1602
    # including the node just added
1603
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1604
    dist_nodes = self.cfg.GetNodeList()
1605
    if not self.op.readd:
1606
      dist_nodes.append(node)
1607
    if myself.name in dist_nodes:
1608
      dist_nodes.remove(myself.name)
1609

    
1610
    logger.Debug("Copying hosts and known_hosts to all nodes")
1611
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1612
      result = rpc.call_upload_file(dist_nodes, fname)
1613
      for to_node in dist_nodes:
1614
        if not result[to_node]:
1615
          logger.Error("copy of file %s to node %s failed" %
1616
                       (fname, to_node))
1617

    
1618
    to_copy = self.sstore.GetFileList()
1619
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1620
      to_copy.append(constants.VNC_PASSWORD_FILE)
1621
    for fname in to_copy:
1622
      result = rpc.call_upload_file([node], fname)
1623
      if not result[node]:
1624
        logger.Error("could not copy file %s to node %s" % (fname, node))
1625

    
1626
    if not self.op.readd:
1627
      logger.Info("adding node %s to cluster.conf" % node)
1628
      self.cfg.AddNode(new_node)
1629
      # Add the new node to the Ganeti Lock Manager
1630
      self.context.glm.add(locking.LEVEL_NODE, node)
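    # At this point the new node runs a matching protocol version, has the
    # cluster SSH keys and ssconf files, and (unless this was a re-add) is
    # registered both in the cluster configuration and in the locking manager.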
1631

    
1632

    
1633
class LUMasterFailover(LogicalUnit):
1634
  """Failover the master node to the current node.
1635

1636
  This is a special LU in that it must run on a non-master node.
1637

1638
  """
1639
  HPATH = "master-failover"
1640
  HTYPE = constants.HTYPE_CLUSTER
1641
  REQ_MASTER = False
1642
  REQ_WSSTORE = True
1643
  _OP_REQP = []
1644

    
1645
  def BuildHooksEnv(self):
1646
    """Build hooks env.
1647

1648
    This will run on the new master only in the pre phase, and on all
1649
    the nodes in the post phase.
1650

1651
    """
1652
    env = {
1653
      "OP_TARGET": self.new_master,
1654
      "NEW_MASTER": self.new_master,
1655
      "OLD_MASTER": self.old_master,
1656
      }
1657
    return env, [self.new_master], self.cfg.GetNodeList()
1658

    
1659
  def CheckPrereq(self):
1660
    """Check prerequisites.
1661

1662
    This checks that we are not already the master.
1663

1664
    """
1665
    self.new_master = utils.HostInfo().name
1666
    self.old_master = self.sstore.GetMasterNode()
1667

    
1668
    if self.old_master == self.new_master:
1669
      raise errors.OpPrereqError("This commands must be run on the node"
1670
                                 " where you want the new master to be."
1671
                                 " %s is already the master" %
1672
                                 self.old_master)
1673

    
1674
  def Exec(self, feedback_fn):
1675
    """Failover the master node.
1676

1677
    This command, when run on a non-master node, will cause the current
1678
    master to cease being master, and the non-master to become the new
1679
    master.
1680

1681
    """
1682
    #TODO: do not rely on gethostname returning the FQDN
1683
    logger.Info("setting master to %s, old master: %s" %
1684
                (self.new_master, self.old_master))
1685

    
1686
    if not rpc.call_node_stop_master(self.old_master):
1687
      logger.Error("could disable the master role on the old master"
1688
                   " %s, please disable manually" % self.old_master)
1689

    
1690
    ss = self.sstore
1691
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1692
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
1693
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
1694
      logger.Error("could not distribute the new simple store master file"
1695
                   " to the other nodes, please check.")
1696

    
1697
    if not rpc.call_node_start_master(self.new_master):
1698
      logger.Error("could not start the master role on the new master"
1699
                   " %s, please check" % self.new_master)
1700
      feedback_fn("Error in activating the master IP on the new master,"
1701
                  " please fix manually.")
1702

    
1703

    
1704

    
1705
class LUQueryClusterInfo(NoHooksLU):
1706
  """Query cluster configuration.
1707

1708
  """
1709
  _OP_REQP = []
1710
  REQ_MASTER = False
1711

    
1712
  def CheckPrereq(self):
1713
    """No prerequsites needed for this LU.
1714

1715
    """
1716
    pass
1717

    
1718
  def Exec(self, feedback_fn):
1719
    """Return cluster config.
1720

1721
    """
1722
    result = {
1723
      "name": self.sstore.GetClusterName(),
1724
      "software_version": constants.RELEASE_VERSION,
1725
      "protocol_version": constants.PROTOCOL_VERSION,
1726
      "config_version": constants.CONFIG_VERSION,
1727
      "os_api_version": constants.OS_API_VERSION,
1728
      "export_version": constants.EXPORT_VERSION,
1729
      "master": self.sstore.GetMasterNode(),
1730
      "architecture": (platform.architecture()[0], platform.machine()),
1731
      "hypervisor_type": self.sstore.GetHypervisorType(),
1732
      }
1733

    
1734
    return result
1735

    
1736

    
1737
class LUDumpClusterConfig(NoHooksLU):
1738
  """Return a text-representation of the cluster-config.
1739

1740
  """
1741
  _OP_REQP = []
1742

    
1743
  def CheckPrereq(self):
1744
    """No prerequisites.
1745

1746
    """
1747
    pass
1748

    
1749
  def Exec(self, feedback_fn):
1750
    """Dump a representation of the cluster config to the standard output.
1751

1752
    """
1753
    return self.cfg.DumpConfig()
1754

    
1755

    
1756
class LUActivateInstanceDisks(NoHooksLU):
1757
  """Bring up an instance's disks.
1758

1759
  """
1760
  _OP_REQP = ["instance_name"]
1761

    
1762
  def CheckPrereq(self):
1763
    """Check prerequisites.
1764

1765
    This checks that the instance is in the cluster.
1766

1767
    """
1768
    instance = self.cfg.GetInstanceInfo(
1769
      self.cfg.ExpandInstanceName(self.op.instance_name))
1770
    if instance is None:
1771
      raise errors.OpPrereqError("Instance '%s' not known" %
1772
                                 self.op.instance_name)
1773
    self.instance = instance
1774

    
1775

    
1776
  def Exec(self, feedback_fn):
1777
    """Activate the disks.
1778

1779
    """
1780
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1781
    if not disks_ok:
1782
      raise errors.OpExecError("Cannot activate block devices")
1783

    
1784
    return disks_info
1785

    
1786

    
1787
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1788
  """Prepare the block devices for an instance.
1789

1790
  This sets up the block devices on all nodes.
1791

1792
  Args:
1793
    instance: a ganeti.objects.Instance object
1794
    ignore_secondaries: if true, errors on secondary nodes won't result
1795
                        in an error return from the function
1796

1797
  Returns:
1798
    false if the operation failed
1799
    list of (host, instance_visible_name, node_visible_name) if the operation
1800
         succeeded with the mapping from node devices to instance devices
1801
  """
1802
  device_info = []
1803
  disks_ok = True
1804
  iname = instance.name
1805
  # With the two-pass mechanism we try to reduce the window of
1806
  # opportunity for the race condition of switching DRBD to primary
1807
  # before handshaking occurred, but we do not eliminate it
1808

    
1809
  # The proper fix would be to wait (with some limits) until the
1810
  # connection has been made and drbd transitions from WFConnection
1811
  # into any other network-connected state (Connected, SyncTarget,
1812
  # SyncSource, etc.)
1813

    
1814
  # 1st pass, assemble on all nodes in secondary mode
1815
  for inst_disk in instance.disks:
1816
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1817
      cfg.SetDiskID(node_disk, node)
1818
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1819
      if not result:
1820
        logger.Error("could not prepare block device %s on node %s"
1821
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1822
        if not ignore_secondaries:
1823
          disks_ok = False
1824

    
1825
  # FIXME: race condition on drbd migration to primary
1826

    
1827
  # 2nd pass, do only the primary node
1828
  for inst_disk in instance.disks:
1829
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1830
      if node != instance.primary_node:
1831
        continue
1832
      cfg.SetDiskID(node_disk, node)
1833
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1834
      if not result:
1835
        logger.Error("could not prepare block device %s on node %s"
1836
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1837
        disks_ok = False
1838
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
1839

    
1840
  # leave the disks configured for the primary node
1841
  # this is a workaround that would be fixed better by
1842
  # improving the logical/physical id handling
1843
  for disk in instance.disks:
1844
    cfg.SetDiskID(disk, instance.primary_node)
1845

    
1846
  return disks_ok, device_info
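# A rough sketch of how the two return values are meant to be consumed
# (assuming `instance` is an objects.Instance and `cfg` a ConfigWriter):
#
#   disks_ok, device_info = _AssembleInstanceDisks(instance, cfg)
#   if not disks_ok:
#     raise errors.OpExecError("Cannot activate block devices")
#   for node, iv_name, node_dev in device_info:
#     logger.Info("disk %s assembled on %s as %s" % (iv_name, node, node_dev))
#
# device_info only describes the primary node's view of the disks (it is
# filled in during the second pass); failures on secondary nodes are only
# reported through disks_ok, unless ignore_secondaries is set.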
1847

    
1848

    
1849
def _StartInstanceDisks(cfg, instance, force):
1850
  """Start the disks of an instance.
1851

1852
  """
1853
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1854
                                           ignore_secondaries=force)
1855
  if not disks_ok:
1856
    _ShutdownInstanceDisks(instance, cfg)
1857
    if force is not None and not force:
1858
      logger.Error("If the message above refers to a secondary node,"
1859
                   " you can retry the operation using '--force'.")
1860
    raise errors.OpExecError("Disk consistency error")
1861

    
1862

    
1863
class LUDeactivateInstanceDisks(NoHooksLU):
1864
  """Shutdown an instance's disks.
1865

1866
  """
1867
  _OP_REQP = ["instance_name"]
1868

    
1869
  def CheckPrereq(self):
1870
    """Check prerequisites.
1871

1872
    This checks that the instance is in the cluster.
1873

1874
    """
1875
    instance = self.cfg.GetInstanceInfo(
1876
      self.cfg.ExpandInstanceName(self.op.instance_name))
1877
    if instance is None:
1878
      raise errors.OpPrereqError("Instance '%s' not known" %
1879
                                 self.op.instance_name)
1880
    self.instance = instance
1881

    
1882
  def Exec(self, feedback_fn):
1883
    """Deactivate the disks
1884

1885
    """
1886
    instance = self.instance
1887
    ins_l = rpc.call_instance_list([instance.primary_node])
1888
    ins_l = ins_l[instance.primary_node]
1889
    if not isinstance(ins_l, list):
1890
      raise errors.OpExecError("Can't contact node '%s'" %
1891
                               instance.primary_node)
1892

    
1893
    if self.instance.name in ins_l:
1894
      raise errors.OpExecError("Instance is running, can't shutdown"
1895
                               " block devices.")
1896

    
1897
    _ShutdownInstanceDisks(instance, self.cfg)
1898

    
1899

    
1900
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1901
  """Shutdown block devices of an instance.
1902

1903
  This does the shutdown on all nodes of the instance.
1904

1905
  If ignore_primary is false, errors on the primary node are not
1906
  ignored (they make the function return failure).
1907

1908
  """
1909
  result = True
1910
  for disk in instance.disks:
1911
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1912
      cfg.SetDiskID(top_disk, node)
1913
      if not rpc.call_blockdev_shutdown(node, top_disk):
1914
        logger.Error("could not shutdown block device %s on node %s" %
1915
                     (disk.iv_name, node))
1916
        if not ignore_primary or node != instance.primary_node:
1917
          result = False
1918
  return result
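# Typical call sites pair this with the assemble helper above; for example
# LUFailoverInstance.Exec (further down) uses
#
#   _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True)
#
# so that a failover can proceed even when the (possibly dead) primary node
# cannot confirm the shutdown of its block devices.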
1919

    
1920

    
1921
def _CheckNodeFreeMemory(cfg, node, reason, requested):
1922
  """Checks if a node has enough free memory.
1923

1924
  This function checks if a given node has the needed amount of free
1925
  memory. In case the node has less memory or we cannot get the
1926
  information from the node, this function raises an OpPrereqError
1927
  exception.
1928

1929
  Args:
1930
    - cfg: a ConfigWriter instance
1931
    - node: the node name
1932
    - reason: string to use in the error message
1933
    - requested: the amount of memory in MiB
1934

1935
  """
1936
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
1937
  if not nodeinfo or not isinstance(nodeinfo, dict):
1938
    raise errors.OpPrereqError("Could not contact node %s for resource"
1939
                             " information" % (node,))
1940

    
1941
  free_mem = nodeinfo[node].get('memory_free')
1942
  if not isinstance(free_mem, int):
1943
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
1944
                             " was '%s'" % (node, free_mem))
1945
  if requested > free_mem:
1946
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
1947
                             " needed %s MiB, available %s MiB" %
1948
                             (node, reason, requested, free_mem))
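# Example call, as used by LUStartupInstance.CheckPrereq below:
#
#   _CheckNodeFreeMemory(self.cfg, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        instance.memory)
#
# i.e. the caller passes the amount of memory the instance needs plus a short
# description that ends up in the error message if the check fails.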
1949

    
1950

    
1951
class LUStartupInstance(LogicalUnit):
1952
  """Starts an instance.
1953

1954
  """
1955
  HPATH = "instance-start"
1956
  HTYPE = constants.HTYPE_INSTANCE
1957
  _OP_REQP = ["instance_name", "force"]
1958

    
1959
  def BuildHooksEnv(self):
1960
    """Build hooks env.
1961

1962
    This runs on master, primary and secondary nodes of the instance.
1963

1964
    """
1965
    env = {
1966
      "FORCE": self.op.force,
1967
      }
1968
    env.update(_BuildInstanceHookEnvByObject(self.instance))
1969
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1970
          list(self.instance.secondary_nodes))
1971
    return env, nl, nl
1972

    
1973
  def CheckPrereq(self):
1974
    """Check prerequisites.
1975

1976
    This checks that the instance is in the cluster.
1977

1978
    """
1979
    instance = self.cfg.GetInstanceInfo(
1980
      self.cfg.ExpandInstanceName(self.op.instance_name))
1981
    if instance is None:
1982
      raise errors.OpPrereqError("Instance '%s' not known" %
1983
                                 self.op.instance_name)
1984

    
1985
    # check bridges existence
1986
    _CheckInstanceBridgesExist(instance)
1987

    
1988
    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
1989
                         "starting instance %s" % instance.name,
1990
                         instance.memory)
1991

    
1992
    self.instance = instance
1993
    self.op.instance_name = instance.name
1994

    
1995
  def Exec(self, feedback_fn):
1996
    """Start the instance.
1997

1998
    """
1999
    instance = self.instance
2000
    force = self.op.force
2001
    extra_args = getattr(self.op, "extra_args", "")
2002

    
2003
    self.cfg.MarkInstanceUp(instance.name)
2004

    
2005
    node_current = instance.primary_node
2006

    
2007
    _StartInstanceDisks(self.cfg, instance, force)
2008

    
2009
    if not rpc.call_instance_start(node_current, instance, extra_args):
2010
      _ShutdownInstanceDisks(instance, self.cfg)
2011
      raise errors.OpExecError("Could not start instance")
2012

    
2013

    
2014
class LURebootInstance(LogicalUnit):
2015
  """Reboot an instance.
2016

2017
  """
2018
  HPATH = "instance-reboot"
2019
  HTYPE = constants.HTYPE_INSTANCE
2020
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2021

    
2022
  def BuildHooksEnv(self):
2023
    """Build hooks env.
2024

2025
    This runs on master, primary and secondary nodes of the instance.
2026

2027
    """
2028
    env = {
2029
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2030
      }
2031
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2032
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2033
          list(self.instance.secondary_nodes))
2034
    return env, nl, nl
2035

    
2036
  def CheckPrereq(self):
2037
    """Check prerequisites.
2038

2039
    This checks that the instance is in the cluster.
2040

2041
    """
2042
    instance = self.cfg.GetInstanceInfo(
2043
      self.cfg.ExpandInstanceName(self.op.instance_name))
2044
    if instance is None:
2045
      raise errors.OpPrereqError("Instance '%s' not known" %
2046
                                 self.op.instance_name)
2047

    
2048
    # check bridges existence
2049
    _CheckInstanceBridgesExist(instance)
2050

    
2051
    self.instance = instance
2052
    self.op.instance_name = instance.name
2053

    
2054
  def Exec(self, feedback_fn):
2055
    """Reboot the instance.
2056

2057
    """
2058
    instance = self.instance
2059
    ignore_secondaries = self.op.ignore_secondaries
2060
    reboot_type = self.op.reboot_type
2061
    extra_args = getattr(self.op, "extra_args", "")
2062

    
2063
    node_current = instance.primary_node
2064

    
2065
    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2066
                           constants.INSTANCE_REBOOT_HARD,
2067
                           constants.INSTANCE_REBOOT_FULL]:
2068
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2069
                                  (constants.INSTANCE_REBOOT_SOFT,
2070
                                   constants.INSTANCE_REBOOT_HARD,
2071
                                   constants.INSTANCE_REBOOT_FULL))
2072

    
2073
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2074
                       constants.INSTANCE_REBOOT_HARD]:
2075
      if not rpc.call_instance_reboot(node_current, instance,
2076
                                      reboot_type, extra_args):
2077
        raise errors.OpExecError("Could not reboot instance")
2078
    else:
2079
      if not rpc.call_instance_shutdown(node_current, instance):
2080
        raise errors.OpExecError("could not shutdown instance for full reboot")
2081
      _ShutdownInstanceDisks(instance, self.cfg)
2082
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2083
      if not rpc.call_instance_start(node_current, instance, extra_args):
2084
        _ShutdownInstanceDisks(instance, self.cfg)
2085
        raise errors.OpExecError("Could not start instance for full reboot")
2086

    
2087
    self.cfg.MarkInstanceUp(instance.name)
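    # Note: soft and hard reboots are delegated to a single instance_reboot
    # RPC on the primary node, while a full reboot is emulated above by
    # shutting the instance down, cycling its disks and starting it again.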
2088

    
2089

    
2090
class LUShutdownInstance(LogicalUnit):
2091
  """Shutdown an instance.
2092

2093
  """
2094
  HPATH = "instance-stop"
2095
  HTYPE = constants.HTYPE_INSTANCE
2096
  _OP_REQP = ["instance_name"]
2097

    
2098
  def BuildHooksEnv(self):
2099
    """Build hooks env.
2100

2101
    This runs on master, primary and secondary nodes of the instance.
2102

2103
    """
2104
    env = _BuildInstanceHookEnvByObject(self.instance)
2105
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2106
          list(self.instance.secondary_nodes))
2107
    return env, nl, nl
2108

    
2109
  def CheckPrereq(self):
2110
    """Check prerequisites.
2111

2112
    This checks that the instance is in the cluster.
2113

2114
    """
2115
    instance = self.cfg.GetInstanceInfo(
2116
      self.cfg.ExpandInstanceName(self.op.instance_name))
2117
    if instance is None:
2118
      raise errors.OpPrereqError("Instance '%s' not known" %
2119
                                 self.op.instance_name)
2120
    self.instance = instance
2121

    
2122
  def Exec(self, feedback_fn):
2123
    """Shutdown the instance.
2124

2125
    """
2126
    instance = self.instance
2127
    node_current = instance.primary_node
2128
    self.cfg.MarkInstanceDown(instance.name)
2129
    if not rpc.call_instance_shutdown(node_current, instance):
2130
      logger.Error("could not shutdown instance")
2131

    
2132
    _ShutdownInstanceDisks(instance, self.cfg)
2133

    
2134

    
2135
class LUReinstallInstance(LogicalUnit):
2136
  """Reinstall an instance.
2137

2138
  """
2139
  HPATH = "instance-reinstall"
2140
  HTYPE = constants.HTYPE_INSTANCE
2141
  _OP_REQP = ["instance_name"]
2142

    
2143
  def BuildHooksEnv(self):
2144
    """Build hooks env.
2145

2146
    This runs on master, primary and secondary nodes of the instance.
2147

2148
    """
2149
    env = _BuildInstanceHookEnvByObject(self.instance)
2150
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2151
          list(self.instance.secondary_nodes))
2152
    return env, nl, nl
2153

    
2154
  def CheckPrereq(self):
2155
    """Check prerequisites.
2156

2157
    This checks that the instance is in the cluster and is not running.
2158

2159
    """
2160
    instance = self.cfg.GetInstanceInfo(
2161
      self.cfg.ExpandInstanceName(self.op.instance_name))
2162
    if instance is None:
2163
      raise errors.OpPrereqError("Instance '%s' not known" %
2164
                                 self.op.instance_name)
2165
    if instance.disk_template == constants.DT_DISKLESS:
2166
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2167
                                 self.op.instance_name)
2168
    if instance.status != "down":
2169
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2170
                                 self.op.instance_name)
2171
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2172
    if remote_info:
2173
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2174
                                 (self.op.instance_name,
2175
                                  instance.primary_node))
2176

    
2177
    self.op.os_type = getattr(self.op, "os_type", None)
2178
    if self.op.os_type is not None:
2179
      # OS verification
2180
      pnode = self.cfg.GetNodeInfo(
2181
        self.cfg.ExpandNodeName(instance.primary_node))
2182
      if pnode is None:
2183
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2184
                                   self.op.pnode)
2185
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2186
      if not os_obj:
2187
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2188
                                   " primary node"  % self.op.os_type)
2189

    
2190
    self.instance = instance
2191

    
2192
  def Exec(self, feedback_fn):
2193
    """Reinstall the instance.
2194

2195
    """
2196
    inst = self.instance
2197

    
2198
    if self.op.os_type is not None:
2199
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2200
      inst.os = self.op.os_type
2201
      self.cfg.AddInstance(inst)
2202

    
2203
    _StartInstanceDisks(self.cfg, inst, None)
2204
    try:
2205
      feedback_fn("Running the instance OS create scripts...")
2206
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2207
        raise errors.OpExecError("Could not install OS for instance %s"
2208
                                 " on node %s" %
2209
                                 (inst.name, inst.primary_node))
2210
    finally:
2211
      _ShutdownInstanceDisks(inst, self.cfg)
2212

    
2213

    
2214
class LURenameInstance(LogicalUnit):
2215
  """Rename an instance.
2216

2217
  """
2218
  HPATH = "instance-rename"
2219
  HTYPE = constants.HTYPE_INSTANCE
2220
  _OP_REQP = ["instance_name", "new_name"]
2221

    
2222
  def BuildHooksEnv(self):
2223
    """Build hooks env.
2224

2225
    This runs on master, primary and secondary nodes of the instance.
2226

2227
    """
2228
    env = _BuildInstanceHookEnvByObject(self.instance)
2229
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2230
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2231
          list(self.instance.secondary_nodes))
2232
    return env, nl, nl
2233

    
2234
  def CheckPrereq(self):
2235
    """Check prerequisites.
2236

2237
    This checks that the instance is in the cluster and is not running.
2238

2239
    """
2240
    instance = self.cfg.GetInstanceInfo(
2241
      self.cfg.ExpandInstanceName(self.op.instance_name))
2242
    if instance is None:
2243
      raise errors.OpPrereqError("Instance '%s' not known" %
2244
                                 self.op.instance_name)
2245
    if instance.status != "down":
2246
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2247
                                 self.op.instance_name)
2248
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2249
    if remote_info:
2250
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2251
                                 (self.op.instance_name,
2252
                                  instance.primary_node))
2253
    self.instance = instance
2254

    
2255
    # new name verification
2256
    name_info = utils.HostInfo(self.op.new_name)
2257

    
2258
    self.op.new_name = new_name = name_info.name
2259
    instance_list = self.cfg.GetInstanceList()
2260
    if new_name in instance_list:
2261
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2262
                                 new_name)
2263

    
2264
    if not getattr(self.op, "ignore_ip", False):
2265
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2266
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2267
                                   (name_info.ip, new_name))
2268

    
2269

    
2270
  def Exec(self, feedback_fn):
2271
    """Reinstall the instance.
2272

2273
    """
2274
    inst = self.instance
2275
    old_name = inst.name
2276

    
2277
    if inst.disk_template == constants.DT_FILE:
2278
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2279

    
2280
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2281

    
2282
    # re-read the instance from the configuration after rename
2283
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2284

    
2285
    if inst.disk_template == constants.DT_FILE:
2286
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2287
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
2288
                                                old_file_storage_dir,
2289
                                                new_file_storage_dir)
2290

    
2291
      if not result:
2292
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2293
                                 " directory '%s' to '%s' (but the instance"
2294
                                 " has been renamed in Ganeti)" % (
2295
                                 inst.primary_node, old_file_storage_dir,
2296
                                 new_file_storage_dir))
2297

    
2298
      if not result[0]:
2299
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2300
                                 " (but the instance has been renamed in"
2301
                                 " Ganeti)" % (old_file_storage_dir,
2302
                                               new_file_storage_dir))
2303

    
2304
    _StartInstanceDisks(self.cfg, inst, None)
2305
    try:
2306
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2307
                                          "sda", "sdb"):
2308
        msg = ("Could run OS rename script for instance %s on node %s (but the"
2309
               " instance has been renamed in Ganeti)" %
2310
               (inst.name, inst.primary_node))
2311
        logger.Error(msg)
2312
    finally:
2313
      _ShutdownInstanceDisks(inst, self.cfg)
2314

    
2315

    
2316
class LURemoveInstance(LogicalUnit):
2317
  """Remove an instance.
2318

2319
  """
2320
  HPATH = "instance-remove"
2321
  HTYPE = constants.HTYPE_INSTANCE
2322
  _OP_REQP = ["instance_name", "ignore_failures"]
2323

    
2324
  def BuildHooksEnv(self):
2325
    """Build hooks env.
2326

2327
    This runs on master, primary and secondary nodes of the instance.
2328

2329
    """
2330
    env = _BuildInstanceHookEnvByObject(self.instance)
2331
    nl = [self.sstore.GetMasterNode()]
2332
    return env, nl, nl
2333

    
2334
  def CheckPrereq(self):
2335
    """Check prerequisites.
2336

2337
    This checks that the instance is in the cluster.
2338

2339
    """
2340
    instance = self.cfg.GetInstanceInfo(
2341
      self.cfg.ExpandInstanceName(self.op.instance_name))
2342
    if instance is None:
2343
      raise errors.OpPrereqError("Instance '%s' not known" %
2344
                                 self.op.instance_name)
2345
    self.instance = instance
2346

    
2347
  def Exec(self, feedback_fn):
2348
    """Remove the instance.
2349

2350
    """
2351
    instance = self.instance
2352
    logger.Info("shutting down instance %s on node %s" %
2353
                (instance.name, instance.primary_node))
2354

    
2355
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2356
      if self.op.ignore_failures:
2357
        feedback_fn("Warning: can't shutdown instance")
2358
      else:
2359
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2360
                                 (instance.name, instance.primary_node))
2361

    
2362
    logger.Info("removing block devices for instance %s" % instance.name)
2363

    
2364
    if not _RemoveDisks(instance, self.cfg):
2365
      if self.op.ignore_failures:
2366
        feedback_fn("Warning: can't remove instance's disks")
2367
      else:
2368
        raise errors.OpExecError("Can't remove instance's disks")
2369

    
2370
    logger.Info("removing instance %s out of cluster config" % instance.name)
2371

    
2372
    self.cfg.RemoveInstance(instance.name)
2373
    # Remove the new instance from the Ganeti Lock Manager
2374
    self.context.glm.remove(locking.LEVEL_INSTANCE, instance.name)
2375

    
2376

    
2377
class LUQueryInstances(NoHooksLU):
2378
  """Logical unit for querying instances.
2379

2380
  """
2381
  _OP_REQP = ["output_fields", "names"]
2382

    
2383
  def CheckPrereq(self):
2384
    """Check prerequisites.
2385

2386
    This checks that the fields required are valid output fields.
2387

2388
    """
2389
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2390
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2391
                               "admin_state", "admin_ram",
2392
                               "disk_template", "ip", "mac", "bridge",
2393
                               "sda_size", "sdb_size", "vcpus", "tags"],
2394
                       dynamic=self.dynamic_fields,
2395
                       selected=self.op.output_fields)
2396

    
2397
    self.wanted = _GetWantedInstances(self, self.op.names)
2398

    
2399
  def Exec(self, feedback_fn):
2400
    """Computes the list of nodes and their attributes.
2401

2402
    """
2403
    instance_names = self.wanted
2404
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2405
                     in instance_names]
2406

    
2407
    # begin data gathering
2408

    
2409
    nodes = frozenset([inst.primary_node for inst in instance_list])
2410

    
2411
    bad_nodes = []
2412
    if self.dynamic_fields.intersection(self.op.output_fields):
2413
      live_data = {}
2414
      node_data = rpc.call_all_instances_info(nodes)
2415
      for name in nodes:
2416
        result = node_data[name]
2417
        if result:
2418
          live_data.update(result)
2419
        elif result == False:
2420
          bad_nodes.append(name)
2421
        # else no instance is alive
2422
    else:
2423
      live_data = dict([(name, {}) for name in instance_names])
2424

    
2425
    # end data gathering
2426

    
2427
    output = []
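    # Build one row per instance; each row lists the values of the requested
    # output_fields in order, e.g. for output_fields ["name", "pnode",
    # "status"] a row could look like
    # ["inst1.example.com", "node1.example.com", "running"]
    # (hypothetical names, for illustration only).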
2428
    for instance in instance_list:
2429
      iout = []
2430
      for field in self.op.output_fields:
2431
        if field == "name":
2432
          val = instance.name
2433
        elif field == "os":
2434
          val = instance.os
2435
        elif field == "pnode":
2436
          val = instance.primary_node
2437
        elif field == "snodes":
2438
          val = list(instance.secondary_nodes)
2439
        elif field == "admin_state":
2440
          val = (instance.status != "down")
2441
        elif field == "oper_state":
2442
          if instance.primary_node in bad_nodes:
2443
            val = None
2444
          else:
2445
            val = bool(live_data.get(instance.name))
2446
        elif field == "status":
2447
          if instance.primary_node in bad_nodes:
2448
            val = "ERROR_nodedown"
2449
          else:
2450
            running = bool(live_data.get(instance.name))
2451
            if running:
2452
              if instance.status != "down":
2453
                val = "running"
2454
              else:
2455
                val = "ERROR_up"
2456
            else:
2457
              if instance.status != "down":
2458
                val = "ERROR_down"
2459
              else:
2460
                val = "ADMIN_down"
2461
        elif field == "admin_ram":
2462
          val = instance.memory
2463
        elif field == "oper_ram":
2464
          if instance.primary_node in bad_nodes:
2465
            val = None
2466
          elif instance.name in live_data:
2467
            val = live_data[instance.name].get("memory", "?")
2468
          else:
2469
            val = "-"
2470
        elif field == "disk_template":
2471
          val = instance.disk_template
2472
        elif field == "ip":
2473
          val = instance.nics[0].ip
2474
        elif field == "bridge":
2475
          val = instance.nics[0].bridge
2476
        elif field == "mac":
2477
          val = instance.nics[0].mac
2478
        elif field == "sda_size" or field == "sdb_size":
2479
          disk = instance.FindDisk(field[:3])
2480
          if disk is None:
2481
            val = None
2482
          else:
2483
            val = disk.size
2484
        elif field == "vcpus":
2485
          val = instance.vcpus
2486
        elif field == "tags":
2487
          val = list(instance.GetTags())
2488
        else:
2489
          raise errors.ParameterError(field)
2490
        iout.append(val)
2491
      output.append(iout)
2492

    
2493
    return output
2494

    
2495

    
2496
class LUFailoverInstance(LogicalUnit):
2497
  """Failover an instance.
2498

2499
  """
2500
  HPATH = "instance-failover"
2501
  HTYPE = constants.HTYPE_INSTANCE
2502
  _OP_REQP = ["instance_name", "ignore_consistency"]
2503

    
2504
  def BuildHooksEnv(self):
2505
    """Build hooks env.
2506

2507
    This runs on master, primary and secondary nodes of the instance.
2508

2509
    """
2510
    env = {
2511
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2512
      }
2513
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2514
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2515
    return env, nl, nl
2516

    
2517
  def CheckPrereq(self):
2518
    """Check prerequisites.
2519

2520
    This checks that the instance is in the cluster.
2521

2522
    """
2523
    instance = self.cfg.GetInstanceInfo(
2524
      self.cfg.ExpandInstanceName(self.op.instance_name))
2525
    if instance is None:
2526
      raise errors.OpPrereqError("Instance '%s' not known" %
2527
                                 self.op.instance_name)
2528

    
2529
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2530
      raise errors.OpPrereqError("Instance's disk layout is not"
2531
                                 " network mirrored, cannot failover.")
2532

    
2533
    secondary_nodes = instance.secondary_nodes
2534
    if not secondary_nodes:
2535
      raise errors.ProgrammerError("no secondary node but using "
2536
                                   "a mirrored disk template")
2537

    
2538
    target_node = secondary_nodes[0]
2539
    # check memory requirements on the secondary node
2540
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2541
                         instance.name, instance.memory)
2542

    
2543
    # check bridge existence
2544
    brlist = [nic.bridge for nic in instance.nics]
2545
    if not rpc.call_bridges_exist(target_node, brlist):
2546
      raise errors.OpPrereqError("One or more target bridges %s does not"
2547
                                 " exist on destination node '%s'" %
2548
                                 (brlist, target_node))
2549

    
2550
    self.instance = instance
2551

    
2552
  def Exec(self, feedback_fn):
2553
    """Failover an instance.
2554

2555
    The failover is done by shutting it down on its present node and
2556
    starting it on the secondary.
2557

2558
    """
2559
    instance = self.instance
2560

    
2561
    source_node = instance.primary_node
2562
    target_node = instance.secondary_nodes[0]
2563

    
2564
    feedback_fn("* checking disk consistency between source and target")
2565
    for dev in instance.disks:
2566
      # for drbd, these are drbd over lvm
2567
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2568
        if instance.status == "up" and not self.op.ignore_consistency:
2569
          raise errors.OpExecError("Disk %s is degraded on target node,"
2570
                                   " aborting failover." % dev.iv_name)
2571

    
2572
    feedback_fn("* shutting down instance on source node")
2573
    logger.Info("Shutting down instance %s on node %s" %
2574
                (instance.name, source_node))
2575

    
2576
    if not rpc.call_instance_shutdown(source_node, instance):
2577
      if self.op.ignore_consistency:
2578
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2579
                     " anyway. Please make sure node %s is down"  %
2580
                     (instance.name, source_node, source_node))
2581
      else:
2582
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2583
                                 (instance.name, source_node))
2584

    
2585
    feedback_fn("* deactivating the instance's disks on source node")
2586
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2587
      raise errors.OpExecError("Can't shut down the instance's disks.")
2588

    
2589
    instance.primary_node = target_node
2590
    # distribute new instance config to the other nodes
2591
    self.cfg.Update(instance)
2592

    
2593
    # Only start the instance if it's marked as up
2594
    if instance.status == "up":
2595
      feedback_fn("* activating the instance's disks on target node")
2596
      logger.Info("Starting instance %s on node %s" %
2597
                  (instance.name, target_node))
2598

    
2599
      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2600
                                               ignore_secondaries=True)
2601
      if not disks_ok:
2602
        _ShutdownInstanceDisks(instance, self.cfg)
2603
        raise errors.OpExecError("Can't activate the instance's disks")
2604

    
2605
      feedback_fn("* starting the instance on the target node")
2606
      if not rpc.call_instance_start(target_node, instance, None):
2607
        _ShutdownInstanceDisks(instance, self.cfg)
2608
        raise errors.OpExecError("Could not start instance %s on node %s." %
2609
                                 (instance.name, target_node))
2610

    
2611

    
2612
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2613
  """Create a tree of block devices on the primary node.
2614

2615
  This always creates all devices.
2616

2617
  """
2618
  if device.children:
2619
    for child in device.children:
2620
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2621
        return False
2622

    
2623
  cfg.SetDiskID(device, node)
2624
  new_id = rpc.call_blockdev_create(node, device, device.size,
2625
                                    instance.name, True, info)
2626
  if not new_id:
2627
    return False
2628
  if device.physical_id is None:
2629
    device.physical_id = new_id
2630
  return True
2631

    
2632

    
2633
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2634
  """Create a tree of block devices on a secondary node.
2635

2636
  If this device type has to be created on secondaries, create it and
2637
  all its children.
2638

2639
  If not, just recurse to children keeping the same 'force' value.
2640

2641
  """
2642
  if device.CreateOnSecondary():
2643
    force = True
2644
  if device.children:
2645
    for child in device.children:
2646
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
2647
                                        child, force, info):
2648
        return False
2649

    
2650
  if not force:
2651
    return True
2652
  cfg.SetDiskID(device, node)
2653
  new_id = rpc.call_blockdev_create(node, device, device.size,
2654
                                    instance.name, False, info)
2655
  if not new_id:
2656
    return False
2657
  if device.physical_id is None:
2658
    device.physical_id = new_id
2659
  return True
2660

    
2661

    
2662
def _GenerateUniqueNames(cfg, exts):
2663
  """Generate a suitable LV name.
2664

2665
  This will generate a unique logical volume name for each requested extension.
2666

2667
  """
2668
  results = []
2669
  for val in exts:
2670
    new_id = cfg.GenerateUniqueID()
2671
    results.append("%s%s" % (new_id, val))
2672
  return results
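# Illustrative result (the IDs below are placeholders): calling
# _GenerateUniqueNames(cfg, [".sda", ".sdb"]) returns something like
# ["<unique-id-1>.sda", "<unique-id-2>.sdb"], i.e. one freshly generated
# unique ID per requested extension.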
2673

    
2674

    
2675
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2676
  """Generate a drbd8 device complete with its children.
2677

2678
  """
2679
  port = cfg.AllocatePort()
2680
  vgname = cfg.GetVGName()
2681
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2682
                          logical_id=(vgname, names[0]))
2683
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2684
                          logical_id=(vgname, names[1]))
2685
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2686
                          logical_id = (primary, secondary, port),
2687
                          children = [dev_data, dev_meta],
2688
                          iv_name=iv_name)
2689
  return drbd_dev
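# The returned object is an LD_DRBD8 disk of `size` MiB whose logical_id
# couples the primary node, the secondary node and a freshly allocated DRBD
# port, and whose children are the data LV (names[0], `size` MiB) and a
# 128 MiB metadata LV (names[1]).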
2690

    
2691

    
2692
def _GenerateDiskTemplate(cfg, template_name,
2693
                          instance_name, primary_node,
2694
                          secondary_nodes, disk_sz, swap_sz,
2695
                          file_storage_dir, file_driver):
2696
  """Generate the entire disk layout for a given template type.
2697

2698
  """
2699
  #TODO: compute space requirements
2700

    
2701
  vgname = cfg.GetVGName()
2702
  if template_name == constants.DT_DISKLESS:
2703
    disks = []
2704
  elif template_name == constants.DT_PLAIN:
2705
    if len(secondary_nodes) != 0:
2706
      raise errors.ProgrammerError("Wrong template configuration")
2707

    
2708
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2709
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2710
                           logical_id=(vgname, names[0]),
2711
                           iv_name = "sda")
2712
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2713
                           logical_id=(vgname, names[1]),
2714
                           iv_name = "sdb")
2715
    disks = [sda_dev, sdb_dev]
2716
  elif template_name == constants.DT_DRBD8:
2717
    if len(secondary_nodes) != 1:
2718
      raise errors.ProgrammerError("Wrong template configuration")
2719
    remote_node = secondary_nodes[0]
2720
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2721
                                       ".sdb_data", ".sdb_meta"])
2722
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2723
                                         disk_sz, names[0:2], "sda")
2724
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2725
                                         swap_sz, names[2:4], "sdb")
2726
    disks = [drbd_sda_dev, drbd_sdb_dev]
2727
  elif template_name == constants.DT_FILE:
2728
    if len(secondary_nodes) != 0:
2729
      raise errors.ProgrammerError("Wrong template configuration")
2730

    
2731
    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
2732
                                iv_name="sda", logical_id=(file_driver,
2733
                                "%s/sda" % file_storage_dir))
2734
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
2735
                                iv_name="sdb", logical_id=(file_driver,
2736
                                "%s/sdb" % file_storage_dir))
2737
    disks = [file_sda_dev, file_sdb_dev]
2738
  else:
2739
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2740
  return disks
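# A sketch of a typical invocation (sizes and node names are made up):
#
#   disks = _GenerateDiskTemplate(cfg, constants.DT_DRBD8,
#                                 "inst1.example.com", "node1", ["node2"],
#                                 10240, 4096, None, None)
#
# which yields two DRBD8 devices, "sda" of 10240 MiB and "sdb" of 4096 MiB,
# each mirrored between node1 and node2 via _GenerateDRBD8Branch.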
2741

    
2742

    
2743
def _GetInstanceInfoText(instance):
2744
  """Compute that text that should be added to the disk's metadata.
2745

2746
  """
2747
  return "originstname+%s" % instance.name
2748

    
2749

    
2750
def _CreateDisks(cfg, instance):
2751
  """Create all disks for an instance.
2752

2753
  This abstracts away some work from AddInstance.
2754

2755
  Args:
2756
    instance: the instance object
2757

2758
  Returns:
2759
    True or False showing the success of the creation process
2760

2761
  """
2762
  info = _GetInstanceInfoText(instance)
2763

    
2764
  if instance.disk_template == constants.DT_FILE:
2765
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2766
    result = rpc.call_file_storage_dir_create(instance.primary_node,
2767
                                              file_storage_dir)
2768

    
2769
    if not result:
2770
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
2771
      return False
2772

    
2773
    if not result[0]:
2774
      logger.Error("failed to create directory '%s'" % file_storage_dir)
2775
      return False
2776

    
2777
  for device in instance.disks:
2778
    logger.Info("creating volume %s for instance %s" %
2779
                (device.iv_name, instance.name))
2780
    #HARDCODE
2781
    for secondary_node in instance.secondary_nodes:
2782
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2783
                                        device, False, info):
2784
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2785
                     (device.iv_name, device, secondary_node))
2786
        return False
2787
    #HARDCODE
2788
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2789
                                    instance, device, info):
2790
      logger.Error("failed to create volume %s on primary!" %
2791
                   device.iv_name)
2792
      return False
2793

    
2794
  return True
2795

    
2796

    
2797
def _RemoveDisks(instance, cfg):
2798
  """Remove all disks for an instance.
2799

2800
  This abstracts away some work from `AddInstance()` and
2801
  `RemoveInstance()`. Note that in case some of the devices couldn't
2802
  be removed, the removal will continue with the other ones (compare
2803
  with `_CreateDisks()`).
2804

2805
  Args:
2806
    instance: the instance object
2807

2808
  Returns:
2809
    True or False showing the success of the removal process
2810

2811
  """
2812
  logger.Info("removing block devices for instance %s" % instance.name)
2813

    
2814
  result = True
2815
  for device in instance.disks:
2816
    for node, disk in device.ComputeNodeTree(instance.primary_node):
2817
      cfg.SetDiskID(disk, node)
2818
      if not rpc.call_blockdev_remove(node, disk):
2819
        logger.Error("could not remove block device %s on node %s,"
2820
                     " continuing anyway" %
2821
                     (device.iv_name, node))
2822
        result = False
2823

    
2824
  if instance.disk_template == constants.DT_FILE:
2825
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2826
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
2827
                                            file_storage_dir):
2828
      logger.Error("could not remove directory '%s'" % file_storage_dir)
2829
      result = False
2830

    
2831
  return result
2832

    
2833

    
2834
def _ComputeDiskSize(disk_template, disk_size, swap_size):
2835
  """Compute disk size requirements in the volume group
2836

2837
  This is currently hard-coded for the two-drive layout.
2838

2839
  """
2840
  # Required free disk space as a function of disk and swap space
2841
  req_size_dict = {
2842
    constants.DT_DISKLESS: None,
2843
    constants.DT_PLAIN: disk_size + swap_size,
2844
    # 256 MB are added for drbd metadata, 128MB for each drbd device
2845
    constants.DT_DRBD8: disk_size + swap_size + 256,
2846
    constants.DT_FILE: None,
2847
  }
2848

    
2849
  if disk_template not in req_size_dict:
2850
    raise errors.ProgrammerError("Disk template '%s' size requirement"
2851
                                 " is unknown" %  disk_template)
2852

    
2853
  return req_size_dict[disk_template]
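# Worked example: with disk_size=10240 and swap_size=4096 the required volume
# group space is 14336 MiB for DT_PLAIN and 14592 MiB for DT_DRBD8 (the extra
# 256 MiB covering the DRBD metadata), while DT_DISKLESS and DT_FILE need no
# volume group space at all (None).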
2854

    
2855

    
2856
class LUCreateInstance(LogicalUnit):
2857
  """Create an instance.
2858

2859
  """
2860
  HPATH = "instance-add"
2861
  HTYPE = constants.HTYPE_INSTANCE
2862
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
2863
              "disk_template", "swap_size", "mode", "start", "vcpus",
2864
              "wait_for_sync", "ip_check", "mac"]
2865

    
2866
  def _RunAllocator(self):
2867
    """Run the allocator based on input opcode.
2868

2869
    """
2870
    disks = [{"size": self.op.disk_size, "mode": "w"},
2871
             {"size": self.op.swap_size, "mode": "w"}]
2872
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
2873
             "bridge": self.op.bridge}]
2874
    ial = IAllocator(self.cfg, self.sstore,
2875
                     mode=constants.IALLOCATOR_MODE_ALLOC,
2876
                     name=self.op.instance_name,
2877
                     disk_template=self.op.disk_template,
2878
                     tags=[],
2879
                     os=self.op.os_type,
2880
                     vcpus=self.op.vcpus,
2881
                     mem_size=self.op.mem_size,
2882
                     disks=disks,
2883
                     nics=nics,
2884
                     )
2885

    
2886
    ial.Run(self.op.iallocator)
2887

    
2888
    if not ial.success:
2889
      raise errors.OpPrereqError("Can't compute nodes using"
2890
                                 " iallocator '%s': %s" % (self.op.iallocator,
2891
                                                           ial.info))
2892
    if len(ial.nodes) != ial.required_nodes:
2893
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
2894
                                 " of nodes (%s), required %s" %
2895
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
2896
    self.op.pnode = ial.nodes[0]
2897
    logger.ToStdout("Selected nodes for the instance: %s" %
2898
                    (", ".join(ial.nodes),))
2899
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
2900
                (self.op.instance_name, self.op.iallocator, ial.nodes))
2901
    if ial.required_nodes == 2:
2902
      self.op.snode = ial.nodes[1]
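    # On success ial.nodes holds the allocator's choice: the primary node in
    # ial.nodes[0] and, when the disk template needs two nodes (e.g. drbd8),
    # the secondary node in ial.nodes[1].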
2903

    
2904
  def BuildHooksEnv(self):
2905
    """Build hooks env.
2906

2907
    This runs on master, primary and secondary nodes of the instance.
2908

2909
    """
2910
    env = {
2911
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2912
      "INSTANCE_DISK_SIZE": self.op.disk_size,
2913
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
2914
      "INSTANCE_ADD_MODE": self.op.mode,
2915
      }
2916
    if self.op.mode == constants.INSTANCE_IMPORT:
2917
      env["INSTANCE_SRC_NODE"] = self.op.src_node
2918
      env["INSTANCE_SRC_PATH"] = self.op.src_path
2919
      env["INSTANCE_SRC_IMAGE"] = self.src_image
2920

    
2921
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2922
      primary_node=self.op.pnode,
2923
      secondary_nodes=self.secondaries,
2924
      status=self.instance_status,
2925
      os_type=self.op.os_type,
2926
      memory=self.op.mem_size,
2927
      vcpus=self.op.vcpus,
2928
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
2929
    ))
2930

    
2931
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2932
          self.secondaries)
2933
    return env, nl, nl
2934

    
2935

    
2936
  def CheckPrereq(self):
2937
    """Check prerequisites.
2938

2939
    """
2940
    # set optional parameters to none if they don't exist
2941
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
2942
                 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
2943
                 "vnc_bind_address"]:
2944
      if not hasattr(self.op, attr):
2945
        setattr(self.op, attr, None)
2946

    
2947
    if self.op.mode not in (constants.INSTANCE_CREATE,
2948
                            constants.INSTANCE_IMPORT):
2949
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2950
                                 self.op.mode)
2951

    
2952
    if (not self.cfg.GetVGName() and
2953
        self.op.disk_template not in constants.DTS_NOT_LVM):
2954
      raise errors.OpPrereqError("Cluster does not support lvm-based"
2955
                                 " instances")
2956

    
2957
    if self.op.mode == constants.INSTANCE_IMPORT:
2958
      src_node = getattr(self.op, "src_node", None)
2959
      src_path = getattr(self.op, "src_path", None)
2960
      if src_node is None or src_path is None:
2961
        raise errors.OpPrereqError("Importing an instance requires source"
2962
                                   " node and path options")
2963
      src_node_full = self.cfg.ExpandNodeName(src_node)
2964
      if src_node_full is None:
2965
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2966
      self.op.src_node = src_node = src_node_full
2967

    
2968
      if not os.path.isabs(src_path):
2969
        raise errors.OpPrereqError("The source path must be absolute")
2970

    
2971
      export_info = rpc.call_export_info(src_node, src_path)
2972

    
2973
      if not export_info:
2974
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
2975

    
2976
      if not export_info.has_section(constants.INISECT_EXP):
2977
        raise errors.ProgrammerError("Corrupted export config")
2978

    
2979
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
2980
      if (int(ei_version) != constants.EXPORT_VERSION):
2981
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2982
                                   (ei_version, constants.EXPORT_VERSION))
2983

    
2984
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2985
        raise errors.OpPrereqError("Can't import instance with more than"
2986
                                   " one data disk")
2987

    
2988
      # FIXME: are the old os-es, disk sizes, etc. useful?
2989
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2990
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2991
                                                         'disk0_dump'))
2992
      self.src_image = diskimage
2993
    else: # INSTANCE_CREATE
2994
      if getattr(self.op, "os_type", None) is None:
2995
        raise errors.OpPrereqError("No guest OS specified")
2996

    
2997
    #### instance parameters check
2998

    
2999
    # disk template and mirror node verification
3000
    if self.op.disk_template not in constants.DISK_TEMPLATES:
3001
      raise errors.OpPrereqError("Invalid disk template name")
3002

    
3003
    # instance name verification
3004
    hostname1 = utils.HostInfo(self.op.instance_name)
3005

    
3006
    self.op.instance_name = instance_name = hostname1.name
3007
    instance_list = self.cfg.GetInstanceList()
3008
    if instance_name in instance_list:
3009
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3010
                                 instance_name)
3011

    
3012
    # ip validity checks
3013
    ip = getattr(self.op, "ip", None)
3014
    if ip is None or ip.lower() == "none":
3015
      inst_ip = None
3016
    elif ip.lower() == "auto":
3017
      inst_ip = hostname1.ip
3018
    else:
3019
      if not utils.IsValidIP(ip):
3020
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
3021
                                   " like a valid IP" % ip)
3022
      inst_ip = ip
3023
    self.inst_ip = self.op.ip = inst_ip
3024

    
3025
    if self.op.start and not self.op.ip_check:
3026
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3027
                                 " adding an instance in start mode")
3028

    
3029
    if self.op.ip_check:
3030
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3031
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3032
                                   (hostname1.ip, instance_name))
3033

    
3034
    # MAC address verification
3035
    if self.op.mac != "auto":
3036
      if not utils.IsValidMac(self.op.mac.lower()):
3037
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
3038
                                   self.op.mac)
3039

    
3040
    # bridge verification
3041
    bridge = getattr(self.op, "bridge", None)
3042
    if bridge is None:
3043
      self.op.bridge = self.cfg.GetDefBridge()
3044
    else:
3045
      self.op.bridge = bridge
3046

    
3047
    # boot order verification
3048
    if self.op.hvm_boot_order is not None:
3049
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3050
        raise errors.OpPrereqError("invalid boot order specified,"
3051
                                   " must be one or more of [acdn]")
3052
    # file storage checks
3053
    if (self.op.file_driver and
3054
        not self.op.file_driver in constants.FILE_DRIVER):
3055
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
3056
                                 self.op.file_driver)
3057

    
3058
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3059
      raise errors.OpPrereqError("File storage directory not a relative"
3060
                                 " path")
3061
    #### allocator run
3062

    
3063
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
3064
      raise errors.OpPrereqError("One and only one of iallocator and primary"
3065
                                 " node must be given")
3066

    
3067
    if self.op.iallocator is not None:
3068
      self._RunAllocator()
3069

    
3070
    #### node related checks
3071

    
3072
    # check primary node
3073
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3074
    if pnode is None:
3075
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
3076
                                 self.op.pnode)
3077
    self.op.pnode = pnode.name
3078
    self.pnode = pnode
3079
    self.secondaries = []
3080

    
3081
    # mirror node verification
3082
    if self.op.disk_template in constants.DTS_NET_MIRROR:
3083
      if getattr(self.op, "snode", None) is None:
3084
        raise errors.OpPrereqError("The networked disk templates need"
3085
                                   " a mirror node")
3086

    
3087
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
3088
      if snode_name is None:
3089
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
3090
                                   self.op.snode)
3091
      elif snode_name == pnode.name:
3092
        raise errors.OpPrereqError("The secondary node cannot be"
3093
                                   " the primary node.")
3094
      self.secondaries.append(snode_name)
3095

    
3096
    req_size = _ComputeDiskSize(self.op.disk_template,
3097
                                self.op.disk_size, self.op.swap_size)
3098

    
3099
    # Check lv size requirements
3100
    if req_size is not None:
3101
      nodenames = [pnode.name] + self.secondaries
3102
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3103
      for node in nodenames:
3104
        info = nodeinfo.get(node, None)
3105
        if not info:
3106
          raise errors.OpPrereqError("Cannot get current information"
3107
                                     " from node '%s'" % node)
3108
        vg_free = info.get('vg_free', None)
3109
        if not isinstance(vg_free, int):
3110
          raise errors.OpPrereqError("Can't compute free disk space on"
3111
                                     " node %s" % node)
3112
        if req_size > info['vg_free']:
3113
          raise errors.OpPrereqError("Not enough disk space on target node %s."
3114
                                     " %d MB available, %d MB required" %
3115
                                     (node, info['vg_free'], req_size))
3116

    
3117
    # os verification
3118
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3119
    if not os_obj:
3120
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
3121
                                 " primary node"  % self.op.os_type)
3122

    
3123
    if self.op.kernel_path == constants.VALUE_NONE:
3124
      raise errors.OpPrereqError("Can't set instance kernel to none")
3125

    
3126

    
3127
    # bridge check on primary node
3128
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3129
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
3130
                                 " destination node '%s'" %
3131
                                 (self.op.bridge, pnode.name))
3132

    
3133
    # memory check on primary node
3134
    if self.op.start:
3135
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3136
                           "creating instance %s" % self.op.instance_name,
3137
                           self.op.mem_size)
3138

    
3139
    # hvm_cdrom_image_path verification
3140
    if self.op.hvm_cdrom_image_path is not None:
3141
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
3142
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
3143
                                   " be an absolute path or None, not %s" %
3144
                                   self.op.hvm_cdrom_image_path)
3145
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
3146
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
3147
                                   " regular file or a symlink pointing to"
3148
                                   " an existing regular file, not %s" %
3149
                                   self.op.hvm_cdrom_image_path)
3150

    
3151
    # vnc_bind_address verification
3152
    if self.op.vnc_bind_address is not None:
3153
      if not utils.IsValidIP(self.op.vnc_bind_address):
3154
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3155
                                   " like a valid IP address" %
3156
                                   self.op.vnc_bind_address)
3157

    
3158
    if self.op.start:
3159
      self.instance_status = 'up'
3160
    else:
3161
      self.instance_status = 'down'
3162

    
3163
  def Exec(self, feedback_fn):
3164
    """Create and add the instance to the cluster.
3165

3166
    """
3167
    instance = self.op.instance_name
3168
    pnode_name = self.pnode.name
3169

    
3170
    if self.op.mac == "auto":
3171
      mac_address = self.cfg.GenerateMAC()
3172
    else:
3173
      mac_address = self.op.mac
3174

    
3175
    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3176
    if self.inst_ip is not None:
3177
      nic.ip = self.inst_ip
3178

    
3179
    ht_kind = self.sstore.GetHypervisorType()
3180
    if ht_kind in constants.HTS_REQ_PORT:
3181
      network_port = self.cfg.AllocatePort()
3182
    else:
3183
      network_port = None
3184

    
3185
    if self.op.vnc_bind_address is None:
3186
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3187

    
3188
    # this is needed because os.path.join does not accept None arguments
3189
    if self.op.file_storage_dir is None:
3190
      string_file_storage_dir = ""
3191
    else:
3192
      string_file_storage_dir = self.op.file_storage_dir
3193

    
3194
    # build the full file storage dir path
3195
    file_storage_dir = os.path.normpath(os.path.join(
3196
                                        self.sstore.GetFileStorageDir(),
3197
                                        string_file_storage_dir, instance))
3198

    
3199

    
3200
    disks = _GenerateDiskTemplate(self.cfg,
3201
                                  self.op.disk_template,
3202
                                  instance, pnode_name,
3203
                                  self.secondaries, self.op.disk_size,
3204
                                  self.op.swap_size,
3205
                                  file_storage_dir,
3206
                                  self.op.file_driver)
3207

    
3208
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3209
                            primary_node=pnode_name,
3210
                            memory=self.op.mem_size,
3211
                            vcpus=self.op.vcpus,
3212
                            nics=[nic], disks=disks,
3213
                            disk_template=self.op.disk_template,
3214
                            status=self.instance_status,
3215
                            network_port=network_port,
3216
                            kernel_path=self.op.kernel_path,
3217
                            initrd_path=self.op.initrd_path,
3218
                            hvm_boot_order=self.op.hvm_boot_order,
3219
                            hvm_acpi=self.op.hvm_acpi,
3220
                            hvm_pae=self.op.hvm_pae,
3221
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3222
                            vnc_bind_address=self.op.vnc_bind_address,
3223
                            )
3224

    
3225
    feedback_fn("* creating instance disks...")
3226
    if not _CreateDisks(self.cfg, iobj):
3227
      _RemoveDisks(iobj, self.cfg)
3228
      raise errors.OpExecError("Device creation failed, reverting...")
3229

    
3230
    feedback_fn("adding instance %s to cluster config" % instance)
3231

    
3232
    self.cfg.AddInstance(iobj)
3233
    # Add the new instance to the Ganeti Lock Manager
3234
    self.context.glm.add(locking.LEVEL_INSTANCE, instance)
3235

    
3236
    if self.op.wait_for_sync:
3237
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3238
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3239
      # make sure the disks are not degraded (still sync-ing is ok)
3240
      time.sleep(15)
3241
      feedback_fn("* checking mirrors status")
3242
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3243
    else:
3244
      disk_abort = False
3245

    
3246
    if disk_abort:
3247
      _RemoveDisks(iobj, self.cfg)
3248
      self.cfg.RemoveInstance(iobj.name)
3249
      # Remove the new instance from the Ganeti Lock Manager
3250
      self.context.glm.remove(locking.LEVEL_INSTANCE, iobj.name)
3251
      raise errors.OpExecError("There are some degraded disks for"
3252
                               " this instance")
3253

    
3254
    feedback_fn("creating os for instance %s on node %s" %
3255
                (instance, pnode_name))
3256

    
3257
    if iobj.disk_template != constants.DT_DISKLESS:
3258
      if self.op.mode == constants.INSTANCE_CREATE:
3259
        feedback_fn("* running the instance OS create scripts...")
3260
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3261
          raise errors.OpExecError("could not add os for instance %s"
3262
                                   " on node %s" %
3263
                                   (instance, pnode_name))
3264

    
3265
      elif self.op.mode == constants.INSTANCE_IMPORT:
3266
        feedback_fn("* running the instance OS import scripts...")
3267
        src_node = self.op.src_node
3268
        src_image = self.src_image
3269
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3270
                                                src_node, src_image):
3271
          raise errors.OpExecError("Could not import os for instance"
3272
                                   " %s on node %s" %
3273
                                   (instance, pnode_name))
3274
      else:
3275
        # also checked in the prereq part
3276
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3277
                                     % self.op.mode)
3278

    
3279
    if self.op.start:
3280
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3281
      feedback_fn("* starting instance...")
3282
      if not rpc.call_instance_start(pnode_name, iobj, None):
3283
        raise errors.OpExecError("Could not start instance")
class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
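
# LUConnectConsole.Exec above does not open the console itself; it only
# returns an argv-style command built by ssh.BuildCmd, which the caller is
# expected to execute on the master node.  The function below is a
# hypothetical sketch of such a caller; its name and the use of the standard
# subprocess module are assumptions, not part of the real client code.
def _ExampleRunConsoleCommand(console_cmd):
  """Sketch: run the command list returned by LUConnectConsole.Exec.

  """
  import subprocess
  # the command inherits the terminal, so an interactive console works
  return subprocess.call(console_cmd)
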
class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    logger.ToStdout("Selected new secondary for the instance: %s" %
                    self.op.remote_node)
    # CheckPrereq assigns our result back to self.op.remote_node, so return
    # the selected node instead of the implicit None
    return self.op.remote_node

  def BuildHooksEnv(self):
3364
    """Build hooks env.
3365

3366
    This runs on the master, the primary and all the secondaries.
3367

3368
    """
3369
    env = {
3370
      "MODE": self.op.mode,
3371
      "NEW_SECONDARY": self.op.remote_node,
3372
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3373
      }
3374
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3375
    nl = [
3376
      self.sstore.GetMasterNode(),
3377
      self.instance.primary_node,
3378
      ]
3379
    if self.op.remote_node is not None:
3380
      nl.append(self.op.remote_node)
3381
    return env, nl, nl
3382

    
3383
  def CheckPrereq(self):
3384
    """Check prerequisites.
3385

3386
    This checks that the instance is in the cluster.
3387

3388
    """
3389
    if not hasattr(self.op, "remote_node"):
3390
      self.op.remote_node = None
3391

    
3392
    instance = self.cfg.GetInstanceInfo(
3393
      self.cfg.ExpandInstanceName(self.op.instance_name))
3394
    if instance is None:
3395
      raise errors.OpPrereqError("Instance '%s' not known" %
3396
                                 self.op.instance_name)
3397
    self.instance = instance
3398
    self.op.instance_name = instance.name
3399

    
3400
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3401
      raise errors.OpPrereqError("Instance's disk layout is not"
3402
                                 " network mirrored.")
3403

    
3404
    if len(instance.secondary_nodes) != 1:
3405
      raise errors.OpPrereqError("The instance has a strange layout,"
3406
                                 " expected one secondary but found %d" %
3407
                                 len(instance.secondary_nodes))
3408

    
3409
    self.sec_node = instance.secondary_nodes[0]
3410

    
3411
    ia_name = getattr(self.op, "iallocator", None)
3412
    if ia_name is not None:
3413
      if self.op.remote_node is not None:
3414
        raise errors.OpPrereqError("Give either the iallocator or the new"
3415
                                   " secondary, not both")
3416
      self.op.remote_node = self._RunAllocator()
3417

    
3418
    remote_node = self.op.remote_node
3419
    if remote_node is not None:
3420
      remote_node = self.cfg.ExpandNodeName(remote_node)
3421
      if remote_node is None:
3422
        raise errors.OpPrereqError("Node '%s' not known" %
3423
                                   self.op.remote_node)
3424
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3425
    else:
3426
      self.remote_node_info = None
3427
    if remote_node == instance.primary_node:
3428
      raise errors.OpPrereqError("The specified node is the primary node of"
3429
                                 " the instance.")
3430
    elif remote_node == self.sec_node:
3431
      if self.op.mode == constants.REPLACE_DISK_SEC:
3432
        # this is for DRBD8, where we can't execute the same mode of
3433
        # replacement as for drbd7 (no different port allocated)
3434
        raise errors.OpPrereqError("Same secondary given, cannot execute"
3435
                                   " replacement")
3436
    if instance.disk_template == constants.DT_DRBD8:
3437
      if (self.op.mode == constants.REPLACE_DISK_ALL and
3438
          remote_node is not None):
3439
        # switch to replace secondary mode
3440
        self.op.mode = constants.REPLACE_DISK_SEC
3441

    
3442
      if self.op.mode == constants.REPLACE_DISK_ALL:
3443
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3444
                                   " secondary disk replacement, not"
3445
                                   " both at once")
3446
      elif self.op.mode == constants.REPLACE_DISK_PRI:
3447
        if remote_node is not None:
3448
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3449
                                     " the secondary while doing a primary"
3450
                                     " node disk replacement")
3451
        self.tgt_node = instance.primary_node
3452
        self.oth_node = instance.secondary_nodes[0]
3453
      elif self.op.mode == constants.REPLACE_DISK_SEC:
3454
        self.new_node = remote_node # this can be None, in which case
3455
                                    # we don't change the secondary
3456
        self.tgt_node = instance.secondary_nodes[0]
3457
        self.oth_node = instance.primary_node
3458
      else:
3459
        raise errors.ProgrammerError("Unhandled disk replace mode")
3460

    
3461
    for name in self.op.disks:
3462
      if instance.FindDisk(name) is None:
3463
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3464
                                   (name, instance.name))
3465
    self.op.remote_node = remote_node
3466

    
3467
  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaced.<time_t>)

    An illustrative sketch of the rename bookkeeping is given after this
    class.

    Failures are not very well handled.

    """
    steps_total = 6
3485
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3486
    instance = self.instance
3487
    iv_names = {}
3488
    vgname = self.cfg.GetVGName()
3489
    # start of work
3490
    cfg = self.cfg
3491
    tgt_node = self.tgt_node
3492
    oth_node = self.oth_node
3493

    
3494
    # Step: check device activation
3495
    self.proc.LogStep(1, steps_total, "check device existence")
3496
    info("checking volume groups")
3497
    my_vg = cfg.GetVGName()
3498
    results = rpc.call_vg_list([oth_node, tgt_node])
3499
    if not results:
3500
      raise errors.OpExecError("Can't list volume groups on the nodes")
3501
    for node in oth_node, tgt_node:
3502
      res = results.get(node, False)
3503
      if not res or my_vg not in res:
3504
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3505
                                 (my_vg, node))
3506
    for dev in instance.disks:
3507
      if not dev.iv_name in self.op.disks:
3508
        continue
3509
      for node in tgt_node, oth_node:
3510
        info("checking %s on %s" % (dev.iv_name, node))
3511
        cfg.SetDiskID(dev, node)
3512
        if not rpc.call_blockdev_find(node, dev):
3513
          raise errors.OpExecError("Can't find device %s on node %s" %
3514
                                   (dev.iv_name, node))
3515

    
3516
    # Step: check other node consistency
3517
    self.proc.LogStep(2, steps_total, "check peer consistency")
3518
    for dev in instance.disks:
3519
      if not dev.iv_name in self.op.disks:
3520
        continue
3521
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3522
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3523
                                   oth_node==instance.primary_node):
3524
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3525
                                 " to replace disks on this node (%s)" %
3526
                                 (oth_node, tgt_node))
3527

    
3528
    # Step: create new storage
3529
    self.proc.LogStep(3, steps_total, "allocate new storage")
3530
    for dev in instance.disks:
3531
      if not dev.iv_name in self.op.disks:
3532
        continue
3533
      size = dev.size
3534
      cfg.SetDiskID(dev, tgt_node)
3535
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3536
      names = _GenerateUniqueNames(cfg, lv_names)
3537
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3538
                             logical_id=(vgname, names[0]))
3539
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3540
                             logical_id=(vgname, names[1]))
3541
      new_lvs = [lv_data, lv_meta]
3542
      old_lvs = dev.children
3543
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3544
      info("creating new local storage on %s for %s" %
3545
           (tgt_node, dev.iv_name))
3546
      # since we *always* want to create this LV, we use the
3547
      # _Create...OnPrimary (which forces the creation), even if we
3548
      # are talking about the secondary node
3549
      for new_lv in new_lvs:
3550
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3551
                                        _GetInstanceInfoText(instance)):
3552
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3553
                                   " node '%s'" %
3554
                                   (new_lv.logical_id[1], tgt_node))
3555

    
3556
    # Step: for each lv, detach+rename*2+attach
3557
    self.proc.LogStep(4, steps_total, "change drbd configuration")
3558
    for dev, old_lvs, new_lvs in iv_names.itervalues():
3559
      info("detaching %s drbd from local storage" % dev.iv_name)
3560
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3561
        raise errors.OpExecError("Can't detach drbd from local storage on node"
3562
                                 " %s for device %s" % (tgt_node, dev.iv_name))
3563
      #dev.children = []
3564
      #cfg.Update(instance)
3565

    
3566
      # ok, we created the new LVs, so now we know we have the needed
3567
      # storage; as such, we proceed on the target node to rename
3568
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3569
      # using the assumption that logical_id == physical_id (which in
3570
      # turn is the unique_id on that node)
3571

    
3572
      # FIXME(iustin): use a better name for the replaced LVs
3573
      temp_suffix = int(time.time())
3574
      ren_fn = lambda d, suff: (d.physical_id[0],
3575
                                d.physical_id[1] + "_replaced-%s" % suff)
3576
      # build the rename list based on what LVs exist on the node
3577
      rlist = []
3578
      for to_ren in old_lvs:
3579
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3580
        if find_res is not None: # device exists
3581
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3582

    
3583
      info("renaming the old LVs on the target node")
3584
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3585
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3586
      # now we rename the new LVs to the old LVs
3587
      info("renaming the new LVs on the target node")
3588
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3589
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3590
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3591

    
3592
      for old, new in zip(old_lvs, new_lvs):
3593
        new.logical_id = old.logical_id
3594
        cfg.SetDiskID(new, tgt_node)
3595

    
3596
      for disk in old_lvs:
3597
        disk.logical_id = ren_fn(disk, temp_suffix)
3598
        cfg.SetDiskID(disk, tgt_node)
3599

    
3600
      # now that the new lvs have the old name, we can add them to the device
3601
      info("adding new mirror component on %s" % tgt_node)
3602
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3603
        for new_lv in new_lvs:
3604
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
3605
            warning("Can't rollback device %s", hint="manually cleanup unused"
3606
                    " logical volumes")
3607
        raise errors.OpExecError("Can't add local storage to drbd")
3608

    
3609
      dev.children = new_lvs
3610
      cfg.Update(instance)
3611

    
3612
    # Step: wait for sync
3613

    
3614
    # this can fail as the old devices are degraded and _WaitForSync
3615
    # does a combined result over all disks, so we don't check its
3616
    # return value
3617
    self.proc.LogStep(5, steps_total, "sync devices")
3618
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3619

    
3620
    # so check manually all the devices
3621
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3622
      cfg.SetDiskID(dev, instance.primary_node)
3623
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3624
      if is_degr:
3625
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3626

    
3627
    # Step: remove old storage
3628
    self.proc.LogStep(6, steps_total, "removing old storage")
3629
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3630
      info("remove logical volumes for %s" % name)
3631
      for lv in old_lvs:
3632
        cfg.SetDiskID(lv, tgt_node)
3633
        if not rpc.call_blockdev_remove(tgt_node, lv):
3634
          warning("Can't remove old LV", hint="manually remove unused LVs")
3635
          continue
3636

    
3637
  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
3657
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3658
    instance = self.instance
3659
    iv_names = {}
3660
    vgname = self.cfg.GetVGName()
3661
    # start of work
3662
    cfg = self.cfg
3663
    old_node = self.tgt_node
3664
    new_node = self.new_node
3665
    pri_node = instance.primary_node
3666

    
3667
    # Step: check device activation
3668
    self.proc.LogStep(1, steps_total, "check device existence")
3669
    info("checking volume groups")
3670
    my_vg = cfg.GetVGName()
3671
    results = rpc.call_vg_list([pri_node, new_node])
3672
    if not results:
3673
      raise errors.OpExecError("Can't list volume groups on the nodes")
3674
    for node in pri_node, new_node:
3675
      res = results.get(node, False)
3676
      if not res or my_vg not in res:
3677
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3678
                                 (my_vg, node))
3679
    for dev in instance.disks:
3680
      if not dev.iv_name in self.op.disks:
3681
        continue
3682
      info("checking %s on %s" % (dev.iv_name, pri_node))
3683
      cfg.SetDiskID(dev, pri_node)
3684
      if not rpc.call_blockdev_find(pri_node, dev):
3685
        raise errors.OpExecError("Can't find device %s on node %s" %
3686
                                 (dev.iv_name, pri_node))
3687

    
3688
    # Step: check other node consistency
3689
    self.proc.LogStep(2, steps_total, "check peer consistency")
3690
    for dev in instance.disks:
3691
      if not dev.iv_name in self.op.disks:
3692
        continue
3693
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3694
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3695
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
3696
                                 " unsafe to replace the secondary" %
3697
                                 pri_node)
3698

    
3699
    # Step: create new storage
3700
    self.proc.LogStep(3, steps_total, "allocate new storage")
3701
    for dev in instance.disks:
3702
      size = dev.size
3703
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3704
      # since we *always* want to create this LV, we use the
3705
      # _Create...OnPrimary (which forces the creation), even if we
3706
      # are talking about the secondary node
3707
      for new_lv in dev.children:
3708
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3709
                                        _GetInstanceInfoText(instance)):
3710
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3711
                                   " node '%s'" %
3712
                                   (new_lv.logical_id[1], new_node))
3713

    
3714
      iv_names[dev.iv_name] = (dev, dev.children)
3715

    
3716
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
3717
    for dev in instance.disks:
3718
      size = dev.size
3719
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3720
      # create new devices on new_node
3721
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3722
                              logical_id=(pri_node, new_node,
3723
                                          dev.logical_id[2]),
3724
                              children=dev.children)
3725
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3726
                                        new_drbd, False,
3727
                                      _GetInstanceInfoText(instance)):
3728
        raise errors.OpExecError("Failed to create new DRBD on"
3729
                                 " node '%s'" % new_node)
3730

    
3731
    for dev in instance.disks:
3732
      # we have new devices, shutdown the drbd on the old secondary
3733
      info("shutting down drbd for %s on old node" % dev.iv_name)
3734
      cfg.SetDiskID(dev, old_node)
3735
      if not rpc.call_blockdev_shutdown(old_node, dev):
3736
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3737
                hint="Please cleanup this device manually as soon as possible")
3738

    
3739
    info("detaching primary drbds from the network (=> standalone)")
3740
    done = 0
3741
    for dev in instance.disks:
3742
      cfg.SetDiskID(dev, pri_node)
3743
      # set the physical (unique in bdev terms) id to None, meaning
3744
      # detach from network
3745
      dev.physical_id = (None,) * len(dev.physical_id)
3746
      # and 'find' the device, which will 'fix' it to match the
3747
      # standalone state
3748
      if rpc.call_blockdev_find(pri_node, dev):
3749
        done += 1
3750
      else:
3751
        warning("Failed to detach drbd %s from network, unusual case" %
3752
                dev.iv_name)
3753

    
3754
    if not done:
3755
      # no detaches succeeded (very unlikely)
3756
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
3757

    
3758
    # if we managed to detach at least one, we update all the disks of
3759
    # the instance to point to the new secondary
3760
    info("updating instance configuration")
3761
    for dev in instance.disks:
3762
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3763
      cfg.SetDiskID(dev, pri_node)
3764
    cfg.Update(instance)
3765

    
3766
    # and now perform the drbd attach
3767
    info("attaching primary drbds to new secondary (standalone => connected)")
3768
    failures = []
3769
    for dev in instance.disks:
3770
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3771
      # since the attach is smart, it's enough to 'find' the device,
3772
      # it will automatically activate the network, if the physical_id
3773
      # is correct
3774
      cfg.SetDiskID(dev, pri_node)
3775
      if not rpc.call_blockdev_find(pri_node, dev):
3776
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3777
                "please do a gnt-instance info to see the status of disks")
3778

    
3779
    # this can fail as the old devices are degraded and _WaitForSync
3780
    # does a combined result over all disks, so we don't check its
3781
    # return value
3782
    self.proc.LogStep(5, steps_total, "sync devices")
3783
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3784

    
3785
    # so check manually all the devices
3786
    for name, (dev, old_lvs) in iv_names.iteritems():
3787
      cfg.SetDiskID(dev, pri_node)
3788
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3789
      if is_degr:
3790
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3791

    
3792
    self.proc.LogStep(6, steps_total, "removing old storage")
3793
    for name, (dev, old_lvs) in iv_names.iteritems():
3794
      info("remove logical volumes for %s" % name)
3795
      for lv in old_lvs:
3796
        cfg.SetDiskID(lv, old_node)
3797
        if not rpc.call_blockdev_remove(old_node, lv):
3798
          warning("Can't remove LV on old secondary",
3799
                  hint="Cleanup stale volumes by hand")
3800

    
3801
  def Exec(self, feedback_fn):
3802
    """Execute disk replacement.
3803

3804
    This dispatches the disk replacement to the appropriate handler.
3805

3806
    """
3807
    instance = self.instance
3808

    
3809
    # Activate the instance disks if we're replacing them on a down instance
3810
    if instance.status == "down":
3811
      op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
3812
      self.proc.ChainOpCode(op)
3813

    
3814
    if instance.disk_template == constants.DT_DRBD8:
3815
      if self.op.remote_node is None:
3816
        fn = self._ExecD8DiskOnly
3817
      else:
3818
        fn = self._ExecD8Secondary
3819
    else:
3820
      raise errors.ProgrammerError("Unhandled disk replacement case")
3821

    
3822
    ret = fn(feedback_fn)
3823

    
3824
    # Deactivate the instance disks if we're replacing them on a down instance
3825
    if instance.status == "down":
3826
      op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
3827
      self.proc.ChainOpCode(op)
3828

    
3829
    return ret
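
# The detach/rename/attach dance in LUReplaceDisks._ExecD8DiskOnly hinges on
# a simple renaming plan: the old LVs are moved aside under a timestamped
# suffix, and the freshly created LVs take over the old names so the drbd
# device finds them again.  The helper below is only an illustrative sketch
# of that bookkeeping; its name and the plain LV-name strings are
# assumptions, the real code works on objects.Disk instances.
def _ExampleLvRenamePlan(old_names, new_names, stamp):
  """Sketch: compute the two rename passes for a local storage replacement.

  """
  # pass 1: move the old LVs out of the way,
  # e.g. sda_data -> sda_data_replaced-<stamp>
  move_aside = [(name, "%s_replaced-%s" % (name, stamp))
                for name in old_names]
  # pass 2: give the new LVs the names the drbd device expects to find
  take_over = [(new, old) for old, new in zip(old_names, new_names)]
  return move_aside, take_over

# For instance, _ExampleLvRenamePlan(["inst1.sda_data"], ["uuid.new"], 1234)
# yields ([("inst1.sda_data", "inst1.sda_data_replaced-1234")],
#         [("uuid.new", "inst1.sda_data")]).
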
class LUGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk", "amount"]

  def BuildHooksEnv(self):
3841
    """Build hooks env.
3842

3843
    This runs on the master, the primary and all the secondaries.
3844

3845
    """
3846
    env = {
3847
      "DISK": self.op.disk,
3848
      "AMOUNT": self.op.amount,
3849
      }
3850
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3851
    nl = [
3852
      self.sstore.GetMasterNode(),
3853
      self.instance.primary_node,
3854
      ]
3855
    return env, nl, nl
3856

    
3857
  def CheckPrereq(self):
3858
    """Check prerequisites.
3859

3860
    This checks that the instance is in the cluster.
3861

3862
    """
3863
    instance = self.cfg.GetInstanceInfo(
3864
      self.cfg.ExpandInstanceName(self.op.instance_name))
3865
    if instance is None:
3866
      raise errors.OpPrereqError("Instance '%s' not known" %
3867
                                 self.op.instance_name)
3868
    self.instance = instance
3869
    self.op.instance_name = instance.name
3870

    
3871
    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
3872
      raise errors.OpPrereqError("Instance's disk layout does not support"
3873
                                 " growing.")
3874

    
3875
    if instance.FindDisk(self.op.disk) is None:
3876
      raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3877
                                 (self.op.disk, instance.name))
3878

    
3879
    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
3880
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3881
    for node in nodenames:
3882
      info = nodeinfo.get(node, None)
3883
      if not info:
3884
        raise errors.OpPrereqError("Cannot get current information"
3885
                                   " from node '%s'" % node)
3886
      vg_free = info.get('vg_free', None)
3887
      if not isinstance(vg_free, int):
3888
        raise errors.OpPrereqError("Can't compute free disk space on"
3889
                                   " node %s" % node)
3890
      if self.op.amount > info['vg_free']:
3891
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
3892
                                   " %d MiB available, %d MiB required" %
3893
                                   (node, info['vg_free'], self.op.amount))
3894

    
3895
  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    instance = self.instance
    disk = instance.FindDisk(self.op.disk)
    for node in (instance.secondary_nodes + (instance.primary_node,)):
      self.cfg.SetDiskID(disk, node)
      result = rpc.call_blockdev_grow(node, disk, self.op.amount)
      if not result or not isinstance(result, tuple) or len(result) != 2:
        raise errors.OpExecError("grow request failed on node %s" % node)
      elif not result[0]:
        raise errors.OpExecError("grow request failed on node %s: %s" %
                                 (node, result[1]))
    disk.RecordGrow(self.op.amount)
    self.cfg.Update(instance)
    return
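
# LUGrowDisk.Exec above grows the block device on the secondaries first and
# on the primary last, and only records the new size in the configuration
# once every node has accepted the request.  The function below is a minimal
# sketch of that ordering with plain callables standing in for the real rpc
# and configuration objects; all names here are assumptions.
def _ExampleGrowOrder(primary, secondaries, grow_fn, record_fn, amount):
  """Sketch: grow a mirrored disk everywhere before recording the new size.

  """
  for node in tuple(secondaries) + (primary,):
    if not grow_fn(node, amount):
      raise errors.OpExecError("grow request failed on node %s" % node)
  record_fn(amount)
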
class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
3921
    """Check prerequisites.
3922

3923
    This only checks the optional instance list against the existing names.
3924

3925
    """
3926
    if not isinstance(self.op.instances, list):
3927
      raise errors.OpPrereqError("Invalid argument type 'instances'")
3928
    if self.op.instances:
3929
      self.wanted_instances = []
3930
      names = self.op.instances
3931
      for name in names:
3932
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3933
        if instance is None:
3934
          raise errors.OpPrereqError("No such instance name '%s'" % name)
3935
        self.wanted_instances.append(instance)
3936
    else:
3937
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3938
                               in self.cfg.GetInstanceList()]
3939
    return
3940

    
3941

    
3942
  def _ComputeDiskStatus(self, instance, snode, dev):
3943
    """Compute block device status.
3944

3945
    """
3946
    self.cfg.SetDiskID(dev, instance.primary_node)
3947
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3948
    if dev.dev_type in constants.LDS_DRBD:
3949
      # we change the snode then (otherwise we use the one passed in)
3950
      if dev.logical_id[0] == instance.primary_node:
3951
        snode = dev.logical_id[1]
3952
      else:
3953
        snode = dev.logical_id[0]
3954

    
3955
    if snode:
3956
      self.cfg.SetDiskID(dev, snode)
3957
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
3958
    else:
3959
      dev_sstatus = None
3960

    
3961
    if dev.children:
3962
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
3963
                      for child in dev.children]
3964
    else:
3965
      dev_children = []
3966

    
3967
    data = {
3968
      "iv_name": dev.iv_name,
3969
      "dev_type": dev.dev_type,
3970
      "logical_id": dev.logical_id,
3971
      "physical_id": dev.physical_id,
3972
      "pstatus": dev_pstatus,
3973
      "sstatus": dev_sstatus,
3974
      "children": dev_children,
3975
      }
3976

    
3977
    return data
3978

    
3979
  def Exec(self, feedback_fn):
3980
    """Gather and return data"""
3981
    result = {}
3982
    for instance in self.wanted_instances:
3983
      remote_info = rpc.call_instance_info(instance.primary_node,
3984
                                                instance.name)
3985
      if remote_info and "state" in remote_info:
3986
        remote_state = "up"
3987
      else:
3988
        remote_state = "down"
3989
      if instance.status == "down":
3990
        config_state = "down"
3991
      else:
3992
        config_state = "up"
3993

    
3994
      disks = [self._ComputeDiskStatus(instance, None, device)
3995
               for device in instance.disks]
3996

    
3997
      idict = {
3998
        "name": instance.name,
3999
        "config_state": config_state,
4000
        "run_state": remote_state,
4001
        "pnode": instance.primary_node,
4002
        "snodes": instance.secondary_nodes,
4003
        "os": instance.os,
4004
        "memory": instance.memory,
4005
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4006
        "disks": disks,
4007
        "vcpus": instance.vcpus,
4008
        }
4009

    
4010
      htkind = self.sstore.GetHypervisorType()
4011
      if htkind == constants.HT_XEN_PVM30:
4012
        idict["kernel_path"] = instance.kernel_path
4013
        idict["initrd_path"] = instance.initrd_path
4014

    
4015
      if htkind == constants.HT_XEN_HVM31:
4016
        idict["hvm_boot_order"] = instance.hvm_boot_order
4017
        idict["hvm_acpi"] = instance.hvm_acpi
4018
        idict["hvm_pae"] = instance.hvm_pae
4019
        idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4020

    
4021
      if htkind in constants.HTS_REQ_PORT:
4022
        idict["vnc_bind_address"] = instance.vnc_bind_address
4023
        idict["network_port"] = instance.network_port
4024

    
4025
      result[instance.name] = idict
4026

    
4027
    return result
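
# The per-disk dictionaries produced by _ComputeDiskStatus above nest through
# their "children" entries, mirroring the block device tree (e.g. a drbd
# device on top of two logical volumes).  The function below is an
# illustrative sketch of walking that structure; its name and the flat
# (depth, iv_name) output are assumptions made for the example.
def _ExampleFlattenDiskTree(disk_status, depth=0):
  """Sketch: flatten nested disk status dicts into (depth, iv_name) pairs.

  """
  result = [(depth, disk_status["iv_name"])]
  for child in disk_status.get("children", []):
    result.extend(_ExampleFlattenDiskTree(child, depth + 1))
  return result
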
class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
4039
    """Build hooks env.
4040

4041
    This runs on the master, primary and secondaries.
4042

4043
    """
4044
    args = dict()
4045
    if self.mem:
4046
      args['memory'] = self.mem
4047
    if self.vcpus:
4048
      args['vcpus'] = self.vcpus
4049
    if self.do_ip or self.do_bridge or self.mac:
4050
      if self.do_ip:
4051
        ip = self.ip
4052
      else:
4053
        ip = self.instance.nics[0].ip
4054
      if self.bridge:
4055
        bridge = self.bridge
4056
      else:
4057
        bridge = self.instance.nics[0].bridge
4058
      if self.mac:
4059
        mac = self.mac
4060
      else:
4061
        mac = self.instance.nics[0].mac
4062
      args['nics'] = [(ip, bridge, mac)]
4063
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4064
    nl = [self.sstore.GetMasterNode(),
4065
          self.instance.primary_node] + list(self.instance.secondary_nodes)
4066
    return env, nl, nl
4067

    
4068
  def CheckPrereq(self):
4069
    """Check prerequisites.
4070

4071
    This only checks the instance list against the existing names.
4072

4073
    """
4074
    self.mem = getattr(self.op, "mem", None)
4075
    self.vcpus = getattr(self.op, "vcpus", None)
4076
    self.ip = getattr(self.op, "ip", None)
4077
    self.mac = getattr(self.op, "mac", None)
4078
    self.bridge = getattr(self.op, "bridge", None)
4079
    self.kernel_path = getattr(self.op, "kernel_path", None)
4080
    self.initrd_path = getattr(self.op, "initrd_path", None)
4081
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4082
    self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
4083
    self.hvm_pae = getattr(self.op, "hvm_pae", None)
4084
    self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
4085
    self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
4086
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4087
                 self.kernel_path, self.initrd_path, self.hvm_boot_order,
4088
                 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
4089
                 self.vnc_bind_address]
4090
    if all_parms.count(None) == len(all_parms):
4091
      raise errors.OpPrereqError("No changes submitted")
4092
    if self.mem is not None:
4093
      try:
4094
        self.mem = int(self.mem)
4095
      except ValueError, err:
4096
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4097
    if self.vcpus is not None:
4098
      try:
4099
        self.vcpus = int(self.vcpus)
4100
      except ValueError, err:
4101
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4102
    if self.ip is not None:
4103
      self.do_ip = True
4104
      if self.ip.lower() == "none":
4105
        self.ip = None
4106
      else:
4107
        if not utils.IsValidIP(self.ip):
4108
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4109
    else:
4110
      self.do_ip = False
4111
    self.do_bridge = (self.bridge is not None)
4112
    if self.mac is not None:
4113
      if self.cfg.IsMacInUse(self.mac):
4114
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4115
                                   self.mac)
4116
      if not utils.IsValidMac(self.mac):
4117
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4118

    
4119
    if self.kernel_path is not None:
4120
      self.do_kernel_path = True
4121
      if self.kernel_path == constants.VALUE_NONE:
4122
        raise errors.OpPrereqError("Can't set instance to no kernel")
4123

    
4124
      if self.kernel_path != constants.VALUE_DEFAULT:
4125
        if not os.path.isabs(self.kernel_path):
4126
          raise errors.OpPrereqError("The kernel path must be an absolute"
4127
                                    " filename")
4128
    else:
4129
      self.do_kernel_path = False
4130

    
4131
    if self.initrd_path is not None:
4132
      self.do_initrd_path = True
4133
      if self.initrd_path not in (constants.VALUE_NONE,
4134
                                  constants.VALUE_DEFAULT):
4135
        if not os.path.isabs(self.initrd_path):
4136
          raise errors.OpPrereqError("The initrd path must be an absolute"
4137
                                    " filename")
4138
    else:
4139
      self.do_initrd_path = False
4140

    
4141
    # boot order verification
4142
    if self.hvm_boot_order is not None:
4143
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
4144
        if len(self.hvm_boot_order.strip("acdn")) != 0:
4145
          raise errors.OpPrereqError("invalid boot order specified,"
4146
                                     " must be one or more of [acdn]"
4147
                                     " or 'default'")
4148

    
4149
    # hvm_cdrom_image_path verification
4150
    if self.op.hvm_cdrom_image_path is not None:
4151
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
4152
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
4153
                                   " be an absolute path or None, not %s" %
4154
                                   self.op.hvm_cdrom_image_path)
4155
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
4156
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
4157
                                   " regular file or a symlink pointing to"
4158
                                   " an existing regular file, not %s" %
4159
                                   self.op.hvm_cdrom_image_path)
4160

    
4161
    # vnc_bind_address verification
4162
    if self.op.vnc_bind_address is not None:
4163
      if not utils.IsValidIP(self.op.vnc_bind_address):
4164
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
4165
                                   " like a valid IP address" %
4166
                                   self.op.vnc_bind_address)
4167

    
4168
    instance = self.cfg.GetInstanceInfo(
4169
      self.cfg.ExpandInstanceName(self.op.instance_name))
4170
    if instance is None:
4171
      raise errors.OpPrereqError("No such instance name '%s'" %
4172
                                 self.op.instance_name)
4173
    self.op.instance_name = instance.name
4174
    self.instance = instance
4175
    return
4176

    
4177
  def Exec(self, feedback_fn):
4178
    """Modifies an instance.
4179

4180
    All parameters take effect only at the next restart of the instance.
4181
    """
4182
    result = []
4183
    instance = self.instance
4184
    if self.mem:
4185
      instance.memory = self.mem
4186
      result.append(("mem", self.mem))
4187
    if self.vcpus:
4188
      instance.vcpus = self.vcpus
4189
      result.append(("vcpus",  self.vcpus))
4190
    if self.do_ip:
4191
      instance.nics[0].ip = self.ip
4192
      result.append(("ip", self.ip))
4193
    if self.bridge:
4194
      instance.nics[0].bridge = self.bridge
4195
      result.append(("bridge", self.bridge))
4196
    if self.mac:
4197
      instance.nics[0].mac = self.mac
4198
      result.append(("mac", self.mac))
4199
    if self.do_kernel_path:
4200
      instance.kernel_path = self.kernel_path
4201
      result.append(("kernel_path", self.kernel_path))
4202
    if self.do_initrd_path:
4203
      instance.initrd_path = self.initrd_path
4204
      result.append(("initrd_path", self.initrd_path))
4205
    if self.hvm_boot_order:
4206
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
4207
        instance.hvm_boot_order = None
4208
      else:
4209
        instance.hvm_boot_order = self.hvm_boot_order
4210
      result.append(("hvm_boot_order", self.hvm_boot_order))
4211
    if self.hvm_acpi:
4212
      instance.hvm_acpi = self.hvm_acpi
4213
      result.append(("hvm_acpi", self.hvm_acpi))
4214
    if self.hvm_pae:
4215
      instance.hvm_pae = self.hvm_pae
4216
      result.append(("hvm_pae", self.hvm_pae))
4217
    if self.hvm_cdrom_image_path:
4218
      instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
4219
      result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
4220
    if self.vnc_bind_address:
4221
      instance.vnc_bind_address = self.vnc_bind_address
4222
      result.append(("vnc_bind_address", self.vnc_bind_address))
4223

    
4224
    self.cfg.AddInstance(instance)
4225

    
4226
    return result
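
# LUSetInstanceParams.Exec above returns a list of (parameter, new value)
# pairs, and none of the changes touch the running instance: they only take
# effect at the next restart.  Below is a hypothetical sketch of rendering
# that result for the user; the helper name is an assumption.
def _ExampleFormatModifyResult(changes):
  """Sketch: render the (parameter, value) pairs returned by instance modify.

  """
  return "\n".join(["%s -> %s" % (param, value) for param, value in changes])
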
4227

    
4228

    
4229
class LUQueryExports(NoHooksLU):
4230
  """Query the exports list
4231

4232
  """
4233
  _OP_REQP = []
4234

    
4235
  def CheckPrereq(self):
4236
    """Check that the nodelist contains only existing nodes.
4237

4238
    """
4239
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4240

    
4241
  def Exec(self, feedback_fn):
4242
    """Compute the list of all the exported system images.
4243

4244
    Returns:
4245
      a dictionary with the structure node->(export-list)
4246
      where export-list is a list of the instances exported on
4247
      that node.
4248

4249
    """
4250
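    # Illustrative shape of the return value (hypothetical names):
    #   {"node1.example.com": ["instance1.example.com"],
    #    "node2.example.com": []}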
    return rpc.call_export_list(self.nodes)


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        if not rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(instance, self.cfg)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
        logger.Error("could not export block device %s from node %s to node %s"
                     % (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from node %s" %
                     (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    op = opcodes.OpQueryExports(nodes=[])
    exportlist = self.proc.ChainOpCode(op)
    found = False
    for node in exportlist:
      if instance_name in exportlist[node]:
        found = True
        if not rpc.call_export_remove(node, instance_name):
          logger.Error("could not remove export for instance %s"
                       " on node %s" % (instance_name, node))

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
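    # Illustrative result (hypothetical paths and tags):
    #   [("/cluster", "production"),
    #    ("/instances/instance1.example.com", "production")]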
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have a good list of nodes and/or the duration
    is valid.

    """
    if self.op.on_nodes:
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has several sets of attributes:
    - cfg/sstore that are needed to query the cluster
    - input data (all members of the mode-specific _ALLO_KEYS or
      _RELO_KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, cfg, sstore, mode, name, **kwargs):
    self.cfg = cfg
    self.sstore = sstore
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.cfg
    # cluster data
    data = {
      "version": 1,
      "cluster_name": self.sstore.GetClusterName(),
      "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      # we don't have job IDs
      }

    i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()
    node_data = rpc.call_node_info(node_list, cfg.GetVGName())
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      if nname not in node_data or not isinstance(node_data[nname], dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname]
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free', 'cpu_total']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += iinfo.memory
          if iinfo.status == "up":
            i_p_up_mem += iinfo.memory

      # compute memory used by instances
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": iinfo.vcpus,
        "memory": iinfo.memory,
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template,
                                  self.disks[0]["size"], self.disks[1]["size"])

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1

    disk_space = _ComputeDiskSize(instance.disk_template,
                                  instance.disks[0].size,
                                  instance.disks[1].size)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
    """Run an instance allocator and return the results.

    """
    data = self.in_text

    result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)

    if not isinstance(result, tuple) or len(result) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" %
                               (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    of the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       )
    else:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result