#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import ssconf
from ganeti import serializer


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_MASTER: the LU needs to run on the master node
        REQ_WSSTORE: the LU needs a writable SimpleStore
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_MASTER = True
  REQ_WSSTORE = False
  REQ_BGL = True

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = cfg
    self.sstore = sstore
    self.__ssh = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    if not cfg.IsCluster():
      raise errors.OpPrereqError("Cluster not initialized yet,"
                                 " use 'gnt-cluster init' first.")
    if self.REQ_MASTER:
      master = sstore.GetMasterNode()
      if master != utils.HostInfo().name:
        raise errors.OpPrereqError("Commands must be run on the master"
                                   " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object.

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.sstore)
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not be prefixed with 'GANETI_' as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    If there are no nodes, an empty list (and not None) should be returned.

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged, but any LU can override it if it
    wants to use the local cluster hook-scripts somehow.

    Args:
      phase: the hooks phase that has just been run
      hook_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: the previous result this LU had, or None in the PRE phase.

    """
    return lu_result


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if nodes:
    wanted = []

    for name in nodes:
      node = lu.cfg.ExpandNodeName(name)
      if node is None:
        raise errors.OpPrereqError("No such node name '%s'" % name)
      wanted.append(node)

  else:
    wanted = lu.cfg.GetNodeList()
  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings) or None for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields
    selected: Fields selected by the caller

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings
  """
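  # the hooks runner prefixes each of these keys with "GANETI_" when it
  # exports them to the hook scripts (e.g. INSTANCE_PRIMARY is seen by the
  # hooks as GANETI_INSTANCE_PRIMARY); see LogicalUnit.BuildHooksEnv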
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env


def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override
  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("one or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.sstore.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")
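    # keep backup copies of the ganeti user's ssh key pair before the
    # master node leaves the cluster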
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    rpc.call_node_leave_cluster(master)


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    if 'node-net-test' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result['node-net-test']:
        bad = True
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result['node-net-test'][node]))

    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if not instanceconfig.status == 'down':
      if (node_current not in node_instance or
          not instance in node_instance[node_current]):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          needed_mem += instance_cfg[instance].memory
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase; if they fail, their
    output is logged in the verify output and the verification fails.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
                        for node in nodeinfo]
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())

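    # check each node individually (version, volume group, file checksums,
    # ssh/tcp connectivity) and collect the volume, instance and memory data
    # used by the cluster-wide checks further down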
    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result =  self._VerifyInstance(instance, inst_config, node_volume,
                                     node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    return int(bad)

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result, handle it, and send some
    nicely-formatted feedback back to the user.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: previous Exec result

    """
    # We only really run POST phase hooks, and are only interested in their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res is False or not isinstance(res, list):
            feedback_fn("    Communication failure")
            lu_result = 1
            continue
          for script, hkr, output in res:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
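    # "result" bundles four collections: nodes that could not be queried,
    # per-node LVM error messages, instances with volumes that are not
    # online, and the missing (node, volume) pairs per instance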
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    to_act = set()
    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]
  REQ_WSSTORE = True

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      if not rpc.call_node_start_master(master):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether an LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if not self.op.vg_name:
      instances = [self.cfg.GetInstanceInfo(name)
                   for name in self.cfg.GetInstanceList()]
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      node_list = self.cfg.GetNodeList()
      vglist = rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name != self.cfg.GetVGName():
      self.cfg.SetVGName(self.op.vg_name)
    else:
      feedback_fn("Cluster LVM configuration already in desired"
                  " state, not changing")


def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disks to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

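  # poll the mirror status until every device reports completion; ten
  # consecutive failed RPCs abort the wait, otherwise we sleep between polls
  # (at most a minute, or the reported time estimate if shorter)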
  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    if unlock:
      #utils.Unlock('cmd')
      pass
    try:
      time.sleep(min(60, max_time))
    finally:
      if unlock:
        #utils.Lock('cmd')
        pass

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  cfgw.SetDiskID(dev, node)
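  # index into the status tuple returned by rpc.call_blockdev_find: position
  # 5 holds the overall is_degraded flag, position 6 the local-disk (ldisk)
  # status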
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
    _CheckOutputFields(static=[],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

      Args:
        node_list: a list with the names of all nodes
        rlist: a map with node names as keys and OS objects as values

      Returns:
        map: a map with osnames as keys and as value another map, with
             nodes as
             keys and list of OS objects as values
             e.g. {"debian-etch": {"node1": [<object>,...],
                                   "node2": [<object>,]}
                  }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if not nr:
        continue
      for os_obj in nr:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in node_list:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    rpc.call_node_leave_cluster(node.name)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)

    utils.RemoveHostFromEtcHosts(node.name)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset([
      "dtotal", "dfree",
      "mtotal", "mnode", "mfree",
      "bootid",
      "ctotal",
      ])

    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                               "pip", "sip", "tags"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedNodes(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

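    # the live (dynamic) fields require an RPC call to every selected node,
    # so only gather them when at least one such field was requested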
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)


  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
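            # the for/else below falls through to the '-' marker when no
            # instance owns this volume on the current node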
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # check reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # check connectivity
    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    node_verify_list = [self.sstore.GetMasterNode()]
1582
    node_verify_param = {
1583
      'nodelist': [node],
1584
      # TODO: do a node-net-test as well?
1585
    }
1586

    
1587
    result = rpc.call_node_verify(node_verify_list, node_verify_param)
1588
    for verifier in node_verify_list:
1589
      if not result[verifier]:
1590
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
1591
                                 " for remote verification" % verifier)
1592
      if result[verifier]['nodelist']:
1593
        for failed in result[verifier]['nodelist']:
1594
          feedback_fn("ssh/hostname verification failed %s -> %s" %
1595
                      (verifier, result[verifier]['nodelist'][failed]))
1596
        raise errors.OpExecError("ssh/hostname verification failed.")
1597

    
1598
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1599
    # including the node just added
1600
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1601
    dist_nodes = self.cfg.GetNodeList()
1602
    if not self.op.readd:
1603
      dist_nodes.append(node)
1604
    if myself.name in dist_nodes:
1605
      dist_nodes.remove(myself.name)
1606

    
1607
    logger.Debug("Copying hosts and known_hosts to all nodes")
1608
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1609
      result = rpc.call_upload_file(dist_nodes, fname)
1610
      for to_node in dist_nodes:
1611
        if not result[to_node]:
1612
          logger.Error("copy of file %s to node %s failed" %
1613
                       (fname, to_node))
1614

    
1615
    to_copy = self.sstore.GetFileList()
1616
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1617
      to_copy.append(constants.VNC_PASSWORD_FILE)
1618
    for fname in to_copy:
1619
      result = rpc.call_upload_file([node], fname)
1620
      if not result[node]:
1621
        logger.Error("could not copy file %s to node %s" % (fname, node))
1622

    
1623
    if not self.op.readd:
1624
      logger.Info("adding node %s to cluster.conf" % node)
1625
      self.cfg.AddNode(new_node)
1626

    
1627

    
1628
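# Illustrative sketch (not part of the original code): the result structure
# consumed by the verification loop in LUAddNode.Exec above.
# rpc.call_node_verify is expected to return a dict keyed by verifying node,
# whose 'nodelist' entry maps each failed target to an error message, e.g.
# (made-up hostnames):
#
#   result = {
#     "master.example.com": {
#       "nodelist": {"node4.example.com": "ssh/hostname check failed"},
#     },
#   }
#
# An empty 'nodelist' means the ssh/hostname verification succeeded.

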
class LUMasterFailover(LogicalUnit):
  """Failover the master node to the current node.

  This is a special LU in that it must run on a non-master node.

  """
  HPATH = "master-failover"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_MASTER = False
  REQ_WSSTORE = True
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the new master only in the pre phase, and on all
    the nodes in the post phase.

    """
    env = {
      "OP_TARGET": self.new_master,
      "NEW_MASTER": self.new_master,
      "OLD_MASTER": self.old_master,
      }
    return env, [self.new_master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we are not already the master.

    """
    self.new_master = utils.HostInfo().name
    self.old_master = self.sstore.GetMasterNode()

    if self.old_master == self.new_master:
      raise errors.OpPrereqError("This command must be run on the node"
                                 " where you want the new master to be."
                                 " %s is already the master" %
                                 self.old_master)

  def Exec(self, feedback_fn):
    """Failover the master node.

    This command, when run on a non-master node, will cause the current
    master to cease being master, and the non-master to become new
    master.

    """
    #TODO: do not rely on gethostname returning the FQDN
    logger.Info("setting master to %s, old master: %s" %
                (self.new_master, self.old_master))

    if not rpc.call_node_stop_master(self.old_master):
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)

    ss = self.sstore
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
      logger.Error("could not distribute the new simple store master file"
                   " to the other nodes, please check.")

    if not rpc.call_node_start_master(self.new_master):
      logger.Error("could not start the master role on the new master"
                   " %s, please check" % self.new_master)
      feedback_fn("Error in activating the master IP on the new master,"
                  " please fix manually.")


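# Illustrative summary (not part of the original code) of the RPC sequence in
# LUMasterFailover.Exec above; failures in each step are only logged, the
# failover itself continues:
#
#   rpc.call_node_stop_master(old_master)       # 1. drop the old master role
#   ss.SetKey(ss.SS_MASTER_NODE, new_master)    # 2. record the new master...
#   rpc.call_upload_file(all_nodes,             #    ...and push the ssconf file
#                        ss.KeyToFilename(ss.SS_MASTER_NODE))
#   rpc.call_node_start_master(new_master)      # 3. activate the new master

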
class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      }

    return result


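# Illustrative example (not part of the original code) of the dict returned by
# LUQueryClusterInfo.Exec above; all values shown are made up:
#
#   {
#     "name": "cluster.example.com",
#     "software_version": "1.2.0",
#     "protocol_version": 12,
#     "config_version": 3,
#     "os_api_version": 5,
#     "export_version": 0,
#     "master": "node1.example.com",
#     "architecture": ("64bit", "x86_64"),
#     "hypervisor_type": "xen-pvm",
#   }

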
class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    a (disks_ok, device_info) tuple: disks_ok is False if the operation
    failed, and device_info is the list of
    (host, instance_visible_name, node_visible_name) tuples with the
    mapping from node devices to instance devices
  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info


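# Illustrative sketch (not part of the original code) of the two-pass scheme
# in _AssembleInstanceDisks above, with the per-node work factored into a
# hypothetical helper for readability:
#
#   def _assemble_on(node, disk, iname, as_primary):
#     cfg.SetDiskID(disk, node)
#     return rpc.call_blockdev_assemble(node, disk, iname, as_primary)
#
#   # pass 1: every node assembles the disk in secondary mode
#   # pass 2: only the primary node re-assembles it in primary mode
#
# Keeping the primary in secondary mode during pass 1 narrows (but does not
# close) the window in which DRBD could be promoted before its peers connect.

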
def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    if not type(ins_l) is list:
      raise errors.OpExecError("Can't contact node '%s'" %
                               instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError("Instance is running, can't shutdown"
                               " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)


def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  Errors on the primary node are ignored only if ignore_primary is
  true; otherwise they make the function return False.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result


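# Illustrative usage (not part of the original code) of _ShutdownInstanceDisks
# above, mirroring the failover code further down which forces the disks down
# while tolerating errors on the (possibly dead) primary node:
#
#   if not _ShutdownInstanceDisks(instance, cfg, ignore_primary=True):
#     logger.Error("some block devices of instance %s could not be shut down" %
#                  instance.name)

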
def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                             " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                             " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                             " needed %s MiB, available %s MiB" %
                             (node, reason, requested, free_mem))


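# Illustrative usage (not part of the original code) of _CheckNodeFreeMemory
# above, mirroring how the startup and failover LUs below call it:
#
#   _CheckNodeFreeMemory(self.cfg, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        instance.memory)
#
# It returns nothing on success; any problem is reported by raising
# OpPrereqError, so callers do not check a return value.

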
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
                         "starting instance %s" % instance.name,
                         instance.memory)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")


class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                           constants.INSTANCE_REBOOT_HARD,
                           constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      if not rpc.call_instance_reboot(node_current, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      if not rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
      if not rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)


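# Illustrative summary (not part of the original code) of how LURebootInstance
# above maps reboot_type to actions:
#
#   INSTANCE_REBOOT_SOFT -> rpc.call_instance_reboot(...)
#   INSTANCE_REBOOT_HARD -> rpc.call_instance_reboot(...)
#   INSTANCE_REBOOT_FULL -> shutdown the instance, cycle its disks via
#                           _ShutdownInstanceDisks/_StartInstanceDisks, then
#                           rpc.call_instance_start(...)
#
# Any other value is rejected with a ParameterError before any RPC is issued.

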
class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    if not rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    _ShutdownInstanceDisks(instance, self.cfg)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   instance.primary_node)
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node" % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.AddInstance(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))

  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
                                                old_file_storage_dir,
                                                new_file_storage_dir)

      if not result:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
                                          "sda", "sdb"):
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node only.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.sstore.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logger.Info("removing block devices for instance %s" % instance.name)

    if not _RemoveDisks(instance, self.cfg):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
                               "admin_state", "admin_ram",
                               "disk_template", "ip", "mac", "bridge",
                               "sda_size", "sdb_size", "vcpus", "tags"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedInstances(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    instance_names = self.wanted
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
                     in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_all_instances_info(nodes)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = []
    for instance in instance_list:
      iout = []
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.status != "down":
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.status != "down":
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          disk = instance.FindDisk(field[:3])
          if disk is None:
            val = None
          else:
            val = disk.size
        elif field == "vcpus":
          val = instance.vcpus
        elif field == "tags":
          val = list(instance.GetTags())
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output


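# Illustrative example (not part of the original code) of LUQueryInstances
# output: with output_fields = ["name", "pnode", "status", "oper_ram"] the
# Exec method above returns one row per instance, e.g. (made-up values):
#
#   [["web1.example.com", "node1.example.com", "running",    512],
#    ["db1.example.com",  "node2.example.com", "ADMIN_down", "-"]]

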
class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
                         instance.name, instance.memory)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

    self.instance = instance

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down" %
                     (instance.name, source_node, source_node))
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logger.Info("Starting instance %s on node %s" %
                  (instance.name, target_node))

      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))


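# Illustrative summary (not part of the original code) of the failover flow in
# LUFailoverInstance.Exec above:
#
#   1. check disk consistency on the target (unless ignore_consistency)
#   2. shut the instance down on the current primary node
#   3. shut its disks down everywhere, ignoring errors on the primary
#   4. point instance.primary_node at the old secondary and cfg.Update()
#   5. if the instance was marked up: reassemble the disks on the new primary
#      and start the instance there

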
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
        return False

  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, True, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, False, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _GenerateUniqueNames(cfg, exts):
  """Generate a suitable set of LV names.

  This will generate one unique logical volume name for each of the
  given extensions.

  """
  results = []
  for val in exts:
    new_id = cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results


def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
  """Generate a drbd8 device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id = (primary, secondary, port),
                          children = [dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev


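# Illustrative sketch (not part of the original code) of the disk tree built
# by _GenerateDRBD8Branch above for a single 10240 MiB "sda":
#
#   Disk(LD_DRBD8, size=10240, iv_name="sda",
#        logical_id=(primary, secondary, port),
#        children=[Disk(LD_LV, size=10240, logical_id=(vg, "<uuid>.sda_data")),
#                  Disk(LD_LV, size=128, logical_id=(vg, "<uuid>.sda_meta"))])
#
# i.e. a DRBD8 device backed by a data LV of the requested size plus a fixed
# 128 MiB metadata LV, with the DRBD endpoints and port in its logical_id.

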
def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz,
                          file_storage_dir, file_driver):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = cfg.GetVGName()
  if template_name == constants.DT_DISKLESS:
    disks = []
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name = "sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name = "sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                         disk_sz, names[0:2], "sda")
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                         swap_sz, names[2:4], "sdb")
    disks = [drbd_sda_dev, drbd_sdb_dev]
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
                                iv_name="sda", logical_id=(file_driver,
                                "%s/sda" % file_storage_dir))
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
                                iv_name="sdb", logical_id=(file_driver,
                                "%s/sdb" % file_storage_dir))
    disks = [file_sda_dev, file_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks


def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = rpc.call_file_storage_dir_create(instance.primary_node,
                                              file_storage_dir)

    if not result:
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
      return False

    if not result[0]:
      logger.Error("failed to create directory '%s'" % file_storage_dir)
      return False

  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False

  return True


def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
                                            file_storage_dir):
      logger.Error("could not remove directory '%s'" % file_storage_dir)
      result = False

  return result


def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group

  This is currently hard-coded for the two-drive layout.

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: disk_size + swap_size,
    # 256 MB are added for drbd metadata, 128MB for each drbd device
    constants.DT_DRBD8: disk_size + swap_size + 256,
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]


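# Illustrative worked example (not part of the original code) for
# _ComputeDiskSize above, with disk_size=10240 and swap_size=4096 (MiB):
#
#   DT_PLAIN:  10240 + 4096       = 14336 MiB of free VG space
#   DT_DRBD8:  10240 + 4096 + 256 = 14592 MiB (the extra 256 MiB covers the
#              two 128 MiB DRBD metadata volumes)
#   DT_DISKLESS, DT_FILE: None (no LVM space is required)

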
class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
              "disk_template", "swap_size", "mode", "start", "vcpus",
              "wait_for_sync", "ip_check", "mac"]

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    disks = [{"size": self.op.disk_size, "mode": "w"},
             {"size": self.op.swap_size, "mode": "w"}]
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
             "bridge": self.op.bridge}]
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.op.vcpus,
                     mem_size=self.op.mem_size,
                     disks=disks,
                     nics=nics,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    logger.ToStdout("Selected nodes for the instance: %s" %
                    (", ".join(ial.nodes),))
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
                (self.op.instance_name, self.op.iallocator, ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.op.mem_size,
      vcpus=self.op.vcpus,
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
    ))

    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # set optional parameters to none if they don't exist
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
                 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
                 "vnc_bind_address"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)
      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")
      src_node_full = self.cfg.ExpandNodeName(src_node)
      if src_node_full is None:
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
      self.op.src_node = src_node = src_node_full

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage
    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

    #### instance parameters check

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)

    self.op.instance_name = instance_name = hostname1.name
    instance_list = self.cfg.GetInstanceList()
    if instance_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    # ip validity checks
    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = self.op.ip = inst_ip

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (hostname1.ip, instance_name))

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    # boot order verification
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")
    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory not a relative"
                                 " path")
    #### allocator run

    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
    if pnode is None:
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                 self.op.pnode)
    self.op.pnode = pnode.name
    self.pnode = pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if getattr(self.op, "snode", None) is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")

      snode_name = self.cfg.ExpandNodeName(self.op.snode)
      if snode_name is None:
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
                                   self.op.snode)
      elif snode_name == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(snode_name)

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.op.disk_size, self.op.swap_size)

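    # Worked example of the requirement computed above (template maths
    # assumed from _ComputeDiskSize): a plain instance with disk_size=10240
    # and swap_size=4096 needs about 10240 + 4096 MB of free VG space, drbd
    # adds a small metadata overhead on top, and diskless/file templates
    # yield None so the check below is skipped.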
    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node"  % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")


    # bridge check on primary node
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.op.mem_size)

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    if self.op.vnc_bind_address is None:
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.sstore.GetFileStorageDir(),
                                        string_file_storage_dir, instance))


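    # For illustration (base path assumed, it comes from GetFileStorageDir):
    # with a cluster file storage dir of /srv/ganeti/file-storage and no
    # explicit file_storage_dir, the result above is
    # /srv/ganeti/file-storage/<instance-name>.
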
    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size,
                                  file_storage_dir,
                                  self.op.file_driver)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            hvm_acpi=self.op.hvm_acpi,
                            hvm_pae=self.op.hvm_pae,
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
                            vnc_bind_address=self.op.vnc_bind_address,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                           src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    logger.ToStdout("Selected new secondary for the instance: %s" %
                    self.op.remote_node)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance
    self.op.instance_name = instance.name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      # _RunAllocator stores its result in self.op.remote_node itself
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))
    self.op.remote_node = remote_node

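  # Summary of the mode handling above (as implemented here): REPLACE_DISK_PRI
  # rebuilds the LVs on the primary (tgt_node=primary, oth_node=secondary),
  # while REPLACE_DISK_SEC either rebuilds them on the current secondary or,
  # when a new remote node / iallocator result is given, moves the whole
  # secondary to that node.
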
  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s" % new_lv.logical_id[1],
                    hint="manually cleanup unused logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue

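  # After step 4 above, every replaced LV is left behind under the name
  # <old-name>_replaced-<timestamp>; step 6 is what removes those volumes, so
  # if it is interrupted they must be cleaned up by hand (see the warnings).
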
  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([pri_node, new_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in pri_node, new_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s on %s" % (dev.iv_name, pri_node))
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        raise errors.OpExecError("Can't find device %s on node %s" %
                                 (dev.iv_name, pri_node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      size = dev.size
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in dev.children:
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], new_node))

      iv_names[dev.iv_name] = (dev, dev.children)

    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for dev in instance.disks:
      size = dev.size
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
      # create new devices on new_node
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=(pri_node, new_node,
                                          dev.logical_id[2]),
                              children=dev.children)
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new DRBD on"
                                 " node '%s'" % new_node)

    for dev in instance.disks:
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for %s on old node" % dev.iv_name)
      cfg.SetDiskID(dev, old_node)
      if not rpc.call_blockdev_shutdown(old_node, dev):
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")
    done = 0
    for dev in instance.disks:
      cfg.SetDiskID(dev, pri_node)
      # set the physical (unique in bdev terms) id to None, meaning
      # detach from network
      dev.physical_id = (None,) * len(dev.physical_id)
      # and 'find' the device, which will 'fix' it to match the
      # standalone state
      if rpc.call_blockdev_find(pri_node, dev):
        done += 1
      else:
        warning("Failed to detach drbd %s from network, unusual case" %
                dev.iv_name)

    if not done:
      # no detaches succeeded (very unlikely)
      raise errors.OpExecError("Can't detach at least one DRBD from old node")

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev in instance.disks:
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    failures = []
    for dev in instance.disks:
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
      # since the attach is smart, it's enough to 'find' the device,
      # it will automatically activate the network, if the physical_id
      # is correct
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
                "please do a gnt-instance info to see the status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        if not rpc.call_blockdev_remove(old_node, lv):
          warning("Can't remove LV on old secondary",
                  hint="Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance

    # Activate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
      self.proc.ChainOpCode(op)

    if instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")

    ret = fn(feedback_fn)

    # Deactivate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
      self.proc.ChainOpCode(op)

    return ret


class LUGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk", "amount"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance
    self.op.instance_name = instance.name

    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing.")

    if instance.FindDisk(self.op.disk) is None:
      raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                 (self.op.disk, instance.name))

    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
    for node in nodenames:
      info = nodeinfo.get(node, None)
      if not info:
        raise errors.OpPrereqError("Cannot get current information"
                                   " from node '%s'" % node)
      vg_free = info.get('vg_free', None)
      if not isinstance(vg_free, int):
        raise errors.OpPrereqError("Can't compute free disk space on"
                                   " node %s" % node)
      if self.op.amount > info['vg_free']:
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
                                   " %d MiB available, %d MiB required" %
                                   (node, info['vg_free'], self.op.amount))

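  # Illustrative opcode for this LU (the opcode class name is assumed from the
  # usual OpCode/LU naming convention; the fields match _OP_REQP above):
  #   opcodes.OpGrowDisk(instance_name="inst1.example.com", disk="sda",
  #                      amount=1024)   # grow sda by 1024 MiB on all nodes
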
  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    instance = self.instance
    disk = instance.FindDisk(self.op.disk)
    for node in (instance.secondary_nodes + (instance.primary_node,)):
      self.cfg.SetDiskID(disk, node)
      result = rpc.call_blockdev_grow(node, disk, self.op.amount)
      if not result or not isinstance(result, tuple) or len(result) != 2:
        raise errors.OpExecError("grow request failed to node %s" % node)
      elif not result[0]:
        raise errors.OpExecError("grow request failed to node %s: %s" %
                                 (node, result[1]))
    disk.RecordGrow(self.op.amount)
    self.cfg.Update(instance)
    return


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]
    return


  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data

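  # Example shape of the dict returned above for a drbd8 disk (values purely
  # illustrative): {"iv_name": "sda", "dev_type": "drbd8", "pstatus": ...,
  # "sstatus": ..., "children": [<lv status dict>, <lv status dict>]}, i.e.
  # one nested dict of the same shape per child LV.
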
  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                           instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "vcpus": instance.vcpus,
        }

      htkind = self.sstore.GetHypervisorType()
      if htkind == constants.HT_XEN_PVM30:
        idict["kernel_path"] = instance.kernel_path
        idict["initrd_path"] = instance.initrd_path

      if htkind == constants.HT_XEN_HVM31:
        idict["hvm_boot_order"] = instance.hvm_boot_order
        idict["hvm_acpi"] = instance.hvm_acpi
        idict["hvm_pae"] = instance.hvm_pae
        idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path

      if htkind in constants.HTS_REQ_PORT:
        idict["vnc_bind_address"] = instance.vnc_bind_address
        idict["network_port"] = instance.network_port

      result[instance.name] = idict

    return result


class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge or self.mac:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      if self.mac:
        mac = self.mac
      else:
        mac = self.instance.nics[0].mac
      args['nics'] = [(ip, bridge, mac)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    self.kernel_path = getattr(self.op, "kernel_path", None)
    self.initrd_path = getattr(self.op, "initrd_path", None)
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
    self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
    self.hvm_pae = getattr(self.op, "hvm_pae", None)
    self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
    self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
                 self.kernel_path, self.initrd_path, self.hvm_boot_order,
                 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
                 self.vnc_bind_address]
    if all_parms.count(None) == len(all_parms):
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)

    if self.kernel_path is not None:
      self.do_kernel_path = True
      if self.kernel_path == constants.VALUE_NONE:
        raise errors.OpPrereqError("Can't set instance to no kernel")

      if self.kernel_path != constants.VALUE_DEFAULT:
        if not os.path.isabs(self.kernel_path):
          raise errors.OpPrereqError("The kernel path must be an absolute"
                                     " filename")
    else:
      self.do_kernel_path = False

    if self.initrd_path is not None:
      self.do_initrd_path = True
      if self.initrd_path not in (constants.VALUE_NONE,
                                  constants.VALUE_DEFAULT):
        if not os.path.isabs(self.initrd_path):
          raise errors.OpPrereqError("The initrd path must be an absolute"
                                     " filename")
    else:
      self.do_initrd_path = False

    # boot order verification
    if self.hvm_boot_order is not None:
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
        if len(self.hvm_boot_order.strip("acdn")) != 0:
          raise errors.OpPrereqError("invalid boot order specified,"
                                     " must be one or more of [acdn]"
                                     " or 'default'")

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("No such instance name '%s'" %
                                 self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance
    return

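  # The value returned by Exec below is a list of (parameter, new value)
  # pairs, e.g. [("mem", 512), ("bridge", "xen-br1")] (values illustrative),
  # which the caller can show to the user as the list of applied changes.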
  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.do_kernel_path:
      instance.kernel_path = self.kernel_path
      result.append(("kernel_path", self.kernel_path))
    if self.do_initrd_path:
      instance.initrd_path = self.initrd_path
      result.append(("initrd_path", self.initrd_path))
    if self.hvm_boot_order:
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
        instance.hvm_boot_order = None
      else:
        instance.hvm_boot_order = self.hvm_boot_order
      result.append(("hvm_boot_order", self.hvm_boot_order))
    if self.hvm_acpi:
      instance.hvm_acpi = self.hvm_acpi
      result.append(("hvm_acpi", self.hvm_acpi))
    if self.hvm_pae:
      instance.hvm_pae = self.hvm_pae
      result.append(("hvm_pae", self.hvm_pae))
    if self.hvm_cdrom_image_path:
      instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
      result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
    if self.vnc_bind_address:
      instance.vnc_bind_address = self.vnc_bind_address
      result.append(("vnc_bind_address", self.vnc_bind_address))

    self.cfg.AddInstance(instance)

    return result


class LUQueryExports(NoHooksLU):
4219
  """Query the exports list
4220

4221
  """
4222
  _OP_REQP = []
4223

    
4224
  def CheckPrereq(self):
4225
    """Check that the nodelist contains only existing nodes.
4226

4227
    """
4228
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4229

    
4230
  def Exec(self, feedback_fn):
4231
    """Compute the list of all the exported system images.
4232

4233
    Returns:
4234
      a dictionary with the structure node->(export-list)
4235
      where export-list is a list of the instances exported on
4236
      that node.
4237

4238
    """
4239
    return rpc.call_export_list(self.nodes)


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
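    # A sketch of the environment built here (hypothetical values), before
    # the generic instance variables are merged in by
    # _BuildInstanceHookEnvByObject:
    #   {"EXPORT_NODE": "node3.example.com", "EXPORT_DO_SHUTDOWN": True}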
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
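    # The export proceeds roughly as follows: optionally shut the instance
    # down, snapshot its first disk ("sda") via LVM, copy the snapshot to
    # the target node, remove the snapshot again, finalize the export on the
    # target node and finally prune older exports of the same instance from
    # the other nodes.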
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        if not rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(instance, self.cfg)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
        logger.Error("could not export block device %s from node %s to node %s"
                     % (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from node %s" %
                     (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.
    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    op = opcodes.OpQueryExports(nodes=[])
    exportlist = self.proc.ChainOpCode(op)
    found = False
    for node in exportlist:
      if instance_name in exportlist[node]:
        found = True
        if not rpc.call_export_remove(node, instance_name):
          logger.Error("could not remove export for instance %s"
                       " on node %s" % (instance_name, node))

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
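    # The result is a list of (path, tag) pairs for every tag matching the
    # pattern, e.g. (hypothetical names):
    #   [("/cluster", "production"), ("/instances/web1.example.com", "web")]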
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")

class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have a good list of nodes and/or the duration
    is valid.

    """

    if self.op.on_nodes:
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg/sstore that are needed to query the cluster
    - input data (all members of the _ALLO_KEYS or _RELO_KEYS class
      attribute, depending on the mode, are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
4601
    "mem_size", "disks", "disk_template",
4602
    "os", "tags", "nics", "vcpus",
4603
    ]
4604
  _RELO_KEYS = [
4605
    "relocate_from",
4606
    ]
4607

    
4608
  def __init__(self, cfg, sstore, mode, name, **kwargs):
4609
    self.cfg = cfg
4610
    self.sstore = sstore
4611
    # init buffer variables
4612
    self.in_text = self.out_text = self.in_data = self.out_data = None
4613
    # init all input fields so that pylint is happy
4614
    self.mode = mode
4615
    self.name = name
4616
    self.mem_size = self.disks = self.disk_template = None
4617
    self.os = self.tags = self.nics = self.vcpus = None
4618
    self.relocate_from = None
4619
    # computed fields
4620
    self.required_nodes = None
4621
    # init result fields
4622
    self.success = self.info = self.nodes = None
4623
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4624
      keyset = self._ALLO_KEYS
4625
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
4626
      keyset = self._RELO_KEYS
4627
    else:
4628
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
4629
                                   " IAllocator" % self.mode)
4630
    for key in kwargs:
4631
      if key not in keyset:
4632
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
4633
                                     " IAllocator" % key)
4634
      setattr(self, key, kwargs[key])
4635
    for key in keyset:
4636
      if key not in kwargs:
4637
        raise errors.ProgrammerError("Missing input parameter '%s' to"
4638
                                     " IAllocator" % key)
4639
    self._BuildInputData()
4640

    
4641
  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
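    # Each entry of the "nodes" dictionary built below ends up looking
    # roughly like this (all values are illustrative):
    #   "node1.example.com": {"total_memory": 4096, "free_memory": 2048,
    #                         "reserved_memory": 512, "i_pri_memory": 1024,
    #                         "i_pri_up_memory": 768, "total_disk": 102400,
    #                         "free_disk": 51200, "total_cpus": 4,
    #                         "primary_ip": "192.0.2.10",
    #                         "secondary_ip": "198.51.100.10", "tags": []}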
    cfg = self.cfg
    # cluster data
    data = {
      "version": 1,
      "cluster_name": self.sstore.GetClusterName(),
      "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      # we don't have job IDs
      }

    i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()
    node_data = rpc.call_node_info(node_list, cfg.GetVGName())
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      if nname not in node_data or not isinstance(node_data[nname], dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname]
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free', 'cpu_total']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += iinfo.memory
          if iinfo.status == "up":
            i_p_up_mem += iinfo.memory

      # compute memory used by instances
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": iinfo.vcpus,
        "memory": iinfo.memory,
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
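    # The request added below ends up looking roughly like this (illustrative
    # values only):
    #   {"type": "allocate", "name": "web1.example.com",
    #    "disk_template": "drbd", "tags": [], "os": "debian-etch",
    #    "vcpus": 1, "memory": 512,
    #    "disks": [{"size": 10240, "mode": "w"}, {"size": 1024, "mode": "w"}],
    #    "disk_space_total": 11520, "nics": [...], "required_nodes": 2}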
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template,
                                  self.disks[0]["size"], self.disks[1]["size"])

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
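    # The request added below ends up looking roughly like this (illustrative
    # values only):
    #   {"type": "relocate", "name": "web1.example.com",
    #    "disk_space_total": 11520, "required_nodes": 1,
    #    "relocate_from": ["node2.example.com"]}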
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1

    disk_space = _ComputeDiskSize(instance.disk_template,
                                  instance.disks[0].size,
                                  instance.disks[1].size)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
    """Run an instance allocator and return the results.

    """
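    # call_fn is expected to return a 4-tuple of
    # (runner status code, stdout, stderr, failure reason); on success the
    # allocator's serialized reply is in stdout, e.g. (illustrative):
    #   (rcode, '{"success": true, "info": "...", "nodes": [...]}', "", None)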
    data = self.in_text

    result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)

    if not isinstance(result, tuple) or len(result) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" %
                               (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and, if successful, save the result in
    self.out_data and the other parameters.

    """
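    # A well-formed allocator reply parses into something like (illustrative
    # values only):
    #   {"success": True, "info": "allocation successful",
    #    "nodes": ["node2.example.com", "node3.example.com"]}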
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    of the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
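    # With direction constants.IALLOCATOR_DIR_IN this only builds and returns
    # the input document for the allocator; otherwise it also runs the named
    # allocator script and returns its raw output.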
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       )
    else:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result