#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import ssconf
from ganeti import serializer


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements (REQ_MASTER); note that all
      commands require root permissions

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_MASTER = True

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = cfg
    self.sstore = sstore
    self.__ssh = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    if not cfg.IsCluster():
      raise errors.OpPrereqError("Cluster not initialized yet,"
                                 " use 'gnt-cluster init' first.")
    if self.REQ_MASTER:
      master = sstore.GetMasterNode()
      if master != utils.HostInfo().name:
        raise errors.OpPrereqError("Commands must be run on the master"
                                   " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object.

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.sstore)
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks. By default the method does nothing and the
    previous result is passed back unchanged, but any LU can override it if it
    wants to use the local cluster hook-scripts somehow.

    Args:
      hook_results: the results of the multi-node hooks rpc call
      phase: the hooks phase that has just been run
      feedback_fn: function to send feedback back to the caller
      lu_result: the previous result this LU had, or None in the PRE phase.

    """
    return lu_result


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if nodes:
    wanted = []

    for name in nodes:
      node = lu.cfg.ExpandNodeName(name)
      if node is None:
        raise errors.OpPrereqError("No such node name '%s'" % name)
      wanted.append(node)

  else:
    wanted = lu.cfg.GetNodeList()
  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings) or None for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields
    selected: Fields that have been requested

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))


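# Usage sketch (illustrative): a query LU typically validates its requested
# output fields against its known static and dynamic field sets, e.g.:
#
#   _CheckOutputFields(static=["name", "pip", "sip"],
#                      dynamic=["mfree", "dfree"],
#                      selected=self.op.output_fields)
#
# Any field outside the union of the two sets raises errors.OpPrereqError
# naming the unknown fields.
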
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings
  """
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env


def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override
  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


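# Usage sketch (illustrative, with made-up names): for an instance "inst1"
# with primary node "node1", secondary "node2" and one NIC,
# _BuildInstanceHookEnvByObject(instance) yields a dict along the lines of:
#
#   {"OP_TARGET": "inst1", "INSTANCE_NAME": "inst1",
#    "INSTANCE_PRIMARY": "node1", "INSTANCE_SECONDARIES": "node2",
#    "INSTANCE_OS_TYPE": "debian-etch", "INSTANCE_STATUS": "up",
#    "INSTANCE_MEMORY": 128, "INSTANCE_VCPUS": 1,
#    "INSTANCE_NIC0_IP": "", "INSTANCE_NIC0_BRIDGE": "xen-br0",
#    "INSTANCE_NIC0_HWADDR": "aa:00:00:00:00:01", "INSTANCE_NIC_COUNT": 1}
#
# The hooks runner later prefixes each key with "GANETI_" before exporting
# it to the hook scripts' environment (see LogicalUnit.BuildHooksEnv above).
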
def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("one or more target bridges %s does not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.sstore.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    rpc.call_node_leave_cluster(master)


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    if 'node-net-test' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result['node-net-test']:
        bad = True
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result['node-net-test'][node]))

    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if instanceconfig.status != 'down':
      if (node_current not in node_instance or
          instance not in node_instance[node_current]):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if node != node_current:
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to, should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          needed_mem += instance_cfg[instance].memory
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks run only in the post phase; their failure causes
    their output to be logged in the verify output and makes the
    verification fail.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
                        for node in nodeinfo]
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  This is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    return int(bad)

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result, handle it, and send some
    nicely-formatted feedback back to the user.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: previous Exec result

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res is False or not isinstance(res, list):
            feedback_fn("    Communication failure")
            lu_result = 1
            continue
          for script, hkr, output in res:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


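# Flow sketch (illustrative): the processor drives an LU such as
# LUVerifyCluster roughly as CheckPrereq() -> pre-phase hooks on the nodes
# from BuildHooksEnv() -> Exec(feedback_fn) -> post-phase hooks ->
# HooksCallBack(), so the integer returned by Exec above can still be turned
# into a failure by the post-phase hook results.
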
class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    to_act = set()
    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      result = utils.RunCmd(["fping", "-q", new_ip])
      if not result.failed:
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      if not rpc.call_node_start_master(master):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV


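# Usage sketch (illustrative): LUSetClusterParams below walks every instance
# disk with this helper before allowing LVM storage to be disabled, e.g.:
#
#   if any(_RecursiveCheckIfLVMBased(disk) for disk in inst.disks):
#     ...refuse to unset the volume group...
#
# A disk whose children are LD_LV logical volumes (e.g. a mirrored device
# backed by LVs) therefore counts as lvm-based even though its own dev_type
# is not LD_LV.
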
class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if not self.op.vg_name:
      instances = [self.cfg.GetInstanceInfo(name)
                   for name in self.cfg.GetInstanceList()]
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      node_list = self.cfg.GetNodeList()
      vglist = rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name != self.cfg.GetVGName():
      self.cfg.SetVGName(self.op.vg_name)
    else:
      feedback_fn("Cluster LVM configuration already in desired"
                  " state, not changing")


def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    if unlock:
      #utils.Unlock('cmd')
      pass
    try:
      time.sleep(min(60, max_time))
    finally:
      if unlock:
        #utils.Lock('cmd')
        pass

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  cfgw.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result


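# Note (illustrative): the status tuples consumed above come from the
# blockdev RPCs; _WaitForSync unpacks each call_blockdev_getmirrorstatus
# entry as (perc_done, est_time, is_degraded, ldisk), while
# _CheckDiskConsistency indexes the call_blockdev_find result at position 5
# (is_degraded) or 6 (ldisk), matching the docstring above. A rough polling
# sketch, assuming the cfg/proc pair of a LogicalUnit:
#
#   if not _WaitForSync(self.cfg, instance, self.proc):
#     raise errors.OpExecError("Disks are degraded after sync")
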
class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
    _CheckOutputFields(static=[],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary.

      Args:
        node_list: a list with the names of all nodes
        rlist: a map with node names as keys and OS objects as values

      Returns:
        map: a map with osnames as keys and as value another map, with
             nodes as keys and list of OS objects as values
             e.g. {"debian-etch": {"node1": [<object>,...],
                                   "node2": [<object>,]}
                  }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if not nr:
        continue
      for os_obj in nr:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in node_list:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would not allow itself to run.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    rpc.call_node_leave_cluster(node.name)

    self.ssh.Run(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)

    utils.RemoveHostFromEtcHosts(node.name)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset([
      "dtotal", "dfree",
      "mtotal", "mnode", "mfree",
      "bootid",
      "ctotal",
      ])

    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                               "pip", "sip"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedNodes(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # check reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
                                   constants.VNC_PASSWORD_FILE)

  def Exec(self, feedback_fn):
1527
    """Adds the new node to the cluster.
1528

1529
    """
1530
    new_node = self.new_node
1531
    node = new_node.name
1532

    
1533
    # set up inter-node password and certificate and restarts the node daemon
1534
    gntpass = self.sstore.GetNodeDaemonPassword()
1535
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1536
      raise errors.OpExecError("ganeti password corruption detected")
1537
    f = open(constants.SSL_CERT_FILE)
1538
    try:
1539
      gntpem = f.read(8192)
1540
    finally:
1541
      f.close()
1542
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1543
    # so we use this to detect an invalid certificate; as long as the
1544
    # cert doesn't contain this, the here-document will be correctly
1545
    # parsed by the shell sequence below
1546
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
1547
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1548
    if not gntpem.endswith("\n"):
1549
      raise errors.OpExecError("PEM must end with newline")
1550
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1551

    
1552
    # and then connect with ssh to set password and start ganeti-noded
1553
    # note that all the below variables are sanitized at this point,
1554
    # either by being constants or by the checks above
1555
    ss = self.sstore
1556
    mycommand = ("umask 077 && "
1557
                 "echo '%s' > '%s' && "
1558
                 "cat > '%s' << '!EOF.' && \n"
1559
                 "%s!EOF.\n%s restart" %
1560
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1561
                  constants.SSL_CERT_FILE, gntpem,
1562
                  constants.NODE_INITD_SCRIPT))
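    # Illustrative expansion of mycommand (all values below are hypothetical;
    # the real password, certificate and file names come from sstore and
    # constants):
    #
    #   umask 077 &&
    #   echo 's3cr3tpass' > '/var/lib/ganeti/ssconf_node_pass' &&
    #   cat > '/var/lib/ganeti/server.pem' << '!EOF.' &&
    #   -----BEGIN CERTIFICATE-----
    #   ...certificate and key material...
    #   -----END CERTIFICATE-----
    #   !EOF.
    #   /etc/init.d/ganeti restart
    #
    # The '!EOF.' here-document delimiter is exactly the marker rejected in
    # the certificate check above, so the PEM body can never terminate the
    # here-document early.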
1563

    
1564
    result = self.ssh.Run(node, 'root', mycommand, batch=False, ask_key=True)
1565
    if result.failed:
1566
      raise errors.OpExecError("Remote command on node %s, error: %s,"
1567
                               " output: %s" %
1568
                               (node, result.fail_reason, result.output))
1569

    
1570
    # check connectivity
1571
    time.sleep(4)
1572

    
1573
    result = rpc.call_version([node])[node]
1574
    if result:
1575
      if constants.PROTOCOL_VERSION == result:
1576
        logger.Info("communication to node %s fine, sw version %s match" %
1577
                    (node, result))
1578
      else:
1579
        raise errors.OpExecError("Version mismatch master version %s,"
1580
                                 " node version %s" %
1581
                                 (constants.PROTOCOL_VERSION, result))
1582
    else:
1583
      raise errors.OpExecError("Cannot get version from the new node")
1584

    
1585
    # setup ssh on node
1586
    logger.Info("copy ssh key to node %s" % node)
1587
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1588
    keyarray = []
1589
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1590
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1591
                priv_key, pub_key]
1592

    
1593
    for i in keyfiles:
1594
      f = open(i, 'r')
1595
      try:
1596
        keyarray.append(f.read())
1597
      finally:
1598
        f.close()
1599

    
1600
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1601
                               keyarray[3], keyarray[4], keyarray[5])
1602

    
1603
    if not result:
1604
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1605

    
1606
    # Add node to our /etc/hosts, and add key to known_hosts
1607
    utils.AddHostToEtcHosts(new_node.name)
1608

    
1609
    if new_node.secondary_ip != new_node.primary_ip:
1610
      if not rpc.call_node_tcp_ping(new_node.name,
1611
                                    constants.LOCALHOST_IP_ADDRESS,
1612
                                    new_node.secondary_ip,
1613
                                    constants.DEFAULT_NODED_PORT,
1614
                                    10, False):
1615
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1616
                                 " you gave (%s). Please fix and re-run this"
1617
                                 " command." % new_node.secondary_ip)
1618

    
1619
    success, msg = self.ssh.VerifyNodeHostname(node)
1620
    if not success:
1621
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
1622
                               " than the one the resolver gives: %s."
1623
                               " Please fix and re-run this command." %
1624
                               (node, msg))
1625

    
1626
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1627
    # including the node just added
1628
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1629
    dist_nodes = self.cfg.GetNodeList()
1630
    if not self.op.readd:
1631
      dist_nodes.append(node)
1632
    if myself.name in dist_nodes:
1633
      dist_nodes.remove(myself.name)
1634

    
1635
    logger.Debug("Copying hosts and known_hosts to all nodes")
1636
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1637
      result = rpc.call_upload_file(dist_nodes, fname)
1638
      for to_node in dist_nodes:
1639
        if not result[to_node]:
1640
          logger.Error("copy of file %s to node %s failed" %
1641
                       (fname, to_node))
1642

    
1643
    to_copy = ss.GetFileList()
1644
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1645
      to_copy.append(constants.VNC_PASSWORD_FILE)
1646
    for fname in to_copy:
1647
      if not self.ssh.CopyFileToNode(node, fname):
1648
        logger.Error("could not copy file %s to node %s" % (fname, node))
1649

    
1650
    if not self.op.readd:
1651
      logger.Info("adding node %s to cluster.conf" % node)
1652
      self.cfg.AddNode(new_node)
1653

    
1654

    
1655
class LUMasterFailover(LogicalUnit):
1656
  """Failover the master node to the current node.
1657

1658
  This is a special LU in that it must run on a non-master node.
1659

1660
  """
1661
  HPATH = "master-failover"
1662
  HTYPE = constants.HTYPE_CLUSTER
1663
  REQ_MASTER = False
1664
  _OP_REQP = []
1665

    
1666
  def BuildHooksEnv(self):
1667
    """Build hooks env.
1668

1669
    This will run on the new master only in the pre phase, and on all
1670
    the nodes in the post phase.
1671

1672
    """
1673
    env = {
1674
      "OP_TARGET": self.new_master,
1675
      "NEW_MASTER": self.new_master,
1676
      "OLD_MASTER": self.old_master,
1677
      }
1678
    return env, [self.new_master], self.cfg.GetNodeList()
1679

    
1680
  def CheckPrereq(self):
1681
    """Check prerequisites.
1682

1683
    This checks that we are not already the master.
1684

1685
    """
1686
    self.new_master = utils.HostInfo().name
1687
    self.old_master = self.sstore.GetMasterNode()
1688

    
1689
    if self.old_master == self.new_master:
1690
      raise errors.OpPrereqError("This commands must be run on the node"
1691
                                 " where you want the new master to be."
1692
                                 " %s is already the master" %
1693
                                 self.old_master)
1694

    
1695
  def Exec(self, feedback_fn):
1696
    """Failover the master node.
1697

1698
    This command, when run on a non-master node, will cause the current
1699
    master to cease being master, and the non-master to become new
1700
    master.
1701

1702
    """
1703
    #TODO: do not rely on gethostname returning the FQDN
1704
    logger.Info("setting master to %s, old master: %s" %
1705
                (self.new_master, self.old_master))
1706

    
1707
    if not rpc.call_node_stop_master(self.old_master):
1708
      logger.Error("could disable the master role on the old master"
1709
                   " %s, please disable manually" % self.old_master)
1710

    
1711
    ss = self.sstore
1712
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1713
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
1714
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
1715
      logger.Error("could not distribute the new simple store master file"
1716
                   " to the other nodes, please check.")
1717

    
1718
    if not rpc.call_node_start_master(self.new_master):
1719
      logger.Error("could not start the master role on the new master"
1720
                   " %s, please check" % self.new_master)
1721
      feedback_fn("Error in activating the master IP on the new master,"
1722
                  " please fix manually.")
1723

    
1724

    
1725

    
1726
class LUQueryClusterInfo(NoHooksLU):
1727
  """Query cluster configuration.
1728

1729
  """
1730
  _OP_REQP = []
1731
  REQ_MASTER = False
1732

    
1733
  def CheckPrereq(self):
1734
    """No prerequsites needed for this LU.
1735

1736
    """
1737
    pass
1738

    
1739
  def Exec(self, feedback_fn):
1740
    """Return cluster config.
1741

1742
    """
1743
    result = {
1744
      "name": self.sstore.GetClusterName(),
1745
      "software_version": constants.RELEASE_VERSION,
1746
      "protocol_version": constants.PROTOCOL_VERSION,
1747
      "config_version": constants.CONFIG_VERSION,
1748
      "os_api_version": constants.OS_API_VERSION,
1749
      "export_version": constants.EXPORT_VERSION,
1750
      "master": self.sstore.GetMasterNode(),
1751
      "architecture": (platform.architecture()[0], platform.machine()),
1752
      "hypervisor_type": self.sstore.GetHypervisorType(),
1753
      }
1754

    
1755
    return result
1756

    
1757

    
1758
class LUClusterCopyFile(NoHooksLU):
1759
  """Copy file to cluster.
1760

1761
  """
1762
  _OP_REQP = ["nodes", "filename"]
1763

    
1764
  def CheckPrereq(self):
1765
    """Check prerequisites.
1766

1767
    It should check that the named file exists and that the given list
1768
    of nodes is valid.
1769

1770
    """
1771
    if not os.path.exists(self.op.filename):
1772
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1773

    
1774
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1775

    
1776
  def Exec(self, feedback_fn):
1777
    """Copy a file from master to some nodes.
1778

1779
    Args:
1780
      opts - class with options as members
1781
      args - list containing a single element, the file name
1782
    Opts used:
1783
      nodes - list containing the name of target nodes; if empty, all nodes
1784

1785
    """
1786
    filename = self.op.filename
1787

    
1788
    myname = utils.HostInfo().name
1789

    
1790
    for node in self.nodes:
1791
      if node == myname:
1792
        continue
1793
      if not self.ssh.CopyFileToNode(node, filename):
1794
        logger.Error("Copy of file %s to node %s failed" % (filename, node))
1795

    
1796

    
1797
class LUDumpClusterConfig(NoHooksLU):
1798
  """Return a text-representation of the cluster-config.
1799

1800
  """
1801
  _OP_REQP = []
1802

    
1803
  def CheckPrereq(self):
1804
    """No prerequisites.
1805

1806
    """
1807
    pass
1808

    
1809
  def Exec(self, feedback_fn):
1810
    """Dump a representation of the cluster config to the standard output.
1811

1812
    """
1813
    return self.cfg.DumpConfig()
1814

    
1815

    
1816
class LURunClusterCommand(NoHooksLU):
1817
  """Run a command on some nodes.
1818

1819
  """
1820
  _OP_REQP = ["command", "nodes"]
1821

    
1822
  def CheckPrereq(self):
1823
    """Check prerequisites.
1824

1825
    It checks that the given list of nodes is valid.
1826

1827
    """
1828
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1829

    
1830
  def Exec(self, feedback_fn):
1831
    """Run a command on some nodes.
1832

1833
    """
1834
    # put the master at the end of the nodes list
1835
    master_node = self.sstore.GetMasterNode()
1836
    if master_node in self.nodes:
1837
      self.nodes.remove(master_node)
1838
      self.nodes.append(master_node)
1839

    
1840
    data = []
1841
    for node in self.nodes:
1842
      result = self.ssh.Run(node, "root", self.op.command)
1843
      data.append((node, result.output, result.exit_code))
1844

    
1845
    return data
1846

    
1847

    
1848
class LUActivateInstanceDisks(NoHooksLU):
1849
  """Bring up an instance's disks.
1850

1851
  """
1852
  _OP_REQP = ["instance_name"]
1853

    
1854
  def CheckPrereq(self):
1855
    """Check prerequisites.
1856

1857
    This checks that the instance is in the cluster.
1858

1859
    """
1860
    instance = self.cfg.GetInstanceInfo(
1861
      self.cfg.ExpandInstanceName(self.op.instance_name))
1862
    if instance is None:
1863
      raise errors.OpPrereqError("Instance '%s' not known" %
1864
                                 self.op.instance_name)
1865
    self.instance = instance
1866

    
1867

    
1868
  def Exec(self, feedback_fn):
1869
    """Activate the disks.
1870

1871
    """
1872
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1873
    if not disks_ok:
1874
      raise errors.OpExecError("Cannot activate block devices")
1875

    
1876
    return disks_info
1877

    
1878

    
1879
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1880
  """Prepare the block devices for an instance.
1881

1882
  This sets up the block devices on all nodes.
1883

1884
  Args:
1885
    instance: a ganeti.objects.Instance object
1886
    ignore_secondaries: if true, errors on secondary nodes won't result
1887
                        in an error return from the function
1888

1889
  Returns:
1890
    false if the operation failed
1891
    list of (host, instance_visible_name, node_visible_name) if the operation
1892
         succeeded with the mapping from node devices to instance devices
1893
  """
1894
  device_info = []
1895
  disks_ok = True
1896
  iname = instance.name
1897
  # With the two passes mechanism we try to reduce the window of
1898
  # opportunity for the race condition of switching DRBD to primary
1899
  # before handshaking occurred, but we do not eliminate it
1900

    
1901
  # The proper fix would be to wait (with some limits) until the
1902
  # connection has been made and drbd transitions from WFConnection
1903
  # into any other network-connected state (Connected, SyncTarget,
1904
  # SyncSource, etc.)
1905

    
1906
  # 1st pass, assemble on all nodes in secondary mode
1907
  for inst_disk in instance.disks:
1908
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1909
      cfg.SetDiskID(node_disk, node)
1910
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1911
      if not result:
1912
        logger.Error("could not prepare block device %s on node %s"
1913
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1914
        if not ignore_secondaries:
1915
          disks_ok = False
1916

    
1917
  # FIXME: race condition on drbd migration to primary
1918

    
1919
  # 2nd pass, do only the primary node
1920
  for inst_disk in instance.disks:
1921
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1922
      if node != instance.primary_node:
1923
        continue
1924
      cfg.SetDiskID(node_disk, node)
1925
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1926
      if not result:
1927
        logger.Error("could not prepare block device %s on node %s"
1928
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1929
        disks_ok = False
1930
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
1931

    
1932
  # leave the disks configured for the primary node
1933
  # this is a workaround that would be fixed better by
1934
  # improving the logical/physical id handling
1935
  for disk in instance.disks:
1936
    cfg.SetDiskID(disk, instance.primary_node)
1937

    
1938
  return disks_ok, device_info
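# Illustrative result (values are hypothetical): for a DRBD8-based instance
# with primary node "node1", a successful call of the function above yields
#
#   disks_ok == True
#   device_info == [("node1", "sda", "/dev/drbd0"),
#                   ("node1", "sdb", "/dev/drbd1")]
#
# i.e. only the primary-node view of each disk is reported.  The first pass
# merely assembles every device in secondary mode on all nodes so that the
# DRBD peers exist before the second pass promotes the devices on the primary.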
1939

    
1940

    
1941
def _StartInstanceDisks(cfg, instance, force):
1942
  """Start the disks of an instance.
1943

1944
  """
1945
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1946
                                           ignore_secondaries=force)
1947
  if not disks_ok:
1948
    _ShutdownInstanceDisks(instance, cfg)
1949
    if force is not None and not force:
1950
      logger.Error("If the message above refers to a secondary node,"
1951
                   " you can retry the operation using '--force'.")
1952
    raise errors.OpExecError("Disk consistency error")
1953

    
1954

    
1955
class LUDeactivateInstanceDisks(NoHooksLU):
1956
  """Shutdown an instance's disks.
1957

1958
  """
1959
  _OP_REQP = ["instance_name"]
1960

    
1961
  def CheckPrereq(self):
1962
    """Check prerequisites.
1963

1964
    This checks that the instance is in the cluster.
1965

1966
    """
1967
    instance = self.cfg.GetInstanceInfo(
1968
      self.cfg.ExpandInstanceName(self.op.instance_name))
1969
    if instance is None:
1970
      raise errors.OpPrereqError("Instance '%s' not known" %
1971
                                 self.op.instance_name)
1972
    self.instance = instance
1973

    
1974
  def Exec(self, feedback_fn):
1975
    """Deactivate the disks
1976

1977
    """
1978
    instance = self.instance
1979
    ins_l = rpc.call_instance_list([instance.primary_node])
1980
    ins_l = ins_l[instance.primary_node]
1981
    if not type(ins_l) is list:
1982
      raise errors.OpExecError("Can't contact node '%s'" %
1983
                               instance.primary_node)
1984

    
1985
    if self.instance.name in ins_l:
1986
      raise errors.OpExecError("Instance is running, can't shutdown"
1987
                               " block devices.")
1988

    
1989
    _ShutdownInstanceDisks(instance, self.cfg)
1990

    
1991

    
1992
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1993
  """Shutdown block devices of an instance.
1994

1995
  This does the shutdown on all nodes of the instance.
1996

1997
  If ignore_primary is true, errors on the primary node are
1998
  ignored; otherwise they make the function return False.
1999

2000
  """
2001
  result = True
2002
  for disk in instance.disks:
2003
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2004
      cfg.SetDiskID(top_disk, node)
2005
      if not rpc.call_blockdev_shutdown(node, top_disk):
2006
        logger.Error("could not shutdown block device %s on node %s" %
2007
                     (disk.iv_name, node))
2008
        if not ignore_primary or node != instance.primary_node:
2009
          result = False
2010
  return result
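# Sketch of how the ignore_primary flag is used (mirroring the failover code
# further below): when evacuating a possibly unreachable primary node the
# caller still wants the secondaries cleaned up, so it runs
#
#   if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
#     raise errors.OpExecError("Can't shut down the instance's disks.")
#
# and only failures on nodes other than the primary are treated as fatal.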
2011

    
2012

    
2013
def _CheckNodeFreeMemory(cfg, node, reason, requested):
2014
  """Checks if a node has enough free memory.
2015

2016
  This function checks if a given node has the needed amount of free
2017
  memory. In case the node has less memory or we cannot get the
2018
  information from the node, this function raises an OpPrereqError
2019
  exception.
2020

2021
  Args:
2022
    - cfg: a ConfigWriter instance
2023
    - node: the node name
2024
    - reason: string to use in the error message
2025
    - requested: the amount of memory in MiB
2026

2027
  """
2028
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2029
  if not nodeinfo or not isinstance(nodeinfo, dict):
2030
    raise errors.OpPrereqError("Could not contact node %s for resource"
2031
                             " information" % (node,))
2032

    
2033
  free_mem = nodeinfo[node].get('memory_free')
2034
  if not isinstance(free_mem, int):
2035
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2036
                             " was '%s'" % (node, free_mem))
2037
  if requested > free_mem:
2038
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2039
                             " needed %s MiB, available %s MiB" %
2040
                             (node, reason, requested, free_mem))
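# Usage sketch (node and instance names are made up): before starting an
# instance that needs 512 MiB one would call
#
#   _CheckNodeFreeMemory(self.cfg, "node2.example.com",
#                        "starting instance web1", 512)
#
# which returns nothing on success and raises OpPrereqError if the node
# reports less than 512 MiB of free memory or cannot be queried at all.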
2041

    
2042

    
2043
class LUStartupInstance(LogicalUnit):
2044
  """Starts an instance.
2045

2046
  """
2047
  HPATH = "instance-start"
2048
  HTYPE = constants.HTYPE_INSTANCE
2049
  _OP_REQP = ["instance_name", "force"]
2050

    
2051
  def BuildHooksEnv(self):
2052
    """Build hooks env.
2053

2054
    This runs on master, primary and secondary nodes of the instance.
2055

2056
    """
2057
    env = {
2058
      "FORCE": self.op.force,
2059
      }
2060
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2061
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2062
          list(self.instance.secondary_nodes))
2063
    return env, nl, nl
2064

    
2065
  def CheckPrereq(self):
2066
    """Check prerequisites.
2067

2068
    This checks that the instance is in the cluster.
2069

2070
    """
2071
    instance = self.cfg.GetInstanceInfo(
2072
      self.cfg.ExpandInstanceName(self.op.instance_name))
2073
    if instance is None:
2074
      raise errors.OpPrereqError("Instance '%s' not known" %
2075
                                 self.op.instance_name)
2076

    
2077
    # check bridge existence
2078
    _CheckInstanceBridgesExist(instance)
2079

    
2080
    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2081
                         "starting instance %s" % instance.name,
2082
                         instance.memory)
2083

    
2084
    self.instance = instance
2085
    self.op.instance_name = instance.name
2086

    
2087
  def Exec(self, feedback_fn):
2088
    """Start the instance.
2089

2090
    """
2091
    instance = self.instance
2092
    force = self.op.force
2093
    extra_args = getattr(self.op, "extra_args", "")
2094

    
2095
    self.cfg.MarkInstanceUp(instance.name)
2096

    
2097
    node_current = instance.primary_node
2098

    
2099
    _StartInstanceDisks(self.cfg, instance, force)
2100

    
2101
    if not rpc.call_instance_start(node_current, instance, extra_args):
2102
      _ShutdownInstanceDisks(instance, self.cfg)
2103
      raise errors.OpExecError("Could not start instance")
2104

    
2105

    
2106
class LURebootInstance(LogicalUnit):
2107
  """Reboot an instance.
2108

2109
  """
2110
  HPATH = "instance-reboot"
2111
  HTYPE = constants.HTYPE_INSTANCE
2112
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2113

    
2114
  def BuildHooksEnv(self):
2115
    """Build hooks env.
2116

2117
    This runs on master, primary and secondary nodes of the instance.
2118

2119
    """
2120
    env = {
2121
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2122
      }
2123
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2124
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2125
          list(self.instance.secondary_nodes))
2126
    return env, nl, nl
2127

    
2128
  def CheckPrereq(self):
2129
    """Check prerequisites.
2130

2131
    This checks that the instance is in the cluster.
2132

2133
    """
2134
    instance = self.cfg.GetInstanceInfo(
2135
      self.cfg.ExpandInstanceName(self.op.instance_name))
2136
    if instance is None:
2137
      raise errors.OpPrereqError("Instance '%s' not known" %
2138
                                 self.op.instance_name)
2139

    
2140
    # check bridge existence
2141
    _CheckInstanceBridgesExist(instance)
2142

    
2143
    self.instance = instance
2144
    self.op.instance_name = instance.name
2145

    
2146
  def Exec(self, feedback_fn):
2147
    """Reboot the instance.
2148

2149
    """
2150
    instance = self.instance
2151
    ignore_secondaries = self.op.ignore_secondaries
2152
    reboot_type = self.op.reboot_type
2153
    extra_args = getattr(self.op, "extra_args", "")
2154

    
2155
    node_current = instance.primary_node
2156

    
2157
    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2158
                           constants.INSTANCE_REBOOT_HARD,
2159
                           constants.INSTANCE_REBOOT_FULL]:
2160
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2161
                                  (constants.INSTANCE_REBOOT_SOFT,
2162
                                   constants.INSTANCE_REBOOT_HARD,
2163
                                   constants.INSTANCE_REBOOT_FULL))
2164

    
2165
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2166
                       constants.INSTANCE_REBOOT_HARD]:
2167
      if not rpc.call_instance_reboot(node_current, instance,
2168
                                      reboot_type, extra_args):
2169
        raise errors.OpExecError("Could not reboot instance")
2170
    else:
2171
      if not rpc.call_instance_shutdown(node_current, instance):
2172
        raise errors.OpExecError("could not shutdown instance for full reboot")
2173
      _ShutdownInstanceDisks(instance, self.cfg)
2174
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2175
      if not rpc.call_instance_start(node_current, instance, extra_args):
2176
        _ShutdownInstanceDisks(instance, self.cfg)
2177
        raise errors.OpExecError("Could not start instance for full reboot")
2178

    
2179
    self.cfg.MarkInstanceUp(instance.name)
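# Reboot type semantics (summary of the class above): INSTANCE_REBOOT_SOFT
# and INSTANCE_REBOOT_HARD are handed to the hypervisor on the primary node
# through a single instance_reboot RPC, while INSTANCE_REBOOT_FULL is
# emulated as a shutdown, a disk deactivate/activate cycle and a fresh start,
# reusing _ShutdownInstanceDisks, _StartInstanceDisks and call_instance_start.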
2180

    
2181

    
2182
class LUShutdownInstance(LogicalUnit):
2183
  """Shutdown an instance.
2184

2185
  """
2186
  HPATH = "instance-stop"
2187
  HTYPE = constants.HTYPE_INSTANCE
2188
  _OP_REQP = ["instance_name"]
2189

    
2190
  def BuildHooksEnv(self):
2191
    """Build hooks env.
2192

2193
    This runs on master, primary and secondary nodes of the instance.
2194

2195
    """
2196
    env = _BuildInstanceHookEnvByObject(self.instance)
2197
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2198
          list(self.instance.secondary_nodes))
2199
    return env, nl, nl
2200

    
2201
  def CheckPrereq(self):
2202
    """Check prerequisites.
2203

2204
    This checks that the instance is in the cluster.
2205

2206
    """
2207
    instance = self.cfg.GetInstanceInfo(
2208
      self.cfg.ExpandInstanceName(self.op.instance_name))
2209
    if instance is None:
2210
      raise errors.OpPrereqError("Instance '%s' not known" %
2211
                                 self.op.instance_name)
2212
    self.instance = instance
2213

    
2214
  def Exec(self, feedback_fn):
2215
    """Shutdown the instance.
2216

2217
    """
2218
    instance = self.instance
2219
    node_current = instance.primary_node
2220
    self.cfg.MarkInstanceDown(instance.name)
2221
    if not rpc.call_instance_shutdown(node_current, instance):
2222
      logger.Error("could not shutdown instance")
2223

    
2224
    _ShutdownInstanceDisks(instance, self.cfg)
2225

    
2226

    
2227
class LUReinstallInstance(LogicalUnit):
2228
  """Reinstall an instance.
2229

2230
  """
2231
  HPATH = "instance-reinstall"
2232
  HTYPE = constants.HTYPE_INSTANCE
2233
  _OP_REQP = ["instance_name"]
2234

    
2235
  def BuildHooksEnv(self):
2236
    """Build hooks env.
2237

2238
    This runs on master, primary and secondary nodes of the instance.
2239

2240
    """
2241
    env = _BuildInstanceHookEnvByObject(self.instance)
2242
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2243
          list(self.instance.secondary_nodes))
2244
    return env, nl, nl
2245

    
2246
  def CheckPrereq(self):
2247
    """Check prerequisites.
2248

2249
    This checks that the instance is in the cluster and is not running.
2250

2251
    """
2252
    instance = self.cfg.GetInstanceInfo(
2253
      self.cfg.ExpandInstanceName(self.op.instance_name))
2254
    if instance is None:
2255
      raise errors.OpPrereqError("Instance '%s' not known" %
2256
                                 self.op.instance_name)
2257
    if instance.disk_template == constants.DT_DISKLESS:
2258
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2259
                                 self.op.instance_name)
2260
    if instance.status != "down":
2261
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2262
                                 self.op.instance_name)
2263
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2264
    if remote_info:
2265
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2266
                                 (self.op.instance_name,
2267
                                  instance.primary_node))
2268

    
2269
    self.op.os_type = getattr(self.op, "os_type", None)
2270
    if self.op.os_type is not None:
2271
      # OS verification
2272
      pnode = self.cfg.GetNodeInfo(
2273
        self.cfg.ExpandNodeName(instance.primary_node))
2274
      if pnode is None:
2275
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2276
                                   self.op.pnode)
2277
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2278
      if not os_obj:
2279
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2280
                                   " primary node"  % self.op.os_type)
2281

    
2282
    self.instance = instance
2283

    
2284
  def Exec(self, feedback_fn):
2285
    """Reinstall the instance.
2286

2287
    """
2288
    inst = self.instance
2289

    
2290
    if self.op.os_type is not None:
2291
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2292
      inst.os = self.op.os_type
2293
      self.cfg.AddInstance(inst)
2294

    
2295
    _StartInstanceDisks(self.cfg, inst, None)
2296
    try:
2297
      feedback_fn("Running the instance OS create scripts...")
2298
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2299
        raise errors.OpExecError("Could not install OS for instance %s"
2300
                                 " on node %s" %
2301
                                 (inst.name, inst.primary_node))
2302
    finally:
2303
      _ShutdownInstanceDisks(inst, self.cfg)
2304

    
2305

    
2306
class LURenameInstance(LogicalUnit):
2307
  """Rename an instance.
2308

2309
  """
2310
  HPATH = "instance-rename"
2311
  HTYPE = constants.HTYPE_INSTANCE
2312
  _OP_REQP = ["instance_name", "new_name"]
2313

    
2314
  def BuildHooksEnv(self):
2315
    """Build hooks env.
2316

2317
    This runs on master, primary and secondary nodes of the instance.
2318

2319
    """
2320
    env = _BuildInstanceHookEnvByObject(self.instance)
2321
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2322
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2323
          list(self.instance.secondary_nodes))
2324
    return env, nl, nl
2325

    
2326
  def CheckPrereq(self):
2327
    """Check prerequisites.
2328

2329
    This checks that the instance is in the cluster and is not running.
2330

2331
    """
2332
    instance = self.cfg.GetInstanceInfo(
2333
      self.cfg.ExpandInstanceName(self.op.instance_name))
2334
    if instance is None:
2335
      raise errors.OpPrereqError("Instance '%s' not known" %
2336
                                 self.op.instance_name)
2337
    if instance.status != "down":
2338
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2339
                                 self.op.instance_name)
2340
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2341
    if remote_info:
2342
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2343
                                 (self.op.instance_name,
2344
                                  instance.primary_node))
2345
    self.instance = instance
2346

    
2347
    # new name verification
2348
    name_info = utils.HostInfo(self.op.new_name)
2349

    
2350
    self.op.new_name = new_name = name_info.name
2351
    instance_list = self.cfg.GetInstanceList()
2352
    if new_name in instance_list:
2353
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2354
                                 new_name)
2355

    
2356
    if not getattr(self.op, "ignore_ip", False):
2357
      command = ["fping", "-q", name_info.ip]
2358
      result = utils.RunCmd(command)
2359
      if not result.failed:
2360
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2361
                                   (name_info.ip, new_name))
2362

    
2363

    
2364
  def Exec(self, feedback_fn):
2365
    """Reinstall the instance.
2366

2367
    """
2368
    inst = self.instance
2369
    old_name = inst.name
2370

    
2371
    if inst.disk_template == constants.DT_FILE:
2372
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2373

    
2374
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2375

    
2376
    # re-read the instance from the configuration after rename
2377
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2378

    
2379
    if inst.disk_template == constants.DT_FILE:
2380
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2381
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
2382
                                                old_file_storage_dir,
2383
                                                new_file_storage_dir)
2384

    
2385
      if not result:
2386
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2387
                                 " directory '%s' to '%s' (but the instance"
2388
                                 " has been renamed in Ganeti)" % (
2389
                                 inst.primary_node, old_file_storage_dir,
2390
                                 new_file_storage_dir))
2391

    
2392
      if not result[0]:
2393
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2394
                                 " (but the instance has been renamed in"
2395
                                 " Ganeti)" % (old_file_storage_dir,
2396
                                               new_file_storage_dir))
2397

    
2398
    _StartInstanceDisks(self.cfg, inst, None)
2399
    try:
2400
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2401
                                          "sda", "sdb"):
2402
        msg = ("Could run OS rename script for instance %s on node %s (but the"
2403
               " instance has been renamed in Ganeti)" %
2404
               (inst.name, inst.primary_node))
2405
        logger.Error(msg)
2406
    finally:
2407
      _ShutdownInstanceDisks(inst, self.cfg)
2408

    
2409

    
2410
class LURemoveInstance(LogicalUnit):
2411
  """Remove an instance.
2412

2413
  """
2414
  HPATH = "instance-remove"
2415
  HTYPE = constants.HTYPE_INSTANCE
2416
  _OP_REQP = ["instance_name", "ignore_failures"]
2417

    
2418
  def BuildHooksEnv(self):
2419
    """Build hooks env.
2420

2421
    This runs on master, primary and secondary nodes of the instance.
2422

2423
    """
2424
    env = _BuildInstanceHookEnvByObject(self.instance)
2425
    nl = [self.sstore.GetMasterNode()]
2426
    return env, nl, nl
2427

    
2428
  def CheckPrereq(self):
2429
    """Check prerequisites.
2430

2431
    This checks that the instance is in the cluster.
2432

2433
    """
2434
    instance = self.cfg.GetInstanceInfo(
2435
      self.cfg.ExpandInstanceName(self.op.instance_name))
2436
    if instance is None:
2437
      raise errors.OpPrereqError("Instance '%s' not known" %
2438
                                 self.op.instance_name)
2439
    self.instance = instance
2440

    
2441
  def Exec(self, feedback_fn):
2442
    """Remove the instance.
2443

2444
    """
2445
    instance = self.instance
2446
    logger.Info("shutting down instance %s on node %s" %
2447
                (instance.name, instance.primary_node))
2448

    
2449
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2450
      if self.op.ignore_failures:
2451
        feedback_fn("Warning: can't shutdown instance")
2452
      else:
2453
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2454
                                 (instance.name, instance.primary_node))
2455

    
2456
    logger.Info("removing block devices for instance %s" % instance.name)
2457

    
2458
    if not _RemoveDisks(instance, self.cfg):
2459
      if self.op.ignore_failures:
2460
        feedback_fn("Warning: can't remove instance's disks")
2461
      else:
2462
        raise errors.OpExecError("Can't remove instance's disks")
2463

    
2464
    logger.Info("removing instance %s out of cluster config" % instance.name)
2465

    
2466
    self.cfg.RemoveInstance(instance.name)
2467

    
2468

    
2469
class LUQueryInstances(NoHooksLU):
2470
  """Logical unit for querying instances.
2471

2472
  """
2473
  _OP_REQP = ["output_fields", "names"]
2474

    
2475
  def CheckPrereq(self):
2476
    """Check prerequisites.
2477

2478
    This checks that the fields required are valid output fields.
2479

2480
    """
2481
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2482
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2483
                               "admin_state", "admin_ram",
2484
                               "disk_template", "ip", "mac", "bridge",
2485
                               "sda_size", "sdb_size", "vcpus"],
2486
                       dynamic=self.dynamic_fields,
2487
                       selected=self.op.output_fields)
2488

    
2489
    self.wanted = _GetWantedInstances(self, self.op.names)
2490

    
2491
  def Exec(self, feedback_fn):
2492
    """Computes the list of nodes and their attributes.
2493

2494
    """
2495
    instance_names = self.wanted
2496
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2497
                     in instance_names]
2498

    
2499
    # begin data gathering
2500

    
2501
    nodes = frozenset([inst.primary_node for inst in instance_list])
2502

    
2503
    bad_nodes = []
2504
    if self.dynamic_fields.intersection(self.op.output_fields):
2505
      live_data = {}
2506
      node_data = rpc.call_all_instances_info(nodes)
2507
      for name in nodes:
2508
        result = node_data[name]
2509
        if result:
2510
          live_data.update(result)
2511
        elif result == False:
2512
          bad_nodes.append(name)
2513
        # else no instance is alive
2514
    else:
2515
      live_data = dict([(name, {}) for name in instance_names])
2516

    
2517
    # end data gathering
2518

    
2519
    output = []
2520
    for instance in instance_list:
2521
      iout = []
2522
      for field in self.op.output_fields:
2523
        if field == "name":
2524
          val = instance.name
2525
        elif field == "os":
2526
          val = instance.os
2527
        elif field == "pnode":
2528
          val = instance.primary_node
2529
        elif field == "snodes":
2530
          val = list(instance.secondary_nodes)
2531
        elif field == "admin_state":
2532
          val = (instance.status != "down")
2533
        elif field == "oper_state":
2534
          if instance.primary_node in bad_nodes:
2535
            val = None
2536
          else:
2537
            val = bool(live_data.get(instance.name))
2538
        elif field == "status":
2539
          if instance.primary_node in bad_nodes:
2540
            val = "ERROR_nodedown"
2541
          else:
2542
            running = bool(live_data.get(instance.name))
2543
            if running:
2544
              if instance.status != "down":
2545
                val = "running"
2546
              else:
2547
                val = "ERROR_up"
2548
            else:
2549
              if instance.status != "down":
2550
                val = "ERROR_down"
2551
              else:
2552
                val = "ADMIN_down"
2553
        elif field == "admin_ram":
2554
          val = instance.memory
2555
        elif field == "oper_ram":
2556
          if instance.primary_node in bad_nodes:
2557
            val = None
2558
          elif instance.name in live_data:
2559
            val = live_data[instance.name].get("memory", "?")
2560
          else:
2561
            val = "-"
2562
        elif field == "disk_template":
2563
          val = instance.disk_template
2564
        elif field == "ip":
2565
          val = instance.nics[0].ip
2566
        elif field == "bridge":
2567
          val = instance.nics[0].bridge
2568
        elif field == "mac":
2569
          val = instance.nics[0].mac
2570
        elif field == "sda_size" or field == "sdb_size":
2571
          disk = instance.FindDisk(field[:3])
2572
          if disk is None:
2573
            val = None
2574
          else:
2575
            val = disk.size
2576
        elif field == "vcpus":
2577
          val = instance.vcpus
2578
        else:
2579
          raise errors.ParameterError(field)
2580
        iout.append(val)
2581
      output.append(iout)
2582

    
2583
    return output
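# Summary of the "status" field computed above (the admin state comes from
# the configuration, the operational state from the live instance query):
#
#   primary node unreachable              -> "ERROR_nodedown"
#   running     and configured as up      -> "running"
#   running     and configured as down    -> "ERROR_up"
#   not running and configured as up      -> "ERROR_down"
#   not running and configured as down    -> "ADMIN_down"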
2584

    
2585

    
2586
class LUFailoverInstance(LogicalUnit):
2587
  """Failover an instance.
2588

2589
  """
2590
  HPATH = "instance-failover"
2591
  HTYPE = constants.HTYPE_INSTANCE
2592
  _OP_REQP = ["instance_name", "ignore_consistency"]
2593

    
2594
  def BuildHooksEnv(self):
2595
    """Build hooks env.
2596

2597
    This runs on master, primary and secondary nodes of the instance.
2598

2599
    """
2600
    env = {
2601
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2602
      }
2603
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2604
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2605
    return env, nl, nl
2606

    
2607
  def CheckPrereq(self):
2608
    """Check prerequisites.
2609

2610
    This checks that the instance is in the cluster.
2611

2612
    """
2613
    instance = self.cfg.GetInstanceInfo(
2614
      self.cfg.ExpandInstanceName(self.op.instance_name))
2615
    if instance is None:
2616
      raise errors.OpPrereqError("Instance '%s' not known" %
2617
                                 self.op.instance_name)
2618

    
2619
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2620
      raise errors.OpPrereqError("Instance's disk layout is not"
2621
                                 " network mirrored, cannot failover.")
2622

    
2623
    secondary_nodes = instance.secondary_nodes
2624
    if not secondary_nodes:
2625
      raise errors.ProgrammerError("no secondary node but using "
2626
                                   "a mirrored disk template")
2627

    
2628
    target_node = secondary_nodes[0]
2629
    # check memory requirements on the secondary node
2630
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2631
                         instance.name, instance.memory)
2632

    
2633
    # check bridge existence
2634
    brlist = [nic.bridge for nic in instance.nics]
2635
    if not rpc.call_bridges_exist(target_node, brlist):
2636
      raise errors.OpPrereqError("One or more target bridges %s does not"
2637
                                 " exist on destination node '%s'" %
2638
                                 (brlist, target_node))
2639

    
2640
    self.instance = instance
2641

    
2642
  def Exec(self, feedback_fn):
2643
    """Failover an instance.
2644

2645
    The failover is done by shutting it down on its present node and
2646
    starting it on the secondary.
2647

2648
    """
2649
    instance = self.instance
2650

    
2651
    source_node = instance.primary_node
2652
    target_node = instance.secondary_nodes[0]
2653

    
2654
    feedback_fn("* checking disk consistency between source and target")
2655
    for dev in instance.disks:
2656
      # for drbd, these are drbd over lvm
2657
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2658
        if instance.status == "up" and not self.op.ignore_consistency:
2659
          raise errors.OpExecError("Disk %s is degraded on target node,"
2660
                                   " aborting failover." % dev.iv_name)
2661

    
2662
    feedback_fn("* shutting down instance on source node")
2663
    logger.Info("Shutting down instance %s on node %s" %
2664
                (instance.name, source_node))
2665

    
2666
    if not rpc.call_instance_shutdown(source_node, instance):
2667
      if self.op.ignore_consistency:
2668
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2669
                     " anyway. Please make sure node %s is down"  %
2670
                     (instance.name, source_node, source_node))
2671
      else:
2672
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2673
                                 (instance.name, source_node))
2674

    
2675
    feedback_fn("* deactivating the instance's disks on source node")
2676
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2677
      raise errors.OpExecError("Can't shut down the instance's disks.")
2678

    
2679
    instance.primary_node = target_node
2680
    # distribute new instance config to the other nodes
2681
    self.cfg.AddInstance(instance)
2682

    
2683
    # Only start the instance if it's marked as up
2684
    if instance.status == "up":
2685
      feedback_fn("* activating the instance's disks on target node")
2686
      logger.Info("Starting instance %s on node %s" %
2687
                  (instance.name, target_node))
2688

    
2689
      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2690
                                               ignore_secondaries=True)
2691
      if not disks_ok:
2692
        _ShutdownInstanceDisks(instance, self.cfg)
2693
        raise errors.OpExecError("Can't activate the instance's disks")
2694

    
2695
      feedback_fn("* starting the instance on the target node")
2696
      if not rpc.call_instance_start(target_node, instance, None):
2697
        _ShutdownInstanceDisks(instance, self.cfg)
2698
        raise errors.OpExecError("Could not start instance %s on node %s." %
2699
                                 (instance.name, target_node))
2700

    
2701

    
2702
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2703
  """Create a tree of block devices on the primary node.
2704

2705
  This always creates all devices.
2706

2707
  """
2708
  if device.children:
2709
    for child in device.children:
2710
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2711
        return False
2712

    
2713
  cfg.SetDiskID(device, node)
2714
  new_id = rpc.call_blockdev_create(node, device, device.size,
2715
                                    instance.name, True, info)
2716
  if not new_id:
2717
    return False
2718
  if device.physical_id is None:
2719
    device.physical_id = new_id
2720
  return True
2721

    
2722

    
2723
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2724
  """Create a tree of block devices on a secondary node.
2725

2726
  If this device type has to be created on secondaries, create it and
2727
  all its children.
2728

2729
  If not, just recurse to children keeping the same 'force' value.
2730

2731
  """
2732
  if device.CreateOnSecondary():
2733
    force = True
2734
  if device.children:
2735
    for child in device.children:
2736
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
2737
                                        child, force, info):
2738
        return False
2739

    
2740
  if not force:
2741
    return True
2742
  cfg.SetDiskID(device, node)
2743
  new_id = rpc.call_blockdev_create(node, device, device.size,
2744
                                    instance.name, False, info)
2745
  if not new_id:
2746
    return False
2747
  if device.physical_id is None:
2748
    device.physical_id = new_id
2749
  return True
2750

    
2751

    
2752
def _GenerateUniqueNames(cfg, exts):
2753
  """Generate a suitable LV name.
2754

2755
  This will generate unique logical volume names, one per given extension.
2756

2757
  """
2758
  results = []
2759
  for val in exts:
2760
    new_id = cfg.GenerateUniqueID()
2761
    results.append("%s%s" % (new_id, val))
2762
  return results
2763

    
2764

    
2765
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2766
  """Generate a drbd device complete with its children.
2767

2768
  """
2769
  port = cfg.AllocatePort()
2770
  vgname = cfg.GetVGName()
2771
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2772
                          logical_id=(vgname, names[0]))
2773
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2774
                          logical_id=(vgname, names[1]))
2775
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2776
                          logical_id = (primary, secondary, port),
2777
                          children = [dev_data, dev_meta])
2778
  return drbd_dev
2779

    
2780

    
2781
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2782
  """Generate a drbd8 device complete with its children.
2783

2784
  """
2785
  port = cfg.AllocatePort()
2786
  vgname = cfg.GetVGName()
2787
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2788
                          logical_id=(vgname, names[0]))
2789
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2790
                          logical_id=(vgname, names[1]))
2791
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2792
                          logical_id = (primary, secondary, port),
2793
                          children = [dev_data, dev_meta],
2794
                          iv_name=iv_name)
2795
  return drbd_dev
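# Usage sketch (node names and size are hypothetical): a 10 GiB "sda" disk
# mirrored between node1 and node2 would be generated with
#
#   names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta"])
#   sda = _GenerateDRBD8Branch(cfg, "node1", "node2", 10240, names, "sda")
#
# The returned objects.Disk is an LD_DRBD8 device whose logical_id carries
# the two node names plus a freshly allocated DRBD port, and whose children
# are the data LV of the requested size and a 128 MiB metadata LV.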
2796

    
2797

    
2798
def _GenerateDiskTemplate(cfg, template_name,
2799
                          instance_name, primary_node,
2800
                          secondary_nodes, disk_sz, swap_sz,
2801
                          file_storage_dir, file_driver):
2802
  """Generate the entire disk layout for a given template type.
2803

2804
  """
2805
  #TODO: compute space requirements
2806

    
2807
  vgname = cfg.GetVGName()
2808
  if template_name == constants.DT_DISKLESS:
2809
    disks = []
2810
  elif template_name == constants.DT_PLAIN:
2811
    if len(secondary_nodes) != 0:
2812
      raise errors.ProgrammerError("Wrong template configuration")
2813

    
2814
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2815
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2816
                           logical_id=(vgname, names[0]),
2817
                           iv_name = "sda")
2818
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2819
                           logical_id=(vgname, names[1]),
2820
                           iv_name = "sdb")
2821
    disks = [sda_dev, sdb_dev]
2822
  elif template_name == constants.DT_DRBD8:
2823
    if len(secondary_nodes) != 1:
2824
      raise errors.ProgrammerError("Wrong template configuration")
2825
    remote_node = secondary_nodes[0]
2826
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2827
                                       ".sdb_data", ".sdb_meta"])
2828
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2829
                                         disk_sz, names[0:2], "sda")
2830
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2831
                                         swap_sz, names[2:4], "sdb")
2832
    disks = [drbd_sda_dev, drbd_sdb_dev]
2833
  elif template_name == constants.DT_FILE:
2834
    if len(secondary_nodes) != 0:
2835
      raise errors.ProgrammerError("Wrong template configuration")
2836

    
2837
    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
2838
                                iv_name="sda", logical_id=(file_driver,
2839
                                "%s/sda" % file_storage_dir))
2840
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
2841
                                iv_name="sdb", logical_id=(file_driver,
2842
                                "%s/sdb" % file_storage_dir))
2843
    disks = [file_sda_dev, file_sdb_dev]
2844
  else:
2845
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2846
  return disks
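# Illustrative call (all values are hypothetical): a DRBD8 instance with a
# 10 GiB disk and 2 GiB swap would use
#
#   disks = _GenerateDiskTemplate(cfg, constants.DT_DRBD8,
#                                 "web1.example.com", "node1", ["node2"],
#                                 10240, 2048, None, None)
#
# returning two DRBD8 disks named "sda" and "sdb"; file_storage_dir and
# file_driver only matter for the DT_FILE template and are ignored here.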
2847

    
2848

    
2849
def _GetInstanceInfoText(instance):
2850
  """Compute that text that should be added to the disk's metadata.
2851

2852
  """
2853
  return "originstname+%s" % instance.name
2854

    
2855

    
2856
def _CreateDisks(cfg, instance):
2857
  """Create all disks for an instance.
2858

2859
  This abstracts away some work from AddInstance.
2860

2861
  Args:
2862
    instance: the instance object
2863

2864
  Returns:
2865
    True or False showing the success of the creation process
2866

2867
  """
2868
  info = _GetInstanceInfoText(instance)
2869

    
2870
  if instance.disk_template == constants.DT_FILE:
2871
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2872
    result = rpc.call_file_storage_dir_create(instance.primary_node,
2873
                                              file_storage_dir)
2874

    
2875
    if not result:
2876
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
2877
      return False
2878

    
2879
    if not result[0]:
2880
      logger.Error("failed to create directory '%s'" % file_storage_dir)
2881
      return False
2882

    
2883
  for device in instance.disks:
2884
    logger.Info("creating volume %s for instance %s" %
2885
                (device.iv_name, instance.name))
2886
    #HARDCODE
2887
    for secondary_node in instance.secondary_nodes:
2888
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2889
                                        device, False, info):
2890
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2891
                     (device.iv_name, device, secondary_node))
2892
        return False
2893
    #HARDCODE
2894
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2895
                                    instance, device, info):
2896
      logger.Error("failed to create volume %s on primary!" %
2897
                   device.iv_name)
2898
      return False
2899

    
2900
  return True
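# A plausible calling pattern (sketch only, with a hypothetical 'iobj'
# instance object; the instance-creation code is expected to follow this
# shape):
#
#   feedback_fn("* creating instance disks...")
#   if not _CreateDisks(self.cfg, iobj):
#     _RemoveDisks(iobj, self.cfg)
#     raise errors.OpExecError("Device creation failed")
#
# i.e. a failed creation is immediately followed by a best-effort removal of
# whatever devices had already been set up.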
2901

    
2902

    
2903
def _RemoveDisks(instance, cfg):
2904
  """Remove all disks for an instance.
2905

2906
  This abstracts away some work from `AddInstance()` and
2907
  `RemoveInstance()`. Note that in case some of the devices couldn't
2908
  be removed, the removal will continue with the other ones (compare
2909
  with `_CreateDisks()`).
2910

2911
  Args:
2912
    instance: the instance object
2913

2914
  Returns:
2915
    True or False showing the success of the removal process
2916

2917
  """
2918
  logger.Info("removing block devices for instance %s" % instance.name)
2919

    
2920
  result = True
2921
  for device in instance.disks:
2922
    for node, disk in device.ComputeNodeTree(instance.primary_node):
2923
      cfg.SetDiskID(disk, node)
2924
      if not rpc.call_blockdev_remove(node, disk):
2925
        logger.Error("could not remove block device %s on node %s,"
2926
                     " continuing anyway" %
2927
                     (device.iv_name, node))
2928
        result = False
2929

    
2930
  if instance.disk_template == constants.DT_FILE:
2931
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2932
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
2933
                                            file_storage_dir):
2934
      logger.Error("could not remove directory '%s'" % file_storage_dir)
2935
      result = False
2936

    
2937
  return result
2938

    
2939

    
2940
def _ComputeDiskSize(disk_template, disk_size, swap_size):
2941
  """Compute disk size requirements in the volume group
2942

2943
  This is currently hard-coded for the two-drive layout.
2944

2945
  """
2946
  # Required free disk space as a function of disk and swap space
2947
  req_size_dict = {
2948
    constants.DT_DISKLESS: None,
2949
    constants.DT_PLAIN: disk_size + swap_size,
2950
    # 256 MB are added for drbd metadata, 128MB for each drbd device
2951
    constants.DT_DRBD8: disk_size + swap_size + 256,
2952
    constants.DT_FILE: None,
2953
  }
2954

    
2955
  if disk_template not in req_size_dict:
2956
    raise errors.ProgrammerError("Disk template '%s' size requirement"
2957
                                 " is unknown" %  disk_template)
2958

    
2959
  return req_size_dict[disk_template]
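# Worked example (hypothetical sizes): a DT_DRBD8 instance with a 10240 MiB
# disk and 4096 MiB swap needs 10240 + 4096 + 256 = 14592 MiB in the volume
# group of each node, the extra 256 MiB covering the two 128 MiB DRBD
# metadata volumes:
#
#   _ComputeDiskSize(constants.DT_DRBD8, 10240, 4096)   # -> 14592
#   _ComputeDiskSize(constants.DT_FILE, 10240, 4096)    # -> None (no LVM use)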
2960

    
2961

    
2962
class LUCreateInstance(LogicalUnit):
2963
  """Create an instance.
2964

2965
  """
2966
  HPATH = "instance-add"
2967
  HTYPE = constants.HTYPE_INSTANCE
2968
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
2969
              "disk_template", "swap_size", "mode", "start", "vcpus",
2970
              "wait_for_sync", "ip_check", "mac"]
2971

    
2972
  def _RunAllocator(self):
2973
    """Run the allocator based on input opcode.
2974

2975
    """
2976
    disks = [{"size": self.op.disk_size, "mode": "w"},
2977
             {"size": self.op.swap_size, "mode": "w"}]
2978
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
2979
             "bridge": self.op.bridge}]
2980
    ial = IAllocator(self.cfg, self.sstore,
2981
                     mode=constants.IALLOCATOR_MODE_ALLOC,
2982
                     name=self.op.instance_name,
2983
                     disk_template=self.op.disk_template,
2984
                     tags=[],
2985
                     os=self.op.os_type,
2986
                     vcpus=self.op.vcpus,
2987
                     mem_size=self.op.mem_size,
2988
                     disks=disks,
2989
                     nics=nics,
2990
                     )
2991

    
2992
    ial.Run(self.op.iallocator)
2993

    
2994
    if not ial.success:
2995
      raise errors.OpPrereqError("Can't compute nodes using"
2996
                                 " iallocator '%s': %s" % (self.op.iallocator,
2997
                                                           ial.info))
2998
    if len(ial.nodes) != ial.required_nodes:
2999
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3000
                                 " of nodes (%s), required %s" %
3001
                                 (len(ial.nodes), ial.required_nodes))
3002
    self.op.pnode = ial.nodes[0]
3003
    logger.ToStdout("Selected nodes for the instance: %s" %
3004
                    (", ".join(ial.nodes),))
3005
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3006
                (self.op.instance_name, self.op.iallocator, ial.nodes))
3007
    if ial.required_nodes == 2:
3008
      self.op.snode = ial.nodes[1]
3009

    
3010
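  # Illustrative sketch of the request built by _RunAllocator above (values
  # assumed, not from this module): disk_size=10240, swap_size=4096,
  # mac="auto" and bridge="xen-br0" on the opcode result in
  #   disks = [{"size": 10240, "mode": "w"}, {"size": 4096, "mode": "w"}]
  #   nics  = [{"mac": "auto", "ip": None, "bridge": "xen-br0"}]
  # and, for a DRBD8 template (required_nodes == 2), ial.nodes[0] and
  # ial.nodes[1] become the primary and secondary node respectively.
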
  def BuildHooksEnv(self):
3011
    """Build hooks env.
3012

3013
    This runs on master, primary and secondary nodes of the instance.
3014

3015
    """
3016
    env = {
3017
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3018
      "INSTANCE_DISK_SIZE": self.op.disk_size,
3019
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
3020
      "INSTANCE_ADD_MODE": self.op.mode,
3021
      }
3022
    if self.op.mode == constants.INSTANCE_IMPORT:
3023
      env["INSTANCE_SRC_NODE"] = self.op.src_node
3024
      env["INSTANCE_SRC_PATH"] = self.op.src_path
3025
      env["INSTANCE_SRC_IMAGE"] = self.src_image
3026

    
3027
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3028
      primary_node=self.op.pnode,
3029
      secondary_nodes=self.secondaries,
3030
      status=self.instance_status,
3031
      os_type=self.op.os_type,
3032
      memory=self.op.mem_size,
3033
      vcpus=self.op.vcpus,
3034
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3035
    ))
3036

    
3037
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3038
          self.secondaries)
3039
    return env, nl, nl
3040

    
3041

    
3042
  def CheckPrereq(self):
3043
    """Check prerequisites.
3044

3045
    """
3046
    # set optional parameters to none if they don't exist
3047
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3048
                 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
3049
                 "vnc_bind_address"]:
3050
      if not hasattr(self.op, attr):
3051
        setattr(self.op, attr, None)
3052

    
3053
    if self.op.mode not in (constants.INSTANCE_CREATE,
3054
                            constants.INSTANCE_IMPORT):
3055
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3056
                                 self.op.mode)
3057

    
3058
    if (not self.cfg.GetVGName() and
3059
        self.op.disk_template not in constants.DTS_NOT_LVM):
3060
      raise errors.OpPrereqError("Cluster does not support lvm-based"
3061
                                 " instances")
3062

    
3063
    if self.op.mode == constants.INSTANCE_IMPORT:
3064
      src_node = getattr(self.op, "src_node", None)
3065
      src_path = getattr(self.op, "src_path", None)
3066
      if src_node is None or src_path is None:
3067
        raise errors.OpPrereqError("Importing an instance requires source"
3068
                                   " node and path options")
3069
      src_node_full = self.cfg.ExpandNodeName(src_node)
3070
      if src_node_full is None:
3071
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3072
      self.op.src_node = src_node = src_node_full
3073

    
3074
      if not os.path.isabs(src_path):
3075
        raise errors.OpPrereqError("The source path must be absolute")
3076

    
3077
      export_info = rpc.call_export_info(src_node, src_path)
3078

    
3079
      if not export_info:
3080
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
3081

    
3082
      if not export_info.has_section(constants.INISECT_EXP):
3083
        raise errors.ProgrammerError("Corrupted export config")
3084

    
3085
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
3086
      if (int(ei_version) != constants.EXPORT_VERSION):
3087
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3088
                                   (ei_version, constants.EXPORT_VERSION))
3089

    
3090
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3091
        raise errors.OpPrereqError("Can't import instance with more than"
3092
                                   " one data disk")
3093

    
3094
      # FIXME: are the old os-es, disk sizes, etc. useful?
3095
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3096
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3097
                                                         'disk0_dump'))
3098
      self.src_image = diskimage
3099
    else: # INSTANCE_CREATE
3100
      if getattr(self.op, "os_type", None) is None:
3101
        raise errors.OpPrereqError("No guest OS specified")
3102

    
3103
    #### instance parameters check
3104

    
3105
    # disk template and mirror node verification
3106
    if self.op.disk_template not in constants.DISK_TEMPLATES:
3107
      raise errors.OpPrereqError("Invalid disk template name")
3108

    
3109
    # instance name verification
3110
    hostname1 = utils.HostInfo(self.op.instance_name)
3111

    
3112
    self.op.instance_name = instance_name = hostname1.name
3113
    instance_list = self.cfg.GetInstanceList()
3114
    if instance_name in instance_list:
3115
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3116
                                 instance_name)
3117

    
3118
    # ip validity checks
3119
    ip = getattr(self.op, "ip", None)
3120
    if ip is None or ip.lower() == "none":
3121
      inst_ip = None
3122
    elif ip.lower() == "auto":
3123
      inst_ip = hostname1.ip
3124
    else:
3125
      if not utils.IsValidIP(ip):
3126
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
3127
                                   " like a valid IP" % ip)
3128
      inst_ip = ip
3129
    self.inst_ip = self.op.ip = inst_ip
3130

    
3131
    if self.op.start and not self.op.ip_check:
3132
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3133
                                 " adding an instance in start mode")
3134

    
3135
    if self.op.ip_check:
3136
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3137
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3138
                                   (hostname1.ip, instance_name))
3139

    
3140
    # MAC address verification
3141
    if self.op.mac != "auto":
3142
      if not utils.IsValidMac(self.op.mac.lower()):
3143
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
3144
                                   self.op.mac)
3145

    
3146
    # bridge verification
3147
    bridge = getattr(self.op, "bridge", None)
3148
    if bridge is None:
3149
      self.op.bridge = self.cfg.GetDefBridge()
3150
    else:
3151
      self.op.bridge = bridge
3152

    
3153
    # boot order verification
3154
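    # (the letters follow the Xen HVM boot device convention, assumed here:
    # a=floppy, c=hard disk, d=cdrom, n=network; e.g. "dc" boots from the
    # cdrom first and falls back to the disk)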
    if self.op.hvm_boot_order is not None:
3155
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3156
        raise errors.OpPrereqError("invalid boot order specified,"
3157
                                   " must be one or more of [acdn]")
3158
    # file storage checks
3159
    if (self.op.file_driver and
3160
        not self.op.file_driver in constants.FILE_DRIVER):
3161
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
3162
                                 self.op.file_driver)
3163

    
3164
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3165
      raise errors.OpPrereqError("File storage directory not a relative"
3166
                                 " path")
3167
    #### allocator run
3168

    
3169
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
3170
      raise errors.OpPrereqError("One and only one of iallocator and primary"
3171
                                 " node must be given")
3172

    
3173
    if self.op.iallocator is not None:
3174
      self._RunAllocator()
3175

    
3176
    #### node related checks
3177

    
3178
    # check primary node
3179
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3180
    if pnode is None:
3181
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
3182
                                 self.op.pnode)
3183
    self.op.pnode = pnode.name
3184
    self.pnode = pnode
3185
    self.secondaries = []
3186

    
3187
    # mirror node verification
3188
    if self.op.disk_template in constants.DTS_NET_MIRROR:
3189
      if getattr(self.op, "snode", None) is None:
3190
        raise errors.OpPrereqError("The networked disk templates need"
3191
                                   " a mirror node")
3192

    
3193
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
3194
      if snode_name is None:
3195
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
3196
                                   self.op.snode)
3197
      elif snode_name == pnode.name:
3198
        raise errors.OpPrereqError("The secondary node cannot be"
3199
                                   " the primary node.")
3200
      self.secondaries.append(snode_name)
3201

    
3202
    req_size = _ComputeDiskSize(self.op.disk_template,
3203
                                self.op.disk_size, self.op.swap_size)
3204

    
3205
    # Check lv size requirements
3206
    if req_size is not None:
3207
      nodenames = [pnode.name] + self.secondaries
3208
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3209
      for node in nodenames:
3210
        info = nodeinfo.get(node, None)
3211
        if not info:
3212
          raise errors.OpPrereqError("Cannot get current information"
3213
                                     " from node '%s'" % nodeinfo)
3214
        vg_free = info.get('vg_free', None)
3215
        if not isinstance(vg_free, int):
3216
          raise errors.OpPrereqError("Can't compute free disk space on"
3217
                                     " node %s" % node)
3218
        if req_size > info['vg_free']:
3219
          raise errors.OpPrereqError("Not enough disk space on target node %s."
3220
                                     " %d MB available, %d MB required" %
3221
                                     (node, info['vg_free'], req_size))
3222

    
3223
    # os verification
3224
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3225
    if not os_obj:
3226
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
3227
                                 " primary node"  % self.op.os_type)
3228

    
3229
    if self.op.kernel_path == constants.VALUE_NONE:
3230
      raise errors.OpPrereqError("Can't set instance kernel to none")
3231

    
3232

    
3233
    # bridge check on primary node
3234
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3235
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
3236
                                 " destination node '%s'" %
3237
                                 (self.op.bridge, pnode.name))
3238

    
3239
    # memory check on primary node
3240
    if self.op.start:
3241
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3242
                           "creating instance %s" % self.op.instance_name,
3243
                           self.op.mem_size)
3244

    
3245
    # hvm_cdrom_image_path verification
3246
    if self.op.hvm_cdrom_image_path is not None:
3247
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
3248
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
3249
                                   " be an absolute path or None, not %s" %
3250
                                   self.op.hvm_cdrom_image_path)
3251
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
3252
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
3253
                                   " regular file or a symlink pointing to"
3254
                                   " an existing regular file, not %s" %
3255
                                   self.op.hvm_cdrom_image_path)
3256

    
3257
    # vnc_bind_address verification
3258
    if self.op.vnc_bind_address is not None:
3259
      if not utils.IsValidIP(self.op.vnc_bind_address):
3260
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3261
                                   " like a valid IP address" %
3262
                                   self.op.vnc_bind_address)
3263

    
3264
    if self.op.start:
3265
      self.instance_status = 'up'
3266
    else:
3267
      self.instance_status = 'down'
3268

    
3269
  def Exec(self, feedback_fn):
3270
    """Create and add the instance to the cluster.
3271

3272
    """
3273
    instance = self.op.instance_name
3274
    pnode_name = self.pnode.name
3275

    
3276
    if self.op.mac == "auto":
3277
      mac_address = self.cfg.GenerateMAC()
3278
    else:
3279
      mac_address = self.op.mac
3280

    
3281
    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3282
    if self.inst_ip is not None:
3283
      nic.ip = self.inst_ip
3284

    
3285
    ht_kind = self.sstore.GetHypervisorType()
3286
    if ht_kind in constants.HTS_REQ_PORT:
3287
      network_port = self.cfg.AllocatePort()
3288
    else:
3289
      network_port = None
3290

    
3291
    if self.op.vnc_bind_address is None:
3292
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3293

    
3294
    # this is needed because os.path.join does not accept None arguments
3295
    if self.op.file_storage_dir is None:
3296
      string_file_storage_dir = ""
3297
    else:
3298
      string_file_storage_dir = self.op.file_storage_dir
3299

    
3300
    # build the full file storage dir path
3301
    file_storage_dir = os.path.normpath(os.path.join(
3302
                                        self.sstore.GetFileStorageDir(),
3303
                                        string_file_storage_dir, instance))
3304
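    # e.g. (assuming the default file storage dir /srv/ganeti/file-storage
    # and file_storage_dir="relative/dir") this yields
    # "/srv/ganeti/file-storage/relative/dir/<instance name>"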

    
3305

    
3306
    disks = _GenerateDiskTemplate(self.cfg,
3307
                                  self.op.disk_template,
3308
                                  instance, pnode_name,
3309
                                  self.secondaries, self.op.disk_size,
3310
                                  self.op.swap_size,
3311
                                  file_storage_dir,
3312
                                  self.op.file_driver)
3313

    
3314
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3315
                            primary_node=pnode_name,
3316
                            memory=self.op.mem_size,
3317
                            vcpus=self.op.vcpus,
3318
                            nics=[nic], disks=disks,
3319
                            disk_template=self.op.disk_template,
3320
                            status=self.instance_status,
3321
                            network_port=network_port,
3322
                            kernel_path=self.op.kernel_path,
3323
                            initrd_path=self.op.initrd_path,
3324
                            hvm_boot_order=self.op.hvm_boot_order,
3325
                            hvm_acpi=self.op.hvm_acpi,
3326
                            hvm_pae=self.op.hvm_pae,
3327
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3328
                            vnc_bind_address=self.op.vnc_bind_address,
3329
                            )
3330

    
3331
    feedback_fn("* creating instance disks...")
3332
    if not _CreateDisks(self.cfg, iobj):
3333
      _RemoveDisks(iobj, self.cfg)
3334
      raise errors.OpExecError("Device creation failed, reverting...")
3335

    
3336
    feedback_fn("adding instance %s to cluster config" % instance)
3337

    
3338
    self.cfg.AddInstance(iobj)
3339

    
3340
    if self.op.wait_for_sync:
3341
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3342
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3343
      # make sure the disks are not degraded (still sync-ing is ok)
3344
      time.sleep(15)
3345
      feedback_fn("* checking mirrors status")
3346
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3347
    else:
3348
      disk_abort = False
3349

    
3350
    if disk_abort:
3351
      _RemoveDisks(iobj, self.cfg)
3352
      self.cfg.RemoveInstance(iobj.name)
3353
      raise errors.OpExecError("There are some degraded disks for"
3354
                               " this instance")
3355

    
3356
    feedback_fn("creating os for instance %s on node %s" %
3357
                (instance, pnode_name))
3358

    
3359
    if iobj.disk_template != constants.DT_DISKLESS:
3360
      if self.op.mode == constants.INSTANCE_CREATE:
3361
        feedback_fn("* running the instance OS create scripts...")
3362
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3363
          raise errors.OpExecError("could not add os for instance %s"
3364
                                   " on node %s" %
3365
                                   (instance, pnode_name))
3366

    
3367
      elif self.op.mode == constants.INSTANCE_IMPORT:
3368
        feedback_fn("* running the instance OS import scripts...")
3369
        src_node = self.op.src_node
3370
        src_image = self.src_image
3371
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3372
                                                src_node, src_image):
3373
          raise errors.OpExecError("Could not import os for instance"
3374
                                   " %s on node %s" %
3375
                                   (instance, pnode_name))
3376
      else:
3377
        # also checked in the prereq part
3378
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3379
                                     % self.op.mode)
3380

    
3381
    if self.op.start:
3382
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3383
      feedback_fn("* starting instance...")
3384
      if not rpc.call_instance_start(pnode_name, iobj, None):
3385
        raise errors.OpExecError("Could not start instance")
3386

    
3387

    
3388
class LUConnectConsole(NoHooksLU):
3389
  """Connect to an instance's console.
3390

3391
  This is somewhat special in that it returns the command line that
3392
  you need to run on the master node in order to connect to the
3393
  console.
3394

3395
  """
3396
  _OP_REQP = ["instance_name"]
3397

    
3398
  def CheckPrereq(self):
3399
    """Check prerequisites.
3400

3401
    This checks that the instance is in the cluster.
3402

3403
    """
3404
    instance = self.cfg.GetInstanceInfo(
3405
      self.cfg.ExpandInstanceName(self.op.instance_name))
3406
    if instance is None:
3407
      raise errors.OpPrereqError("Instance '%s' not known" %
3408
                                 self.op.instance_name)
3409
    self.instance = instance
3410

    
3411
  def Exec(self, feedback_fn):
3412
    """Connect to the console of an instance
3413

3414
    """
3415
    instance = self.instance
3416
    node = instance.primary_node
3417

    
3418
    node_insts = rpc.call_instance_list([node])[node]
3419
    if node_insts is False:
3420
      raise errors.OpExecError("Can't connect to node %s." % node)
3421

    
3422
    if instance.name not in node_insts:
3423
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3424

    
3425
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3426

    
3427
    hyper = hypervisor.GetHypervisor()
3428
    console_cmd = hyper.GetShellCommandForConsole(instance)
3429

    
3430
    # build ssh cmdline
3431
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3432

    
3433

    
3434
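# Illustrative sketch of LUConnectConsole's result (hypervisor-dependent,
# values assumed): on a Xen PVM cluster the command line returned above is an
# ssh invocation of the hypervisor console command on the primary node,
# roughly "ssh -t root@node1.example.com 'xm console inst1.example.com'",
# which "gnt-instance console" then runs on the master.
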
class LUReplaceDisks(LogicalUnit):
3435
  """Replace the disks of an instance.
3436

3437
  """
3438
  HPATH = "mirrors-replace"
3439
  HTYPE = constants.HTYPE_INSTANCE
3440
  _OP_REQP = ["instance_name", "mode", "disks"]
3441

    
3442
  def _RunAllocator(self):
3443
    """Compute a new secondary node using an IAllocator.
3444

3445
    """
3446
    ial = IAllocator(self.cfg, self.sstore,
3447
                     mode=constants.IALLOCATOR_MODE_RELOC,
3448
                     name=self.op.instance_name,
3449
                     relocate_from=[self.sec_node])
3450

    
3451
    ial.Run(self.op.iallocator)
3452

    
3453
    if not ial.success:
3454
      raise errors.OpPrereqError("Can't compute nodes using"
3455
                                 " iallocator '%s': %s" % (self.op.iallocator,
3456
                                                           ial.info))
3457
    if len(ial.nodes) != ial.required_nodes:
3458
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3459
                                 " of nodes (%s), required %s" %
3460
                                 (len(ial.nodes), ial.required_nodes))
3461
    self.op.remote_node = ial.nodes[0]
3462
    logger.ToStdout("Selected new secondary for the instance: %s" %
3463
                    self.op.remote_node)
3464

    
3465
  def BuildHooksEnv(self):
3466
    """Build hooks env.
3467

3468
    This runs on the master, the primary and all the secondaries.
3469

3470
    """
3471
    env = {
3472
      "MODE": self.op.mode,
3473
      "NEW_SECONDARY": self.op.remote_node,
3474
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3475
      }
3476
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3477
    nl = [
3478
      self.sstore.GetMasterNode(),
3479
      self.instance.primary_node,
3480
      ]
3481
    if self.op.remote_node is not None:
3482
      nl.append(self.op.remote_node)
3483
    return env, nl, nl
3484

    
3485
  def CheckPrereq(self):
3486
    """Check prerequisites.
3487

3488
    This checks that the instance is in the cluster.
3489

3490
    """
3491
    if not hasattr(self.op, "remote_node"):
3492
      self.op.remote_node = None
3493

    
3494
    instance = self.cfg.GetInstanceInfo(
3495
      self.cfg.ExpandInstanceName(self.op.instance_name))
3496
    if instance is None:
3497
      raise errors.OpPrereqError("Instance '%s' not known" %
3498
                                 self.op.instance_name)
3499
    self.instance = instance
3500
    self.op.instance_name = instance.name
3501

    
3502
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3503
      raise errors.OpPrereqError("Instance's disk layout is not"
3504
                                 " network mirrored.")
3505

    
3506
    if len(instance.secondary_nodes) != 1:
3507
      raise errors.OpPrereqError("The instance has a strange layout,"
3508
                                 " expected one secondary but found %d" %
3509
                                 len(instance.secondary_nodes))
3510

    
3511
    self.sec_node = instance.secondary_nodes[0]
3512

    
3513
    ia_name = getattr(self.op, "iallocator", None)
3514
    if ia_name is not None:
3515
      if self.op.remote_node is not None:
3516
        raise errors.OpPrereqError("Give either the iallocator or the new"
3517
                                   " secondary, not both")
3518
      self.op.remote_node = self._RunAllocator()
3519

    
3520
    remote_node = self.op.remote_node
3521
    if remote_node is not None:
3522
      remote_node = self.cfg.ExpandNodeName(remote_node)
3523
      if remote_node is None:
3524
        raise errors.OpPrereqError("Node '%s' not known" %
3525
                                   self.op.remote_node)
3526
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3527
    else:
3528
      self.remote_node_info = None
3529
    if remote_node == instance.primary_node:
3530
      raise errors.OpPrereqError("The specified node is the primary node of"
3531
                                 " the instance.")
3532
    elif remote_node == self.sec_node:
3533
      if self.op.mode == constants.REPLACE_DISK_SEC:
3534
        # this is for DRBD8, where we can't execute the same mode of
3535
        # replacement as for drbd7 (no different port allocated)
3536
        raise errors.OpPrereqError("Same secondary given, cannot execute"
3537
                                   " replacement")
3538
    if instance.disk_template == constants.DT_DRBD8:
3539
      if (self.op.mode == constants.REPLACE_DISK_ALL and
3540
          remote_node is not None):
3541
        # switch to replace secondary mode
3542
        self.op.mode = constants.REPLACE_DISK_SEC
3543

    
3544
      if self.op.mode == constants.REPLACE_DISK_ALL:
3545
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3546
                                   " secondary disk replacement, not"
3547
                                   " both at once")
3548
      elif self.op.mode == constants.REPLACE_DISK_PRI:
3549
        if remote_node is not None:
3550
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3551
                                     " the secondary while doing a primary"
3552
                                     " node disk replacement")
3553
        self.tgt_node = instance.primary_node
3554
        self.oth_node = instance.secondary_nodes[0]
3555
      elif self.op.mode == constants.REPLACE_DISK_SEC:
3556
        self.new_node = remote_node # this can be None, in which case
3557
                                    # we don't change the secondary
3558
        self.tgt_node = instance.secondary_nodes[0]
3559
        self.oth_node = instance.primary_node
3560
      else:
3561
        raise errors.ProgrammerError("Unhandled disk replace mode")
3562

    
3563
    for name in self.op.disks:
3564
      if instance.FindDisk(name) is None:
3565
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3566
                                   (name, instance.name))
3567
    self.op.remote_node = remote_node
3568

    
3569
  def _ExecD8DiskOnly(self, feedback_fn):
3570
    """Replace a disk on the primary or secondary for dbrd8.
3571

3572
    The algorithm for replace is quite complicated:
3573
      - for each disk to be replaced:
3574
        - create new LVs on the target node with unique names
3575
        - detach old LVs from the drbd device
3576
        - rename old LVs to name_replaced.<time_t>
3577
        - rename new LVs to old LVs
3578
        - attach the new LVs (with the old names now) to the drbd device
3579
      - wait for sync across all devices
3580
      - for each modified disk:
3581
        - remove old LVs (which have the name name_replaces.<time_t>)
3582

3583
    Failures are not very well handled.
3584

3585
    """
3586
    steps_total = 6
3587
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3588
    instance = self.instance
3589
    iv_names = {}
3590
    vgname = self.cfg.GetVGName()
3591
    # start of work
3592
    cfg = self.cfg
3593
    tgt_node = self.tgt_node
3594
    oth_node = self.oth_node
3595

    
3596
    # Step: check device activation
3597
    self.proc.LogStep(1, steps_total, "check device existence")
3598
    info("checking volume groups")
3599
    my_vg = cfg.GetVGName()
3600
    results = rpc.call_vg_list([oth_node, tgt_node])
3601
    if not results:
3602
      raise errors.OpExecError("Can't list volume groups on the nodes")
3603
    for node in oth_node, tgt_node:
3604
      res = results.get(node, False)
3605
      if not res or my_vg not in res:
3606
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3607
                                 (my_vg, node))
3608
    for dev in instance.disks:
3609
      if not dev.iv_name in self.op.disks:
3610
        continue
3611
      for node in tgt_node, oth_node:
3612
        info("checking %s on %s" % (dev.iv_name, node))
3613
        cfg.SetDiskID(dev, node)
3614
        if not rpc.call_blockdev_find(node, dev):
3615
          raise errors.OpExecError("Can't find device %s on node %s" %
3616
                                   (dev.iv_name, node))
3617

    
3618
    # Step: check other node consistency
3619
    self.proc.LogStep(2, steps_total, "check peer consistency")
3620
    for dev in instance.disks:
3621
      if not dev.iv_name in self.op.disks:
3622
        continue
3623
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3624
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3625
                                   oth_node==instance.primary_node):
3626
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3627
                                 " to replace disks on this node (%s)" %
3628
                                 (oth_node, tgt_node))
3629

    
3630
    # Step: create new storage
3631
    self.proc.LogStep(3, steps_total, "allocate new storage")
3632
    for dev in instance.disks:
3633
      if not dev.iv_name in self.op.disks:
3634
        continue
3635
      size = dev.size
3636
      cfg.SetDiskID(dev, tgt_node)
3637
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3638
      names = _GenerateUniqueNames(cfg, lv_names)
3639
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3640
                             logical_id=(vgname, names[0]))
3641
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3642
                             logical_id=(vgname, names[1]))
3643
      new_lvs = [lv_data, lv_meta]
3644
      old_lvs = dev.children
3645
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3646
      info("creating new local storage on %s for %s" %
3647
           (tgt_node, dev.iv_name))
3648
      # since we *always* want to create this LV, we use the
3649
      # _Create...OnPrimary (which forces the creation), even if we
3650
      # are talking about the secondary node
3651
      for new_lv in new_lvs:
3652
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3653
                                        _GetInstanceInfoText(instance)):
3654
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3655
                                   " node '%s'" %
3656
                                   (new_lv.logical_id[1], tgt_node))
3657

    
3658
    # Step: for each lv, detach+rename*2+attach
3659
    self.proc.LogStep(4, steps_total, "change drbd configuration")
3660
    for dev, old_lvs, new_lvs in iv_names.itervalues():
3661
      info("detaching %s drbd from local storage" % dev.iv_name)
3662
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3663
        raise errors.OpExecError("Can't detach drbd from local storage on node"
3664
                                 " %s for device %s" % (tgt_node, dev.iv_name))
3665
      #dev.children = []
3666
      #cfg.Update(instance)
3667

    
3668
      # ok, we created the new LVs, so now we know we have the needed
3669
      # storage; as such, we proceed on the target node to rename
3670
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3671
      # using the assumption that logical_id == physical_id (which in
3672
      # turn is the unique_id on that node)
3673

    
3674
      # FIXME(iustin): use a better name for the replaced LVs
3675
      temp_suffix = int(time.time())
3676
      ren_fn = lambda d, suff: (d.physical_id[0],
3677
                                d.physical_id[1] + "_replaced-%s" % suff)
3678
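      # e.g. (illustrative, naming assumed): an old data LV named
      # "<uuid>.sda_data" becomes "<uuid>.sda_data_replaced-1215503362" and is
      # only deleted in the "remove old storage" step below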
      # build the rename list based on what LVs exist on the node
3679
      rlist = []
3680
      for to_ren in old_lvs:
3681
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3682
        if find_res is not None: # device exists
3683
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3684

    
3685
      info("renaming the old LVs on the target node")
3686
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3687
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3688
      # now we rename the new LVs to the old LVs
3689
      info("renaming the new LVs on the target node")
3690
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3691
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3692
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3693

    
3694
      for old, new in zip(old_lvs, new_lvs):
3695
        new.logical_id = old.logical_id
3696
        cfg.SetDiskID(new, tgt_node)
3697

    
3698
      for disk in old_lvs:
3699
        disk.logical_id = ren_fn(disk, temp_suffix)
3700
        cfg.SetDiskID(disk, tgt_node)
3701

    
3702
      # now that the new lvs have the old name, we can add them to the device
3703
      info("adding new mirror component on %s" % tgt_node)
3704
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3705
        for new_lv in new_lvs:
3706
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
3707
            warning("Can't rollback device %s", hint="manually cleanup unused"
3708
                    " logical volumes")
3709
        raise errors.OpExecError("Can't add local storage to drbd")
3710

    
3711
      dev.children = new_lvs
3712
      cfg.Update(instance)
3713

    
3714
    # Step: wait for sync
3715

    
3716
    # this can fail as the old devices are degraded and _WaitForSync
3717
    # does a combined result over all disks, so we don't check its
3718
    # return value
3719
    self.proc.LogStep(5, steps_total, "sync devices")
3720
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3721

    
3722
    # so check manually all the devices
3723
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3724
      cfg.SetDiskID(dev, instance.primary_node)
3725
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3726
      if is_degr:
3727
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3728

    
3729
    # Step: remove old storage
3730
    self.proc.LogStep(6, steps_total, "removing old storage")
3731
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3732
      info("remove logical volumes for %s" % name)
3733
      for lv in old_lvs:
3734
        cfg.SetDiskID(lv, tgt_node)
3735
        if not rpc.call_blockdev_remove(tgt_node, lv):
3736
          warning("Can't remove old LV", hint="manually remove unused LVs")
3737
          continue
3738

    
3739
  def _ExecD8Secondary(self, feedback_fn):
3740
    """Replace the secondary node for drbd8.
3741

3742
    The algorithm for replace is quite complicated:
3743
      - for all disks of the instance:
3744
        - create new LVs on the new node with same names
3745
        - shutdown the drbd device on the old secondary
3746
        - disconnect the drbd network on the primary
3747
        - create the drbd device on the new secondary
3748
        - network attach the drbd on the primary, using an artifice:
3749
          the drbd code for Attach() will connect to the network if it
3750
          finds a device which is connected to the good local disks but
3751
          not network enabled
3752
      - wait for sync across all devices
3753
      - remove all disks from the old secondary
3754

3755
    Failures are not very well handled.
3756

3757
    """
3758
    steps_total = 6
3759
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3760
    instance = self.instance
3761
    iv_names = {}
3762
    vgname = self.cfg.GetVGName()
3763
    # start of work
3764
    cfg = self.cfg
3765
    old_node = self.tgt_node
3766
    new_node = self.new_node
3767
    pri_node = instance.primary_node
3768

    
3769
    # Step: check device activation
3770
    self.proc.LogStep(1, steps_total, "check device existence")
3771
    info("checking volume groups")
3772
    my_vg = cfg.GetVGName()
3773
    results = rpc.call_vg_list([pri_node, new_node])
3774
    if not results:
3775
      raise errors.OpExecError("Can't list volume groups on the nodes")
3776
    for node in pri_node, new_node:
3777
      res = results.get(node, False)
3778
      if not res or my_vg not in res:
3779
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3780
                                 (my_vg, node))
3781
    for dev in instance.disks:
3782
      if not dev.iv_name in self.op.disks:
3783
        continue
3784
      info("checking %s on %s" % (dev.iv_name, pri_node))
3785
      cfg.SetDiskID(dev, pri_node)
3786
      if not rpc.call_blockdev_find(pri_node, dev):
3787
        raise errors.OpExecError("Can't find device %s on node %s" %
3788
                                 (dev.iv_name, pri_node))
3789

    
3790
    # Step: check other node consistency
3791
    self.proc.LogStep(2, steps_total, "check peer consistency")
3792
    for dev in instance.disks:
3793
      if not dev.iv_name in self.op.disks:
3794
        continue
3795
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3796
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3797
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
3798
                                 " unsafe to replace the secondary" %
3799
                                 pri_node)
3800

    
3801
    # Step: create new storage
3802
    self.proc.LogStep(3, steps_total, "allocate new storage")
3803
    for dev in instance.disks:
3804
      size = dev.size
3805
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3806
      # since we *always* want to create this LV, we use the
3807
      # _Create...OnPrimary (which forces the creation), even if we
3808
      # are talking about the secondary node
3809
      for new_lv in dev.children:
3810
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3811
                                        _GetInstanceInfoText(instance)):
3812
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3813
                                   " node '%s'" %
3814
                                   (new_lv.logical_id[1], new_node))
3815

    
3816
      iv_names[dev.iv_name] = (dev, dev.children)
3817

    
3818
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
3819
    for dev in instance.disks:
3820
      size = dev.size
3821
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3822
      # create new devices on new_node
3823
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3824
                              logical_id=(pri_node, new_node,
3825
                                          dev.logical_id[2]),
3826
                              children=dev.children)
3827
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3828
                                        new_drbd, False,
3829
                                      _GetInstanceInfoText(instance)):
3830
        raise errors.OpExecError("Failed to create new DRBD on"
3831
                                 " node '%s'" % new_node)
3832

    
3833
    for dev in instance.disks:
3834
      # we have new devices, shutdown the drbd on the old secondary
3835
      info("shutting down drbd for %s on old node" % dev.iv_name)
3836
      cfg.SetDiskID(dev, old_node)
3837
      if not rpc.call_blockdev_shutdown(old_node, dev):
3838
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3839
                hint="Please cleanup this device manually as soon as possible")
3840

    
3841
    info("detaching primary drbds from the network (=> standalone)")
3842
    done = 0
3843
    for dev in instance.disks:
3844
      cfg.SetDiskID(dev, pri_node)
3845
      # set the physical (unique in bdev terms) id to None, meaning
3846
      # detach from network
3847
      dev.physical_id = (None,) * len(dev.physical_id)
3848
      # and 'find' the device, which will 'fix' it to match the
3849
      # standalone state
3850
      if rpc.call_blockdev_find(pri_node, dev):
3851
        done += 1
3852
      else:
3853
        warning("Failed to detach drbd %s from network, unusual case" %
3854
                dev.iv_name)
3855

    
3856
    if not done:
3857
      # no detaches succeeded (very unlikely)
3858
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
3859

    
3860
    # if we managed to detach at least one, we update all the disks of
3861
    # the instance to point to the new secondary
3862
    info("updating instance configuration")
3863
    for dev in instance.disks:
3864
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3865
      cfg.SetDiskID(dev, pri_node)
3866
    cfg.Update(instance)
3867

    
3868
    # and now perform the drbd attach
3869
    info("attaching primary drbds to new secondary (standalone => connected)")
3870
    failures = []
3871
    for dev in instance.disks:
3872
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3873
      # since the attach is smart, it's enough to 'find' the device,
3874
      # it will automatically activate the network, if the physical_id
3875
      # is correct
3876
      cfg.SetDiskID(dev, pri_node)
3877
      if not rpc.call_blockdev_find(pri_node, dev):
3878
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3879
                "please do a gnt-instance info to see the status of disks")
3880

    
3881
    # this can fail as the old devices are degraded and _WaitForSync
3882
    # does a combined result over all disks, so we don't check its
3883
    # return value
3884
    self.proc.LogStep(5, steps_total, "sync devices")
3885
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3886

    
3887
    # so check manually all the devices
3888
    for name, (dev, old_lvs) in iv_names.iteritems():
3889
      cfg.SetDiskID(dev, pri_node)
3890
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3891
      if is_degr:
3892
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3893

    
3894
    self.proc.LogStep(6, steps_total, "removing old storage")
3895
    for name, (dev, old_lvs) in iv_names.iteritems():
3896
      info("remove logical volumes for %s" % name)
3897
      for lv in old_lvs:
3898
        cfg.SetDiskID(lv, old_node)
3899
        if not rpc.call_blockdev_remove(old_node, lv):
3900
          warning("Can't remove LV on old secondary",
3901
                  hint="Cleanup stale volumes by hand")
3902

    
3903
  def Exec(self, feedback_fn):
3904
    """Execute disk replacement.
3905

3906
    This dispatches the disk replacement to the appropriate handler.
3907

3908
    """
3909
    instance = self.instance
3910
    if instance.disk_template == constants.DT_DRBD8:
3911
      if self.op.remote_node is None:
3912
        fn = self._ExecD8DiskOnly
3913
      else:
3914
        fn = self._ExecD8Secondary
3915
    else:
3916
      raise errors.ProgrammerError("Unhandled disk replacement case")
3917
    return fn(feedback_fn)
3918

    
3919

    
3920
class LUQueryInstanceData(NoHooksLU):
3921
  """Query runtime instance data.
3922

3923
  """
3924
  _OP_REQP = ["instances"]
3925

    
3926
  def CheckPrereq(self):
3927
    """Check prerequisites.
3928

3929
    This only checks the optional instance list against the existing names.
3930

3931
    """
3932
    if not isinstance(self.op.instances, list):
3933
      raise errors.OpPrereqError("Invalid argument type 'instances'")
3934
    if self.op.instances:
3935
      self.wanted_instances = []
3936
      names = self.op.instances
3937
      for name in names:
3938
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3939
        if instance is None:
3940
          raise errors.OpPrereqError("No such instance name '%s'" % name)
3941
        self.wanted_instances.append(instance)
3942
    else:
3943
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3944
                               in self.cfg.GetInstanceList()]
3945
    return
3946

    
3947

    
3948
  def _ComputeDiskStatus(self, instance, snode, dev):
3949
    """Compute block device status.
3950

3951
    """
3952
    self.cfg.SetDiskID(dev, instance.primary_node)
3953
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3954
    if dev.dev_type in constants.LDS_DRBD:
3955
      # we change the snode then (otherwise we use the one passed in)
3956
      if dev.logical_id[0] == instance.primary_node:
3957
        snode = dev.logical_id[1]
3958
      else:
3959
        snode = dev.logical_id[0]
3960

    
3961
    if snode:
3962
      self.cfg.SetDiskID(dev, snode)
3963
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
3964
    else:
3965
      dev_sstatus = None
3966

    
3967
    if dev.children:
3968
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
3969
                      for child in dev.children]
3970
    else:
3971
      dev_children = []
3972

    
3973
    data = {
3974
      "iv_name": dev.iv_name,
3975
      "dev_type": dev.dev_type,
3976
      "logical_id": dev.logical_id,
3977
      "physical_id": dev.physical_id,
3978
      "pstatus": dev_pstatus,
3979
      "sstatus": dev_sstatus,
3980
      "children": dev_children,
3981
      }
3982

    
3983
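    # For a drbd8-based disk the dict returned below nests one level, roughly
    # (illustrative): {"iv_name": "sda", "dev_type": constants.LD_DRBD8, ...,
    #  "children": [<data LV status dict>, <meta LV status dict>]}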
    return data
3984

    
3985
  def Exec(self, feedback_fn):
3986
    """Gather and return data"""
3987
    result = {}
3988
    for instance in self.wanted_instances:
3989
      remote_info = rpc.call_instance_info(instance.primary_node,
3990
                                                instance.name)
3991
      if remote_info and "state" in remote_info:
3992
        remote_state = "up"
3993
      else:
3994
        remote_state = "down"
3995
      if instance.status == "down":
3996
        config_state = "down"
3997
      else:
3998
        config_state = "up"
3999

    
4000
      disks = [self._ComputeDiskStatus(instance, None, device)
4001
               for device in instance.disks]
4002

    
4003
      idict = {
4004
        "name": instance.name,
4005
        "config_state": config_state,
4006
        "run_state": remote_state,
4007
        "pnode": instance.primary_node,
4008
        "snodes": instance.secondary_nodes,
4009
        "os": instance.os,
4010
        "memory": instance.memory,
4011
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4012
        "disks": disks,
4013
        "vcpus": instance.vcpus,
4014
        }
4015

    
4016
      htkind = self.sstore.GetHypervisorType()
4017
      if htkind == constants.HT_XEN_PVM30:
4018
        idict["kernel_path"] = instance.kernel_path
4019
        idict["initrd_path"] = instance.initrd_path
4020

    
4021
      if htkind == constants.HT_XEN_HVM31:
4022
        idict["hvm_boot_order"] = instance.hvm_boot_order
4023
        idict["hvm_acpi"] = instance.hvm_acpi
4024
        idict["hvm_pae"] = instance.hvm_pae
4025
        idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4026

    
4027
      if htkind in constants.HTS_REQ_PORT:
4028
        idict["vnc_bind_address"] = instance.vnc_bind_address
4029
        idict["network_port"] = instance.network_port
4030

    
4031
      result[instance.name] = idict
4032

    
4033
    return result
4034

    
4035

    
4036
class LUSetInstanceParams(LogicalUnit):
4037
  """Modifies an instances's parameters.
4038

4039
  """
4040
  HPATH = "instance-modify"
4041
  HTYPE = constants.HTYPE_INSTANCE
4042
  _OP_REQP = ["instance_name"]
4043

    
4044
  def BuildHooksEnv(self):
4045
    """Build hooks env.
4046

4047
    This runs on the master, primary and secondaries.
4048

4049
    """
4050
    args = dict()
4051
    if self.mem:
4052
      args['memory'] = self.mem
4053
    if self.vcpus:
4054
      args['vcpus'] = self.vcpus
4055
    if self.do_ip or self.do_bridge or self.mac:
4056
      if self.do_ip:
4057
        ip = self.ip
4058
      else:
4059
        ip = self.instance.nics[0].ip
4060
      if self.bridge:
4061
        bridge = self.bridge
4062
      else:
4063
        bridge = self.instance.nics[0].bridge
4064
      if self.mac:
4065
        mac = self.mac
4066
      else:
4067
        mac = self.instance.nics[0].mac
4068
      args['nics'] = [(ip, bridge, mac)]
4069
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4070
    nl = [self.sstore.GetMasterNode(),
4071
          self.instance.primary_node] + list(self.instance.secondary_nodes)
4072
    return env, nl, nl
4073

    
4074
  def CheckPrereq(self):
4075
    """Check prerequisites.
4076

4077
    This only checks the instance list against the existing names.
4078

4079
    """
4080
    self.mem = getattr(self.op, "mem", None)
4081
    self.vcpus = getattr(self.op, "vcpus", None)
4082
    self.ip = getattr(self.op, "ip", None)
4083
    self.mac = getattr(self.op, "mac", None)
4084
    self.bridge = getattr(self.op, "bridge", None)
4085
    self.kernel_path = getattr(self.op, "kernel_path", None)
4086
    self.initrd_path = getattr(self.op, "initrd_path", None)
4087
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4088
    self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
4089
    self.hvm_pae = getattr(self.op, "hvm_pae", None)
4090
    self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
4091
    self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
4092
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4093
                 self.kernel_path, self.initrd_path, self.hvm_boot_order,
4094
                 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
4095
                 self.vnc_bind_address]
4096
    if all_parms.count(None) == len(all_parms):
4097
      raise errors.OpPrereqError("No changes submitted")
4098
    if self.mem is not None:
4099
      try:
4100
        self.mem = int(self.mem)
4101
      except ValueError, err:
4102
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4103
    if self.vcpus is not None:
4104
      try:
4105
        self.vcpus = int(self.vcpus)
4106
      except ValueError, err:
4107
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4108
    if self.ip is not None:
4109
      self.do_ip = True
4110
      if self.ip.lower() == "none":
4111
        self.ip = None
4112
      else:
4113
        if not utils.IsValidIP(self.ip):
4114
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4115
    else:
4116
      self.do_ip = False
4117
    self.do_bridge = (self.bridge is not None)
4118
    if self.mac is not None:
4119
      if self.cfg.IsMacInUse(self.mac):
4120
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4121
                                   self.mac)
4122
      if not utils.IsValidMac(self.mac):
4123
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4124

    
4125
    if self.kernel_path is not None:
4126
      self.do_kernel_path = True
4127
      if self.kernel_path == constants.VALUE_NONE:
4128
        raise errors.OpPrereqError("Can't set instance to no kernel")
4129

    
4130
      if self.kernel_path != constants.VALUE_DEFAULT:
4131
        if not os.path.isabs(self.kernel_path):
4132
          raise errors.OpPrereqError("The kernel path must be an absolute"
4133
                                    " filename")
4134
    else:
4135
      self.do_kernel_path = False
4136

    
4137
    if self.initrd_path is not None:
4138
      self.do_initrd_path = True
4139
      if self.initrd_path not in (constants.VALUE_NONE,
4140
                                  constants.VALUE_DEFAULT):
4141
        if not os.path.isabs(self.initrd_path):
4142
          raise errors.OpPrereqError("The initrd path must be an absolute"
4143
                                    " filename")
4144
    else:
4145
      self.do_initrd_path = False
4146

    
4147
    # boot order verification
4148
    if self.hvm_boot_order is not None:
4149
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
4150
        if len(self.hvm_boot_order.strip("acdn")) != 0:
4151
          raise errors.OpPrereqError("invalid boot order specified,"
4152
                                     " must be one or more of [acdn]"
4153
                                     " or 'default'")
4154

    
4155
    # hvm_cdrom_image_path verification
4156
    if self.op.hvm_cdrom_image_path is not None:
4157
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
4158
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
4159
                                   " be an absolute path or None, not %s" %
4160
                                   self.op.hvm_cdrom_image_path)
4161
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
4162
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
4163
                                   " regular file or a symlink pointing to"
4164
                                   " an existing regular file, not %s" %
4165
                                   self.op.hvm_cdrom_image_path)
4166

    
4167
    # vnc_bind_address verification
4168
    if self.op.vnc_bind_address is not None:
4169
      if not utils.IsValidIP(self.op.vnc_bind_address):
4170
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
4171
                                   " like a valid IP address" %
4172
                                   self.op.vnc_bind_address)
4173

    
4174
    instance = self.cfg.GetInstanceInfo(
4175
      self.cfg.ExpandInstanceName(self.op.instance_name))
4176
    if instance is None:
4177
      raise errors.OpPrereqError("No such instance name '%s'" %
4178
                                 self.op.instance_name)
4179
    self.op.instance_name = instance.name
4180
    self.instance = instance
4181
    return
4182

    
4183
  def Exec(self, feedback_fn):
4184
    """Modifies an instance.
4185

4186
    All parameters take effect only at the next restart of the instance.
4187
    """
4188
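    # The return value is a list of (parameter, new value) pairs, e.g.
    # changing memory and bridge gives [("mem", 512), ("bridge", "xen-br1")]
    # (illustrative values).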
    result = []
4189
    instance = self.instance
4190
    if self.mem:
4191
      instance.memory = self.mem
4192
      result.append(("mem", self.mem))
4193
    if self.vcpus:
4194
      instance.vcpus = self.vcpus
4195
      result.append(("vcpus",  self.vcpus))
4196
    if self.do_ip:
4197
      instance.nics[0].ip = self.ip
4198
      result.append(("ip", self.ip))
4199
    if self.bridge:
4200
      instance.nics[0].bridge = self.bridge
4201
      result.append(("bridge", self.bridge))
4202
    if self.mac:
4203
      instance.nics[0].mac = self.mac
4204
      result.append(("mac", self.mac))
4205
    if self.do_kernel_path:
4206
      instance.kernel_path = self.kernel_path
4207
      result.append(("kernel_path", self.kernel_path))
4208
    if self.do_initrd_path:
4209
      instance.initrd_path = self.initrd_path
4210
      result.append(("initrd_path", self.initrd_path))
4211
    if self.hvm_boot_order:
4212
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
4213
        instance.hvm_boot_order = None
4214
      else:
4215
        instance.hvm_boot_order = self.hvm_boot_order
4216
      result.append(("hvm_boot_order", self.hvm_boot_order))
4217
    if self.hvm_acpi:
4218
      instance.hvm_acpi = self.hvm_acpi
4219
      result.append(("hvm_acpi", self.hvm_acpi))
4220
    if self.hvm_pae:
4221
      instance.hvm_pae = self.hvm_pae
4222
      result.append(("hvm_pae", self.hvm_pae))
4223
    if self.hvm_cdrom_image_path:
4224
      instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
4225
      result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
4226
    if self.vnc_bind_address:
4227
      instance.vnc_bind_address = self.vnc_bind_address
4228
      result.append(("vnc_bind_address", self.vnc_bind_address))
4229

    
4230
    self.cfg.AddInstance(instance)
4231

    
4232
    return result
4233

    
4234

    
4235
class LUQueryExports(NoHooksLU):
4236
  """Query the exports list
4237

4238
  """
4239
  _OP_REQP = []
4240

    
4241
  def CheckPrereq(self):
4242
    """Check that the nodelist contains only existing nodes.
4243

4244
    """
4245
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4246

    
4247
  def Exec(self, feedback_fn):
4248
    """Compute the list of all the exported system images.
4249

4250
    Returns:
4251
      a dictionary with the structure node->(export-list)
4252
      where export-list is a list of the instances exported on
4253
      that node.
4254

4255
    """
    return rpc.call_export_list(self.nodes)


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []
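    # snap_disks collects the temporary LVM snapshots created below on the
    # source node; each one is exported to the target node and then removed.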

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        if not rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(instance, self.cfg)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
        logger.Error("could not export block device %s from node %s to node %s"
                     % (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from node %s" %
                     (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.
    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    op = opcodes.OpQueryExports(nodes=[])
    exportlist = self.proc.ChainOpCode(op)
    found = False
    for node in exportlist:
      if instance_name in exportlist[node]:
        found = True
        if not rpc.call_export_remove(node, instance_name):
          logger.Error("could not remove export for instance %s"
                       " on node %s" % (instance_name, node))

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the matching (path, tag) pairs.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
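    # every tag matching the pattern is recorded as a (path, tag) pair,
    # e.g. ("/instances/<instance name>", "<tag>")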
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets one or more tags on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks each of the tags passed for validity.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tags.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tags.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tags from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have a good list of nodes and/or the duration
    is valid.

    """

    if self.op.on_nodes:
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has the following sets of attributes:
    - cfg/sstore that are needed to query the cluster
    - input data (all members of the mode-specific _ALLO_KEYS or
      _RELO_KEYS class attributes are required)
    - four buffer attributes (in_text, in_data, out_text, out_data), that
      represent the input (to the external script) in text and data
      structure format, and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, cfg, sstore, mode, name, **kwargs):
    self.cfg = cfg
    self.sstore = sstore
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.cfg
    # cluster data
    data = {
      "version": 1,
      "cluster_name": self.sstore.GetClusterName(),
      "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      # we don't have job IDs
      }

    i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()
    node_data = rpc.call_node_info(node_list, cfg.GetVGName())
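    # node_data maps node name -> dict of raw node attributes as returned by
    # the RPC call; the attributes used below are converted to integers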
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      if nname not in node_data or not isinstance(node_data[nname], dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname]
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free', 'cpu_total']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += iinfo.memory
          if iinfo.status == "up":
            i_p_up_mem += iinfo.memory

      # build the per-node result dict
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": iinfo.vcpus,
        "memory": iinfo.memory,
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template,
                                  self.disks[0]["size"], self.disks[1]["size"])

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance has not exactly one secondary node")

    self.required_nodes = 1

    disk_space = _ComputeDiskSize(instance.disk_template,
                                  instance.disks[0].size,
                                  instance.disks[1].size)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)
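    # self.in_text now holds the serialized request document containing the
    # top-level keys built above: "version", "cluster_name", "cluster_tags",
    # "hypervisor_type", "nodes", "instances" and "request"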

  def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
    """Run an instance allocator and return the results.

    """
    data = self.in_text

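    # run the allocator script via the master node; the runner is expected to
    # return a (return code, stdout, stderr, fail reason) tuple, which is
    # checked and unpacked below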
    result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)

    if not isinstance(result, tuple) or len(result) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" %
                               (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    of the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       )
    else:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result