
root / lib / cmdlib.py @ e310b019


#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import serializer

47
class LogicalUnit(object):
48
  """Logical Unit base class.
49

50
  Subclasses must follow these rules:
51
    - implement ExpandNames
52
    - implement CheckPrereq
53
    - implement Exec
54
    - implement BuildHooksEnv
55
    - redefine HPATH and HTYPE
56
    - optionally redefine their run requirements:
57
        REQ_MASTER: the LU needs to run on the master node
58
        REQ_WSSTORE: the LU needs a writable SimpleStore
59
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60

61
  Note that all commands require root permissions.
62

63
  """
64
  HPATH = None
65
  HTYPE = None
66
  _OP_REQP = []
67
  REQ_MASTER = True
68
  REQ_WSSTORE = False
69
  REQ_BGL = True
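  # Illustrative sketch only (not part of the original code): a
  # concurrency-aware subclass typically overrides the defaults above much
  # like the query LUs further down in this module do, e.g.:
  #
  #   class LUExampleQuery(NoHooksLU):      # hypothetical name
  #     _OP_REQP = ["output_fields"]        # opcode attributes that must be set
  #     REQ_BGL = False                     # don't hold the Big Ganeti Lock
  #
  #     def ExpandNames(self):
  #       self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}
  #       self.share_locks[locking.LEVEL_NODE] = 1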
70

    
71
  def __init__(self, processor, op, context, sstore):
72
    """Constructor for LogicalUnit.
73

74
    This needs to be overridden in derived classes in order to check op
75
    validity.
76

77
    """
78
    self.proc = processor
79
    self.op = op
80
    self.cfg = context.cfg
81
    self.sstore = sstore
82
    self.context = context
83
    self.needed_locks = None
84
    self.acquired_locks = {}
85
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
86
    # Used to force good behavior when calling helper functions
87
    self.recalculate_locks = {}
88
    self.__ssh = None
89

    
90
    for attr_name in self._OP_REQP:
91
      attr_val = getattr(op, attr_name, None)
92
      if attr_val is None:
93
        raise errors.OpPrereqError("Required parameter '%s' missing" %
94
                                   attr_name)
95

    
96
    if not self.cfg.IsCluster():
97
      raise errors.OpPrereqError("Cluster not initialized yet,"
98
                                 " use 'gnt-cluster init' first.")
99
    if self.REQ_MASTER:
100
      master = sstore.GetMasterNode()
101
      if master != utils.HostInfo().name:
102
        raise errors.OpPrereqError("Commands must be run on the master"
103
                                   " node %s" % master)
104

    
105
  def __GetSSH(self):
106
    """Returns the SshRunner object
107

108
    """
109
    if not self.__ssh:
110
      self.__ssh = ssh.SshRunner(self.sstore)
111
    return self.__ssh
112

    
113
  ssh = property(fget=__GetSSH)
114

    
115
  def ExpandNames(self):
116
    """Expand names for this LU.
117

118
    This method is called before starting to execute the opcode, and it should
119
    update all the parameters of the opcode to their canonical form (e.g. a
120
    short node name must be fully expanded after this method has successfully
121
    completed). This way locking, hooks, logging, etc. can work correctly.
122

123
    LUs which implement this method must also populate the self.needed_locks
124
    member, as a dict with lock levels as keys, and a list of needed lock names
125
    as values. Rules:
126
      - Use an empty dict if you don't need any lock
127
      - If you don't need any lock at a particular level omit that level
128
      - Don't put anything for the BGL level
129
      - If you want all locks at a level use locking.ALL_SET as a value
130

131
    If you need to share locks (rather than acquire them exclusively) at one
132
    level you can modify self.share_locks, setting a true value (usually 1) for
133
    that level. By default locks are not shared.
134

135
    Examples:
136
    # Acquire all nodes and one instance
137
    self.needed_locks = {
138
      locking.LEVEL_NODE: locking.ALL_SET,
139
      locking.LEVEL_INSTANCES: ['instance1.example.tld'],
140
    }
141
    # Acquire just two nodes
142
    self.needed_locks = {
143
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
144
    }
145
    # Acquire no locks
146
    self.needed_locks = {} # No, you can't leave it to the default value None
147

148
    """
149
    # The implementation of this method is mandatory only if the new LU is
150
    # concurrent, so that old LUs don't need to be changed all at the same
151
    # time.
152
    if self.REQ_BGL:
153
      self.needed_locks = {} # Exclusive LUs don't need locks.
154
    else:
155
      raise NotImplementedError
156

    
157
  def DeclareLocks(self, level):
158
    """Declare LU locking needs for a level
159

160
    While most LUs can just declare their locking needs at ExpandNames time,
161
    sometimes there's the need to calculate some locks after having acquired
162
    the ones before. This function is called just before acquiring locks at a
163
    particular level, but after acquiring the ones at lower levels, and permits
164
    such calculations. It can be used to modify self.needed_locks, and by
165
    default it does nothing.
166

167
    This function is only called if you have something already set in
168
    self.needed_locks for the level.
169

170
    @param level: Locking level which is going to be locked
171
    @type level: member of ganeti.locking.LEVELS
172

173
    """
174

    
175
  def CheckPrereq(self):
176
    """Check prerequisites for this LU.
177

178
    This method should check that the prerequisites for the execution
179
    of this LU are fulfilled. It can do internode communication, but
180
    it should be idempotent - no cluster or system changes are
181
    allowed.
182

183
    The method should raise errors.OpPrereqError in case something is
184
    not fulfilled. Its return value is ignored.
185

186
    This method should also update all the parameters of the opcode to
187
    their canonical form if it hasn't been done by ExpandNames before.
188

189
    """
190
    raise NotImplementedError
191

    
192
  def Exec(self, feedback_fn):
193
    """Execute the LU.
194

195
    This method should implement the actual work. It should raise
196
    errors.OpExecError for failures that are somewhat dealt with in
197
    code, or expected.
198

199
    """
200
    raise NotImplementedError
201

    
202
  def BuildHooksEnv(self):
203
    """Build hooks environment for this LU.
204

205
    This method should return a three-element tuple consisting of: a dict
206
    containing the environment that will be used for running the
207
    specific hook for this LU, a list of node names on which the hook
208
    should run before the execution, and a list of node names on which
209
    the hook should run after the execution.
210

211
    The keys of the dict must not have the 'GANETI_' prefix, as this will
212
    be handled in the hooks runner. Also note additional keys will be
213
    added by the hooks runner. If the LU doesn't define any
214
    environment, an empty dict (and not None) should be returned.
215

216
    "No nodes" should be returned as an empty list (and not None).
217

218
    Note that if the HPATH for a LU class is None, this function will
219
    not be called.
220

221
    """
222
    raise NotImplementedError
223

    
224
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
225
    """Notify the LU about the results of its hooks.
226

227
    This method is called every time a hooks phase is executed, and notifies
228
    the Logical Unit about the hooks' result. The LU can then use it to alter
229
    its result based on the hooks.  By default the method does nothing and the
230
    previous result is passed back unchanged, but any LU can override it if it
231
    wants to use the local cluster hook-scripts somehow.
232

233
    Args:
234
      phase: the hooks phase that has just been run
235
      hook_results: the results of the multi-node hooks rpc call
236
      feedback_fn: function to send feedback back to the caller
237
      lu_result: the previous result this LU had, or None in the PRE phase.
238

239
    """
240
    return lu_result
241

    
242
  def _ExpandAndLockInstance(self):
243
    """Helper function to expand and lock an instance.
244

245
    Many LUs that work on an instance take its name in self.op.instance_name
246
    and need to expand it and then declare the expanded name for locking. This
247
    function does it, and then updates self.op.instance_name to the expanded
248
    name. It also initializes needed_locks as a dict, if this hasn't been done
249
    before.
250

251
    """
252
    if self.needed_locks is None:
253
      self.needed_locks = {}
254
    else:
255
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
256
        "_ExpandAndLockInstance called with instance-level locks set"
257
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
258
    if expanded_name is None:
259
      raise errors.OpPrereqError("Instance '%s' not known" %
260
                                  self.op.instance_name)
261
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
262
    self.op.instance_name = expanded_name
263

    
264
  def _LockInstancesNodes(self):
265
    """Helper function to declare instances' nodes for locking.
266

267
    This function should be called after locking one or more instances to lock
268
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
269
    with all primary or secondary nodes for instances already locked and
270
    present in self.acquired_locks[locking.LEVEL_INSTANCE].
271

272
    It should be called from DeclareLocks, and for safety only works if
273
    self.recalculate_locks[locking.LEVEL_NODE] is set.
274

275
    In the future it may grow parameters to just lock some instance's nodes, or
276
    to just lock primaries or secondary nodes, if needed.
277

278
    It should be called in DeclareLocks in a way similar to:
279

280
    if level == locking.LEVEL_NODE:
281
      self._LockInstancesNodes()
282

283
    """
284
    assert locking.LEVEL_NODE in self.recalculate_locks, \
285
      "_LockInstancesNodes helper function called with no nodes to recalculate"
286

    
287
    # TODO: check if we've really been called with the instance locks held
288

    
289
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
290
    # future we might want to have different behaviors depending on the value
291
    # of self.recalculate_locks[locking.LEVEL_NODE]
292
    wanted_nodes = []
293
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
294
      instance = self.context.cfg.GetInstanceInfo(instance_name)
295
      wanted_nodes.append(instance.primary_node)
296
      wanted_nodes.extend(instance.secondary_nodes)
297
    self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
298

    
299
    del self.recalculate_locks[locking.LEVEL_NODE]
300

    
301

    
302
class NoHooksLU(LogicalUnit):
303
  """Simple LU which runs no hooks.
304

305
  This LU is intended as a parent for other LogicalUnits which will
306
  run no hooks, in order to reduce duplicate code.
307

308
  """
309
  HPATH = None
310
  HTYPE = None
311

    
312

    
313
def _GetWantedNodes(lu, nodes):
314
  """Returns list of checked and expanded node names.
315

316
  Args:
317
    nodes: List of nodes (strings) or None for all
318

319
  """
320
  if not isinstance(nodes, list):
321
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
322

    
323
  if not nodes:
324
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
325
      " non-empty list of nodes whose name is to be expanded.")
326

    
327
  wanted = []
328
  for name in nodes:
329
    node = lu.cfg.ExpandNodeName(name)
330
    if node is None:
331
      raise errors.OpPrereqError("No such node name '%s'" % name)
332
    wanted.append(node)
333

    
334
  return utils.NiceSort(wanted)
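# Typical usage (illustrative; see e.g. LUQueryNodeVolumes.ExpandNames below):
# the caller passes the user-supplied node names and gets back the expanded,
# sorted list, e.g.:
#   self.needed_locks[locking.LEVEL_NODE] = _GetWantedNodes(self, self.op.nodes)
# An OpPrereqError is raised if any name does not resolve to a known node.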
335

    
336

    
337
def _GetWantedInstances(lu, instances):
338
  """Returns list of checked and expanded instance names.
339

340
  Args:
341
    instances: List of instances (strings) or None for all
342

343
  """
344
  if not isinstance(instances, list):
345
    raise errors.OpPrereqError("Invalid argument type 'instances'")
346

    
347
  if instances:
348
    wanted = []
349

    
350
    for name in instances:
351
      instance = lu.cfg.ExpandInstanceName(name)
352
      if instance is None:
353
        raise errors.OpPrereqError("No such instance name '%s'" % name)
354
      wanted.append(instance)
355

    
356
  else:
357
    wanted = lu.cfg.GetInstanceList()
358
  return utils.NiceSort(wanted)
359

    
360

    
361
def _CheckOutputFields(static, dynamic, selected):
362
  """Checks whether all selected fields are valid.
363

364
  Args:
365
    static: Static fields
366
    dynamic: Dynamic fields
367

368
  """
369
  static_fields = frozenset(static)
370
  dynamic_fields = frozenset(dynamic)
371

    
372
  all_fields = static_fields | dynamic_fields
373

    
374
  if not all_fields.issuperset(selected):
375
    raise errors.OpPrereqError("Unknown output fields selected: %s"
376
                               % ",".join(frozenset(selected).
377
                                          difference(all_fields)))
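# Example call (illustrative, mirroring how the query LUs below use it):
#   _CheckOutputFields(static=["name", "pinst_cnt"],
#                      dynamic=["mfree", "dfree"],
#                      selected=self.op.output_fields)
# raises OpPrereqError if self.op.output_fields contains an unknown field.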
378

    
379

    
380
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
381
                          memory, vcpus, nics):
382
  """Builds instance related env variables for hooks from single variables.
383

384
  Args:
385
    secondary_nodes: List of secondary nodes as strings
386
  """
387
  env = {
388
    "OP_TARGET": name,
389
    "INSTANCE_NAME": name,
390
    "INSTANCE_PRIMARY": primary_node,
391
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
392
    "INSTANCE_OS_TYPE": os_type,
393
    "INSTANCE_STATUS": status,
394
    "INSTANCE_MEMORY": memory,
395
    "INSTANCE_VCPUS": vcpus,
396
  }
397

    
398
  if nics:
399
    nic_count = len(nics)
400
    for idx, (ip, bridge, mac) in enumerate(nics):
401
      if ip is None:
402
        ip = ""
403
      env["INSTANCE_NIC%d_IP" % idx] = ip
404
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
405
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
406
  else:
407
    nic_count = 0
408

    
409
  env["INSTANCE_NIC_COUNT"] = nic_count
410

    
411
  return env
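# For illustration (hypothetical instance data), an instance with one NIC
# would yield an environment such as:
#   {"OP_TARGET": "inst1.example.com", "INSTANCE_NAME": "inst1.example.com",
#    "INSTANCE_PRIMARY": "node1", "INSTANCE_SECONDARIES": "node2",
#    "INSTANCE_OS_TYPE": "debian-etch", "INSTANCE_STATUS": "up",
#    "INSTANCE_MEMORY": 512, "INSTANCE_VCPUS": 1,
#    "INSTANCE_NIC_COUNT": 1, "INSTANCE_NIC0_IP": "",
#    "INSTANCE_NIC0_BRIDGE": "xen-br0",
#    "INSTANCE_NIC0_HWADDR": "aa:00:00:00:00:01"}
# (the hooks runner later prefixes each key with "GANETI_")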
412

    
413

    
414
def _BuildInstanceHookEnvByObject(instance, override=None):
415
  """Builds instance related env variables for hooks from an object.
416

417
  Args:
418
    instance: objects.Instance object of instance
419
    override: dict of values to override
420
  """
421
  args = {
422
    'name': instance.name,
423
    'primary_node': instance.primary_node,
424
    'secondary_nodes': instance.secondary_nodes,
425
    'os_type': instance.os,
426
    'status': instance.status,
427
    'memory': instance.memory,
428
    'vcpus': instance.vcpus,
429
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
430
  }
431
  if override:
432
    args.update(override)
433
  return _BuildInstanceHookEnv(**args)
434

    
435

    
436
def _CheckInstanceBridgesExist(instance):
437
  """Check that the brigdes needed by an instance exist.
438

439
  """
440
  # check bridges existence
441
  brlist = [nic.bridge for nic in instance.nics]
442
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
443
    raise errors.OpPrereqError("one or more target bridges %s does not"
444
                               " exist on destination node '%s'" %
445
                               (brlist, instance.primary_node))
446

    
447

    
448
class LUDestroyCluster(NoHooksLU):
449
  """Logical unit for destroying the cluster.
450

451
  """
452
  _OP_REQP = []
453

    
454
  def CheckPrereq(self):
455
    """Check prerequisites.
456

457
    This checks whether the cluster is empty.
458

459
    Any errors are signalled by raising errors.OpPrereqError.
460

461
    """
462
    master = self.sstore.GetMasterNode()
463

    
464
    nodelist = self.cfg.GetNodeList()
465
    if len(nodelist) != 1 or nodelist[0] != master:
466
      raise errors.OpPrereqError("There are still %d node(s) in"
467
                                 " this cluster." % (len(nodelist) - 1))
468
    instancelist = self.cfg.GetInstanceList()
469
    if instancelist:
470
      raise errors.OpPrereqError("There are still %d instance(s) in"
471
                                 " this cluster." % len(instancelist))
472

    
473
  def Exec(self, feedback_fn):
474
    """Destroys the cluster.
475

476
    """
477
    master = self.sstore.GetMasterNode()
478
    if not rpc.call_node_stop_master(master, False):
479
      raise errors.OpExecError("Could not disable the master role")
480
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
481
    utils.CreateBackup(priv_key)
482
    utils.CreateBackup(pub_key)
483
    return master
484

    
485

    
486
class LUVerifyCluster(LogicalUnit):
487
  """Verifies the cluster status.
488

489
  """
490
  HPATH = "cluster-verify"
491
  HTYPE = constants.HTYPE_CLUSTER
492
  _OP_REQP = ["skip_checks"]
493

    
494
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
495
                  remote_version, feedback_fn):
496
    """Run multiple tests against a node.
497

498
    Test list:
499
      - compares ganeti version
500
      - checks vg existence and size > 20G
501
      - checks config file checksum
502
      - checks ssh to other nodes
503

504
    Args:
505
      node: name of the node to check
506
      file_list: required list of files
507
      local_cksum: dictionary of local files and their checksums
508

509
    """
510
    # compares ganeti version
511
    local_version = constants.PROTOCOL_VERSION
512
    if not remote_version:
513
      feedback_fn("  - ERROR: connection to %s failed" % (node))
514
      return True
515

    
516
    if local_version != remote_version:
517
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
518
                      (local_version, node, remote_version))
519
      return True
520

    
521
    # checks vg existence and size > 20G
522

    
523
    bad = False
524
    if not vglist:
525
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
526
                      (node,))
527
      bad = True
528
    else:
529
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
530
                                            constants.MIN_VG_SIZE)
531
      if vgstatus:
532
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
533
        bad = True
534

    
535
    # checks config file checksum
536
    # checks ssh to any
537

    
538
    if 'filelist' not in node_result:
539
      bad = True
540
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
541
    else:
542
      remote_cksum = node_result['filelist']
543
      for file_name in file_list:
544
        if file_name not in remote_cksum:
545
          bad = True
546
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
547
        elif remote_cksum[file_name] != local_cksum[file_name]:
548
          bad = True
549
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
550

    
551
    if 'nodelist' not in node_result:
552
      bad = True
553
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
554
    else:
555
      if node_result['nodelist']:
556
        bad = True
557
        for node in node_result['nodelist']:
558
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
559
                          (node, node_result['nodelist'][node]))
560
    if 'node-net-test' not in node_result:
561
      bad = True
562
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
563
    else:
564
      if node_result['node-net-test']:
565
        bad = True
566
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
567
        for node in nlist:
568
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
569
                          (node, node_result['node-net-test'][node]))
570

    
571
    hyp_result = node_result.get('hypervisor', None)
572
    if hyp_result is not None:
573
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
574
    return bad
575

    
576
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
577
                      node_instance, feedback_fn):
578
    """Verify an instance.
579

580
    This function checks to see if the required block devices are
581
    available on the instance's node.
582

583
    """
584
    bad = False
585

    
586
    node_current = instanceconfig.primary_node
587

    
588
    node_vol_should = {}
589
    instanceconfig.MapLVsByNode(node_vol_should)
590

    
591
    for node in node_vol_should:
592
      for volume in node_vol_should[node]:
593
        if node not in node_vol_is or volume not in node_vol_is[node]:
594
          feedback_fn("  - ERROR: volume %s missing on node %s" %
595
                          (volume, node))
596
          bad = True
597

    
598
    if instanceconfig.status != 'down':
599
      if (node_current not in node_instance or
600
          not instance in node_instance[node_current]):
601
        feedback_fn("  - ERROR: instance %s not running on node %s" %
602
                        (instance, node_current))
603
        bad = True
604

    
605
    for node in node_instance:
606
      if node != node_current:
607
        if instance in node_instance[node]:
608
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
609
                          (instance, node))
610
          bad = True
611

    
612
    return bad
613

    
614
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
615
    """Verify if there are any unknown volumes in the cluster.
616

617
    The .os, .swap and backup volumes are ignored. All other volumes are
618
    reported as unknown.
619

620
    """
621
    bad = False
622

    
623
    for node in node_vol_is:
624
      for volume in node_vol_is[node]:
625
        if node not in node_vol_should or volume not in node_vol_should[node]:
626
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
627
                      (volume, node))
628
          bad = True
629
    return bad
630

    
631
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
632
    """Verify the list of running instances.
633

634
    This checks what instances are running but unknown to the cluster.
635

636
    """
637
    bad = False
638
    for node in node_instance:
639
      for runninginstance in node_instance[node]:
640
        if runninginstance not in instancelist:
641
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
642
                          (runninginstance, node))
643
          bad = True
644
    return bad
645

    
646
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
647
    """Verify N+1 Memory Resilience.
648

649
    Check that if one single node dies we can still start all the instances it
650
    was primary for.
651

652
    """
653
    bad = False
654

    
655
    for node, nodeinfo in node_info.iteritems():
656
      # This code checks that every node which is now listed as secondary has
657
      # enough memory to host all the instances it is secondary for, should any
658
      # other node in the cluster fail.
659
      # FIXME: not ready for failover to an arbitrary node
660
      # FIXME: does not support file-backed instances
661
      # WARNING: we currently take into account down instances as well as up
662
      # ones, considering that even if they're down someone might want to start
663
      # them even in the event of a node failure.
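      # Worked example (illustrative numbers): if this node is secondary for
      # instances of 512MB and 256MB whose primary is node A, and for a
      # 1024MB instance whose primary is node B, it needs at least 768MB
      # free to survive a failure of A and 1024MB to survive a failure of
      # B; each primary-node group is checked separately below.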
664
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
665
        needed_mem = 0
666
        for instance in instances:
667
          needed_mem += instance_cfg[instance].memory
668
        if nodeinfo['mfree'] < needed_mem:
669
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
670
                      " failovers should node %s fail" % (node, prinode))
671
          bad = True
672
    return bad
673

    
674
  def CheckPrereq(self):
675
    """Check prerequisites.
676

677
    Transform the list of checks we're going to skip into a set and check that
678
    all its members are valid.
679

680
    """
681
    self.skip_set = frozenset(self.op.skip_checks)
682
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
683
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
684

    
685
  def BuildHooksEnv(self):
686
    """Build hooks env.
687

688
    Cluster-Verify hooks are run only in the post phase and their failure makes
689
    the output be logged in the verify output and the verification fail.
690

691
    """
692
    all_nodes = self.cfg.GetNodeList()
693
    # TODO: populate the environment with useful information for verify hooks
694
    env = {}
695
    return env, [], all_nodes
696

    
697
  def Exec(self, feedback_fn):
698
    """Verify integrity of cluster, performing various test on nodes.
699

700
    """
701
    bad = False
702
    feedback_fn("* Verifying global settings")
703
    for msg in self.cfg.VerifyConfig():
704
      feedback_fn("  - ERROR: %s" % msg)
705

    
706
    vg_name = self.cfg.GetVGName()
707
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
708
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
709
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
710
    i_non_redundant = [] # Non redundant instances
711
    node_volume = {}
712
    node_instance = {}
713
    node_info = {}
714
    instance_cfg = {}
715

    
716
    # FIXME: verify OS list
717
    # do local checksums
718
    file_names = list(self.sstore.GetFileList())
719
    file_names.append(constants.SSL_CERT_FILE)
720
    file_names.append(constants.CLUSTER_CONF_FILE)
721
    local_checksums = utils.FingerprintFiles(file_names)
722

    
723
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
724
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
725
    all_instanceinfo = rpc.call_instance_list(nodelist)
726
    all_vglist = rpc.call_vg_list(nodelist)
727
    node_verify_param = {
728
      'filelist': file_names,
729
      'nodelist': nodelist,
730
      'hypervisor': None,
731
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
732
                        for node in nodeinfo]
733
      }
734
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
735
    all_rversion = rpc.call_version(nodelist)
736
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
737

    
738
    for node in nodelist:
739
      feedback_fn("* Verifying node %s" % node)
740
      result = self._VerifyNode(node, file_names, local_checksums,
741
                                all_vglist[node], all_nvinfo[node],
742
                                all_rversion[node], feedback_fn)
743
      bad = bad or result
744

    
745
      # node_volume
746
      volumeinfo = all_volumeinfo[node]
747

    
748
      if isinstance(volumeinfo, basestring):
749
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
750
                    (node, volumeinfo[-400:].encode('string_escape')))
751
        bad = True
752
        node_volume[node] = {}
753
      elif not isinstance(volumeinfo, dict):
754
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
755
        bad = True
756
        continue
757
      else:
758
        node_volume[node] = volumeinfo
759

    
760
      # node_instance
761
      nodeinstance = all_instanceinfo[node]
762
      if not isinstance(nodeinstance, list):
763
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
764
        bad = True
765
        continue
766

    
767
      node_instance[node] = nodeinstance
768

    
769
      # node_info
770
      nodeinfo = all_ninfo[node]
771
      if not isinstance(nodeinfo, dict):
772
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
773
        bad = True
774
        continue
775

    
776
      try:
777
        node_info[node] = {
778
          "mfree": int(nodeinfo['memory_free']),
779
          "dfree": int(nodeinfo['vg_free']),
780
          "pinst": [],
781
          "sinst": [],
782
          # dictionary holding all instances this node is secondary for,
783
          # grouped by their primary node. Each key is a cluster node, and each
784
          # value is a list of instances which have the key as primary and the
785
          # current node as secondary.  this is handy to calculate N+1 memory
786
          # availability if you can only failover from a primary to its
787
          # secondary.
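          # e.g. {"nodeA": ["inst1", "inst2"], "nodeB": ["inst3"]}
          # (illustrative node/instance names)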
788
          "sinst-by-pnode": {},
789
        }
790
      except ValueError:
791
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
792
        bad = True
793
        continue
794

    
795
    node_vol_should = {}
796

    
797
    for instance in instancelist:
798
      feedback_fn("* Verifying instance %s" % instance)
799
      inst_config = self.cfg.GetInstanceInfo(instance)
800
      result =  self._VerifyInstance(instance, inst_config, node_volume,
801
                                     node_instance, feedback_fn)
802
      bad = bad or result
803

    
804
      inst_config.MapLVsByNode(node_vol_should)
805

    
806
      instance_cfg[instance] = inst_config
807

    
808
      pnode = inst_config.primary_node
809
      if pnode in node_info:
810
        node_info[pnode]['pinst'].append(instance)
811
      else:
812
        feedback_fn("  - ERROR: instance %s, connection to primary node"
813
                    " %s failed" % (instance, pnode))
814
        bad = True
815

    
816
      # If the instance is non-redundant we cannot survive losing its primary
817
      # node, so we are not N+1 compliant. On the other hand we have no disk
818
      # templates with more than one secondary so that situation is not well
819
      # supported either.
820
      # FIXME: does not support file-backed instances
821
      if len(inst_config.secondary_nodes) == 0:
822
        i_non_redundant.append(instance)
823
      elif len(inst_config.secondary_nodes) > 1:
824
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
825
                    % instance)
826

    
827
      for snode in inst_config.secondary_nodes:
828
        if snode in node_info:
829
          node_info[snode]['sinst'].append(instance)
830
          if pnode not in node_info[snode]['sinst-by-pnode']:
831
            node_info[snode]['sinst-by-pnode'][pnode] = []
832
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
833
        else:
834
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
835
                      " %s failed" % (instance, snode))
836

    
837
    feedback_fn("* Verifying orphan volumes")
838
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
839
                                       feedback_fn)
840
    bad = bad or result
841

    
842
    feedback_fn("* Verifying remaining instances")
843
    result = self._VerifyOrphanInstances(instancelist, node_instance,
844
                                         feedback_fn)
845
    bad = bad or result
846

    
847
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
848
      feedback_fn("* Verifying N+1 Memory redundancy")
849
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
850
      bad = bad or result
851

    
852
    feedback_fn("* Other Notes")
853
    if i_non_redundant:
854
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
855
                  % len(i_non_redundant))
856

    
857
    return not bad
858

    
859
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
860
    """Analize the post-hooks' result, handle it, and send some
861
    nicely-formatted feedback back to the user.
862

863
    Args:
864
      phase: the hooks phase that has just been run
865
      hooks_results: the results of the multi-node hooks rpc call
866
      feedback_fn: function to send feedback back to the caller
867
      lu_result: previous Exec result
868

869
    """
870
    # We only really run POST phase hooks, and are only interested in
871
    # their results
872
    if phase == constants.HOOKS_PHASE_POST:
873
      # Used to change hooks' output to proper indentation
874
      indent_re = re.compile('^', re.M)
875
      feedback_fn("* Hooks Results")
876
      if not hooks_results:
877
        feedback_fn("  - ERROR: general communication failure")
878
        lu_result = 1
879
      else:
880
        for node_name in hooks_results:
881
          show_node_header = True
882
          res = hooks_results[node_name]
883
          if res is False or not isinstance(res, list):
884
            feedback_fn("    Communication failure")
885
            lu_result = 1
886
            continue
887
          for script, hkr, output in res:
888
            if hkr == constants.HKR_FAIL:
889
              # The node header is only shown once, if there are
890
              # failing hooks on that node
891
              if show_node_header:
892
                feedback_fn("  Node %s:" % node_name)
893
                show_node_header = False
894
              feedback_fn("    ERROR: Script %s failed, output:" % script)
895
              output = indent_re.sub('      ', output)
896
              feedback_fn("%s" % output)
897
              lu_result = 1
898

    
899
      return lu_result
900

    
901

    
902
class LUVerifyDisks(NoHooksLU):
903
  """Verifies the cluster disks status.
904

905
  """
906
  _OP_REQP = []
907

    
908
  def CheckPrereq(self):
909
    """Check prerequisites.
910

911
    This has no prerequisites.
912

913
    """
914
    pass
915

    
916
  def Exec(self, feedback_fn):
917
    """Verify integrity of cluster disks.
918

919
    """
920
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
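    # result layout (as filled in below): res_nodes - nodes that could not be
    # contacted, res_nlvm - node name -> LVM error string, res_instances -
    # instances with at least one offline LV, res_missing - instance name ->
    # list of (node, volume) pairs that were not found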
921

    
922
    vg_name = self.cfg.GetVGName()
923
    nodes = utils.NiceSort(self.cfg.GetNodeList())
924
    instances = [self.cfg.GetInstanceInfo(name)
925
                 for name in self.cfg.GetInstanceList()]
926

    
927
    nv_dict = {}
928
    for inst in instances:
929
      inst_lvs = {}
930
      if (inst.status != "up" or
931
          inst.disk_template not in constants.DTS_NET_MIRROR):
932
        continue
933
      inst.MapLVsByNode(inst_lvs)
934
      # transform {iname: {node: [vol, ...], ...}, ...} to {(node, vol): inst}
935
      for node, vol_list in inst_lvs.iteritems():
936
        for vol in vol_list:
937
          nv_dict[(node, vol)] = inst
938

    
939
    if not nv_dict:
940
      return result
941

    
942
    node_lvs = rpc.call_volume_list(nodes, vg_name)
943

    
944
    to_act = set()
945
    for node in nodes:
946
      # node_volume
947
      lvs = node_lvs[node]
948

    
949
      if isinstance(lvs, basestring):
950
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
951
        res_nlvm[node] = lvs
952
      elif not isinstance(lvs, dict):
953
        logger.Info("connection to node %s failed or invalid data returned" %
954
                    (node,))
955
        res_nodes.append(node)
956
        continue
957

    
958
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
959
        inst = nv_dict.pop((node, lv_name), None)
960
        if (not lv_online and inst is not None
961
            and inst.name not in res_instances):
962
          res_instances.append(inst.name)
963

    
964
    # any leftover items in nv_dict are missing LVs, let's arrange the
965
    # data better
966
    for key, inst in nv_dict.iteritems():
967
      if inst.name not in res_missing:
968
        res_missing[inst.name] = []
969
      res_missing[inst.name].append(key)
970

    
971
    return result
972

    
973

    
974
class LURenameCluster(LogicalUnit):
975
  """Rename the cluster.
976

977
  """
978
  HPATH = "cluster-rename"
979
  HTYPE = constants.HTYPE_CLUSTER
980
  _OP_REQP = ["name"]
981
  REQ_WSSTORE = True
982

    
983
  def BuildHooksEnv(self):
984
    """Build hooks env.
985

986
    """
987
    env = {
988
      "OP_TARGET": self.sstore.GetClusterName(),
989
      "NEW_NAME": self.op.name,
990
      }
991
    mn = self.sstore.GetMasterNode()
992
    return env, [mn], [mn]
993

    
994
  def CheckPrereq(self):
995
    """Verify that the passed name is a valid one.
996

997
    """
998
    hostname = utils.HostInfo(self.op.name)
999

    
1000
    new_name = hostname.name
1001
    self.ip = new_ip = hostname.ip
1002
    old_name = self.sstore.GetClusterName()
1003
    old_ip = self.sstore.GetMasterIP()
1004
    if new_name == old_name and new_ip == old_ip:
1005
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1006
                                 " cluster has changed")
1007
    if new_ip != old_ip:
1008
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1009
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1010
                                   " reachable on the network. Aborting." %
1011
                                   new_ip)
1012

    
1013
    self.op.name = new_name
1014

    
1015
  def Exec(self, feedback_fn):
1016
    """Rename the cluster.
1017

1018
    """
1019
    clustername = self.op.name
1020
    ip = self.ip
1021
    ss = self.sstore
1022

    
1023
    # shutdown the master IP
1024
    master = ss.GetMasterNode()
1025
    if not rpc.call_node_stop_master(master, False):
1026
      raise errors.OpExecError("Could not disable the master role")
1027

    
1028
    try:
1029
      # modify the sstore
1030
      ss.SetKey(ss.SS_MASTER_IP, ip)
1031
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1032

    
1033
      # Distribute updated ss config to all nodes
1034
      myself = self.cfg.GetNodeInfo(master)
1035
      dist_nodes = self.cfg.GetNodeList()
1036
      if myself.name in dist_nodes:
1037
        dist_nodes.remove(myself.name)
1038

    
1039
      logger.Debug("Copying updated ssconf data to all nodes")
1040
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1041
        fname = ss.KeyToFilename(keyname)
1042
        result = rpc.call_upload_file(dist_nodes, fname)
1043
        for to_node in dist_nodes:
1044
          if not result[to_node]:
1045
            logger.Error("copy of file %s to node %s failed" %
1046
                         (fname, to_node))
1047
    finally:
1048
      if not rpc.call_node_start_master(master, False):
1049
        logger.Error("Could not re-enable the master role on the master,"
1050
                     " please restart manually.")
1051

    
1052

    
1053
def _RecursiveCheckIfLVMBased(disk):
1054
  """Check if the given disk or its children are lvm-based.
1055

1056
  Args:
1057
    disk: ganeti.objects.Disk object
1058

1059
  Returns:
1060
    boolean indicating whether a LD_LV dev_type was found or not
1061

1062
  """
1063
  if disk.children:
1064
    for chdisk in disk.children:
1065
      if _RecursiveCheckIfLVMBased(chdisk):
1066
        return True
1067
  return disk.dev_type == constants.LD_LV
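# For example (illustrative): a DRBD disk whose children are two LD_LV
# volumes is reported as lvm-based via the recursion over its children,
# while a disk with no children is judged by its own dev_type only.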
1068

    
1069

    
1070
class LUSetClusterParams(LogicalUnit):
1071
  """Change the parameters of the cluster.
1072

1073
  """
1074
  HPATH = "cluster-modify"
1075
  HTYPE = constants.HTYPE_CLUSTER
1076
  _OP_REQP = []
1077

    
1078
  def BuildHooksEnv(self):
1079
    """Build hooks env.
1080

1081
    """
1082
    env = {
1083
      "OP_TARGET": self.sstore.GetClusterName(),
1084
      "NEW_VG_NAME": self.op.vg_name,
1085
      }
1086
    mn = self.sstore.GetMasterNode()
1087
    return env, [mn], [mn]
1088

    
1089
  def CheckPrereq(self):
1090
    """Check prerequisites.
1091

1092
    This checks whether the given params don't conflict and
1093
    if the given volume group is valid.
1094

1095
    """
1096
    if not self.op.vg_name:
1097
      instances = [self.cfg.GetInstanceInfo(name)
1098
                   for name in self.cfg.GetInstanceList()]
1099
      for inst in instances:
1100
        for disk in inst.disks:
1101
          if _RecursiveCheckIfLVMBased(disk):
1102
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1103
                                       " lvm-based instances exist")
1104

    
1105
    # if vg_name not None, checks given volume group on all nodes
1106
    if self.op.vg_name:
1107
      node_list = self.cfg.GetNodeList()
1108
      vglist = rpc.call_vg_list(node_list)
1109
      for node in node_list:
1110
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1111
                                              constants.MIN_VG_SIZE)
1112
        if vgstatus:
1113
          raise errors.OpPrereqError("Error on node '%s': %s" %
1114
                                     (node, vgstatus))
1115

    
1116
  def Exec(self, feedback_fn):
1117
    """Change the parameters of the cluster.
1118

1119
    """
1120
    if self.op.vg_name != self.cfg.GetVGName():
1121
      self.cfg.SetVGName(self.op.vg_name)
1122
    else:
1123
      feedback_fn("Cluster LVM configuration already in desired"
1124
                  " state, not changing")
1125

    
1126

    
1127
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1128
  """Sleep and poll for an instance's disk to sync.
1129

1130
  """
1131
  if not instance.disks:
1132
    return True
1133

    
1134
  if not oneshot:
1135
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1136

    
1137
  node = instance.primary_node
1138

    
1139
  for dev in instance.disks:
1140
    cfgw.SetDiskID(dev, node)
1141

    
1142
  retries = 0
1143
  while True:
1144
    max_time = 0
1145
    done = True
1146
    cumul_degraded = False
1147
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1148
    if not rstats:
1149
      proc.LogWarning("Can't get any data from node %s" % node)
1150
      retries += 1
1151
      if retries >= 10:
1152
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1153
                                 " aborting." % node)
1154
      time.sleep(6)
1155
      continue
1156
    retries = 0
1157
    for i in range(len(rstats)):
1158
      mstat = rstats[i]
1159
      if mstat is None:
1160
        proc.LogWarning("Can't compute data for node %s/%s" %
1161
                        (node, instance.disks[i].iv_name))
1162
        continue
1163
      # we ignore the ldisk parameter
1164
      perc_done, est_time, is_degraded, _ = mstat
1165
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1166
      if perc_done is not None:
1167
        done = False
1168
        if est_time is not None:
1169
          rem_time = "%d estimated seconds remaining" % est_time
1170
          max_time = est_time
1171
        else:
1172
          rem_time = "no time estimate"
1173
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
1174
                     (instance.disks[i].iv_name, perc_done, rem_time))
1175
    if done or oneshot:
1176
      break
1177

    
1178
    time.sleep(min(60, max_time))
1179

    
1180
  if done:
1181
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1182
  return not cumul_degraded
1183

    
1184

    
1185
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1186
  """Check that mirrors are not degraded.
1187

1188
  The ldisk parameter, if True, will change the test from the
1189
  is_degraded attribute (which represents overall non-ok status for
1190
  the device(s)) to the ldisk (representing the local storage status).
1191

1192
  """
1193
  cfgw.SetDiskID(dev, node)
1194
  if ldisk:
1195
    idx = 6
1196
  else:
1197
    idx = 5
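  # rstats below is the status tuple returned by rpc.call_blockdev_find;
  # as described in the docstring, index 5 is the overall is_degraded flag
  # and index 6 the local-disk (ldisk) status.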
1198

    
1199
  result = True
1200
  if on_primary or dev.AssembleOnSecondary():
1201
    rstats = rpc.call_blockdev_find(node, dev)
1202
    if not rstats:
1203
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1204
      result = False
1205
    else:
1206
      result = result and (not rstats[idx])
1207
  if dev.children:
1208
    for child in dev.children:
1209
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1210

    
1211
  return result
1212

    
1213

    
1214
class LUDiagnoseOS(NoHooksLU):
1215
  """Logical unit for OS diagnose/query.
1216

1217
  """
1218
  _OP_REQP = ["output_fields", "names"]
1219
  REQ_BGL = False
1220

    
1221
  def ExpandNames(self):
1222
    if self.op.names:
1223
      raise errors.OpPrereqError("Selective OS query not supported")
1224

    
1225
    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1226
    _CheckOutputFields(static=[],
1227
                       dynamic=self.dynamic_fields,
1228
                       selected=self.op.output_fields)
1229

    
1230
    # Lock all nodes, in shared mode
1231
    self.needed_locks = {}
1232
    self.share_locks[locking.LEVEL_NODE] = 1
1233
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1234

    
1235
  def CheckPrereq(self):
1236
    """Check prerequisites.
1237

1238
    """
1239

    
1240
  @staticmethod
1241
  def _DiagnoseByOS(node_list, rlist):
1242
    """Remaps a per-node return list into an a per-os per-node dictionary
1243

1244
      Args:
1245
        node_list: a list with the names of all nodes
1246
        rlist: a map with node names as keys and OS objects as values
1247

1248
      Returns:
1249
        map: a map with osnames as keys and as value another map, with
1250
             nodes as
1251
             keys and list of OS objects as values
1252
             e.g. {"debian-etch": {"node1": [<object>,...],
1253
                                   "node2": [<object>,]}
1254
                  }
1255

1256
    """
1257
    all_os = {}
1258
    for node_name, nr in rlist.iteritems():
1259
      if not nr:
1260
        continue
1261
      for os_obj in nr:
1262
        if os_obj.name not in all_os:
1263
          # build a list of nodes for this os containing empty lists
1264
          # for each node in node_list
1265
          all_os[os_obj.name] = {}
1266
          for nname in node_list:
1267
            all_os[os_obj.name][nname] = []
1268
        all_os[os_obj.name][node_name].append(os_obj)
1269
    return all_os
1270

    
1271
  def Exec(self, feedback_fn):
1272
    """Compute the list of OSes.
1273

1274
    """
1275
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1276
    node_data = rpc.call_os_diagnose(node_list)
1277
    if node_data == False:
1278
      raise errors.OpExecError("Can't gather the list of OSes")
1279
    pol = self._DiagnoseByOS(node_list, node_data)
1280
    output = []
1281
    for os_name, os_data in pol.iteritems():
1282
      row = []
1283
      for field in self.op.output_fields:
1284
        if field == "name":
1285
          val = os_name
1286
        elif field == "valid":
1287
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1288
        elif field == "node_status":
1289
          val = {}
1290
          for node_name, nos_list in os_data.iteritems():
1291
            val[node_name] = [(v.status, v.path) for v in nos_list]
1292
        else:
1293
          raise errors.ParameterError(field)
1294
        row.append(val)
1295
      output.append(row)
1296

    
1297
    return output
1298

    
1299

    
1300
class LURemoveNode(LogicalUnit):
1301
  """Logical unit for removing a node.
1302

1303
  """
1304
  HPATH = "node-remove"
1305
  HTYPE = constants.HTYPE_NODE
1306
  _OP_REQP = ["node_name"]
1307

    
1308
  def BuildHooksEnv(self):
1309
    """Build hooks env.
1310

1311
    This doesn't run on the target node in the pre phase as a failed
1312
    node would then be impossible to remove.
1313

1314
    """
1315
    env = {
1316
      "OP_TARGET": self.op.node_name,
1317
      "NODE_NAME": self.op.node_name,
1318
      }
1319
    all_nodes = self.cfg.GetNodeList()
1320
    all_nodes.remove(self.op.node_name)
1321
    return env, all_nodes, all_nodes
1322

    
1323
  def CheckPrereq(self):
1324
    """Check prerequisites.
1325

1326
    This checks:
1327
     - the node exists in the configuration
1328
     - it does not have primary or secondary instances
1329
     - it's not the master
1330

1331
    Any errors are signalled by raising errors.OpPrereqError.
1332

1333
    """
1334
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1335
    if node is None:
1336
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1337

    
1338
    instance_list = self.cfg.GetInstanceList()
1339

    
1340
    masternode = self.sstore.GetMasterNode()
1341
    if node.name == masternode:
1342
      raise errors.OpPrereqError("Node is the master node,"
1343
                                 " you need to failover first.")
1344

    
1345
    for instance_name in instance_list:
1346
      instance = self.cfg.GetInstanceInfo(instance_name)
1347
      if node.name == instance.primary_node:
1348
        raise errors.OpPrereqError("Instance %s still running on the node,"
1349
                                   " please remove first." % instance_name)
1350
      if node.name in instance.secondary_nodes:
1351
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1352
                                   " please remove first." % instance_name)
1353
    self.op.node_name = node.name
1354
    self.node = node
1355

    
1356
  def Exec(self, feedback_fn):
1357
    """Removes the node from the cluster.
1358

1359
    """
1360
    node = self.node
1361
    logger.Info("stopping the node daemon and removing configs from node %s" %
1362
                node.name)
1363

    
1364
    self.context.RemoveNode(node.name)
1365

    
1366
    rpc.call_node_leave_cluster(node.name)
1367

    
1368

    
1369
class LUQueryNodes(NoHooksLU):
1370
  """Logical unit for querying nodes.
1371

1372
  """
1373
  _OP_REQP = ["output_fields", "names"]
1374
  REQ_BGL = False
1375

    
1376
  def ExpandNames(self):
1377
    self.dynamic_fields = frozenset([
1378
      "dtotal", "dfree",
1379
      "mtotal", "mnode", "mfree",
1380
      "bootid",
1381
      "ctotal",
1382
      ])
1383

    
1384
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1385
                               "pinst_list", "sinst_list",
1386
                               "pip", "sip", "tags"],
1387
                       dynamic=self.dynamic_fields,
1388
                       selected=self.op.output_fields)
1389

    
1390
    self.needed_locks = {}
1391
    self.share_locks[locking.LEVEL_NODE] = 1
1392
    # TODO: we could lock nodes only if the user asked for dynamic fields. For
1393
    # that we need atomic ways to get info for a group of nodes from the
1394
    # config, though.
1395
    if not self.op.names:
1396
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1397
    else:
1398
      self.needed_locks[locking.LEVEL_NODE] = \
1399
        _GetWantedNodes(self, self.op.names)
1400

    
1401
  def CheckPrereq(self):
1402
    """Check prerequisites.
1403

1404
    """
1405
    # This of course is valid only if we locked the nodes
1406
    self.wanted = self.acquired_locks[locking.LEVEL_NODE]
1407

    
1408
  def Exec(self, feedback_fn):
1409
    """Computes the list of nodes and their attributes.
1410

1411
    """
1412
    nodenames = self.wanted
1413
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1414

    
1415
    # begin data gathering
1416

    
1417
    if self.dynamic_fields.intersection(self.op.output_fields):
1418
      live_data = {}
1419
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1420
      for name in nodenames:
1421
        nodeinfo = node_data.get(name, None)
1422
        if nodeinfo:
1423
          live_data[name] = {
1424
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1425
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1426
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1427
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1428
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1429
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1430
            "bootid": nodeinfo['bootid'],
1431
            }
1432
        else:
1433
          live_data[name] = {}
1434
    else:
1435
      live_data = dict.fromkeys(nodenames, {})
1436

    
1437
    node_to_primary = dict([(name, set()) for name in nodenames])
1438
    node_to_secondary = dict([(name, set()) for name in nodenames])
1439

    
1440
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1441
                             "sinst_cnt", "sinst_list"))
1442
    if inst_fields & frozenset(self.op.output_fields):
1443
      instancelist = self.cfg.GetInstanceList()
1444

    
1445
      for instance_name in instancelist:
1446
        inst = self.cfg.GetInstanceInfo(instance_name)
1447
        if inst.primary_node in node_to_primary:
1448
          node_to_primary[inst.primary_node].add(inst.name)
1449
        for secnode in inst.secondary_nodes:
1450
          if secnode in node_to_secondary:
1451
            node_to_secondary[secnode].add(inst.name)
1452

    
1453
    # end data gathering
1454

    
1455
    output = []
1456
    for node in nodelist:
1457
      node_output = []
1458
      for field in self.op.output_fields:
1459
        if field == "name":
1460
          val = node.name
1461
        elif field == "pinst_list":
1462
          val = list(node_to_primary[node.name])
1463
        elif field == "sinst_list":
1464
          val = list(node_to_secondary[node.name])
1465
        elif field == "pinst_cnt":
1466
          val = len(node_to_primary[node.name])
1467
        elif field == "sinst_cnt":
1468
          val = len(node_to_secondary[node.name])
1469
        elif field == "pip":
1470
          val = node.primary_ip
1471
        elif field == "sip":
1472
          val = node.secondary_ip
1473
        elif field == "tags":
1474
          val = list(node.GetTags())
1475
        elif field in self.dynamic_fields:
1476
          val = live_data[node.name].get(field, None)
1477
        else:
1478
          raise errors.ParameterError(field)
1479
        node_output.append(val)
1480
      output.append(node_output)
1481

    
1482
    return output
1483

    
1484

    
1485
class LUQueryNodeVolumes(NoHooksLU):
1486
  """Logical unit for getting volumes on node(s).
1487

1488
  """
1489
  _OP_REQP = ["nodes", "output_fields"]
1490
  REQ_BGL = False
1491

    
1492
  def ExpandNames(self):
1493
    _CheckOutputFields(static=["node"],
1494
                       dynamic=["phys", "vg", "name", "size", "instance"],
1495
                       selected=self.op.output_fields)
1496

    
1497
    self.needed_locks = {}
1498
    self.share_locks[locking.LEVEL_NODE] = 1
1499
    if not self.op.nodes:
1500
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1501
    else:
1502
      self.needed_locks[locking.LEVEL_NODE] = \
1503
        _GetWantedNodes(self, self.op.nodes)
1504

    
1505
  def CheckPrereq(self):
1506
    """Check prerequisites.
1507

1508
    This checks that the fields required are valid output fields.
1509

1510
    """
1511
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1512

    
1513
  def Exec(self, feedback_fn):
1514
    """Computes the list of nodes and their attributes.
1515

1516
    """
1517
    nodenames = self.nodes
1518
    volumes = rpc.call_node_volumes(nodenames)
1519

    
1520
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1521
             in self.cfg.GetInstanceList()]
1522

    
1523
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1524

    
1525
    output = []
1526
    for node in nodenames:
1527
      if node not in volumes or not volumes[node]:
1528
        continue
1529

    
1530
      node_vols = volumes[node][:]
1531
      node_vols.sort(key=lambda vol: vol['dev'])
1532

    
1533
      for vol in node_vols:
1534
        node_output = []
1535
        for field in self.op.output_fields:
1536
          if field == "node":
1537
            val = node
1538
          elif field == "phys":
1539
            val = vol['dev']
1540
          elif field == "vg":
1541
            val = vol['vg']
1542
          elif field == "name":
1543
            val = vol['name']
1544
          elif field == "size":
1545
            val = int(float(vol['size']))
1546
          elif field == "instance":
1547
            for inst in ilist:
1548
              if node not in lv_by_node[inst]:
1549
                continue
1550
              if vol['name'] in lv_by_node[inst][node]:
1551
                val = inst.name
1552
                break
1553
            else:
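              # for/else: only reached when no owning instance was found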
1554
              val = '-'
1555
          else:
1556
            raise errors.ParameterError(field)
1557
          node_output.append(str(val))
1558

    
1559
        output.append(node_output)
1560

    
1561
    return output
1562

    
1563

    
1564
class LUAddNode(LogicalUnit):
1565
  """Logical unit for adding node to the cluster.
1566

1567
  """
1568
  HPATH = "node-add"
1569
  HTYPE = constants.HTYPE_NODE
1570
  _OP_REQP = ["node_name"]
1571

    
1572
  def BuildHooksEnv(self):
1573
    """Build hooks env.
1574

1575
    This will run on all nodes before, and on all nodes + the new node after.
1576

1577
    """
1578
    env = {
1579
      "OP_TARGET": self.op.node_name,
1580
      "NODE_NAME": self.op.node_name,
1581
      "NODE_PIP": self.op.primary_ip,
1582
      "NODE_SIP": self.op.secondary_ip,
1583
      }
1584
    nodes_0 = self.cfg.GetNodeList()
1585
    nodes_1 = nodes_0 + [self.op.node_name, ]
1586
    return env, nodes_0, nodes_1
1587

    
1588
  def CheckPrereq(self):
1589
    """Check prerequisites.
1590

1591
    This checks:
1592
     - the new node is not already in the config
1593
     - it is resolvable
1594
     - its parameters (single/dual homed) matches the cluster
1595

1596
    Any errors are signalled by raising errors.OpPrereqError.
1597

1598
    """
1599
    node_name = self.op.node_name
1600
    cfg = self.cfg
1601

    
1602
    dns_data = utils.HostInfo(node_name)
1603

    
1604
    node = dns_data.name
1605
    primary_ip = self.op.primary_ip = dns_data.ip
1606
    secondary_ip = getattr(self.op, "secondary_ip", None)
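    # if no secondary IP was given, the node is assumed to be single-homed
    # and the primary IP is reused (checked against the master's layout below)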
1607
    if secondary_ip is None:
1608
      secondary_ip = primary_ip
1609
    if not utils.IsValidIP(secondary_ip):
1610
      raise errors.OpPrereqError("Invalid secondary IP given")
1611
    self.op.secondary_ip = secondary_ip
1612

    
1613
    node_list = cfg.GetNodeList()
1614
    if not self.op.readd and node in node_list:
1615
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1616
                                 node)
1617
    elif self.op.readd and node not in node_list:
1618
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1619

    
1620
    for existing_node_name in node_list:
1621
      existing_node = cfg.GetNodeInfo(existing_node_name)
1622

    
1623
      if self.op.readd and node == existing_node_name:
1624
        if (existing_node.primary_ip != primary_ip or
1625
            existing_node.secondary_ip != secondary_ip):
1626
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1627
                                     " address configuration as before")
1628
        continue
1629

    
1630
      if (existing_node.primary_ip == primary_ip or
1631
          existing_node.secondary_ip == primary_ip or
1632
          existing_node.primary_ip == secondary_ip or
1633
          existing_node.secondary_ip == secondary_ip):
1634
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1635
                                   " existing node %s" % existing_node.name)
1636

    
1637
    # check that the type of the node (single versus dual homed) is the
1638
    # same as for the master
1639
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1640
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1641
    newbie_singlehomed = secondary_ip == primary_ip
1642
    if master_singlehomed != newbie_singlehomed:
1643
      if master_singlehomed:
1644
        raise errors.OpPrereqError("The master has no private ip but the"
1645
                                   " new node has one")
1646
      else:
1647
        raise errors.OpPrereqError("The master has a private ip but the"
1648
                                   " new node doesn't have one")
1649

    
1650
    # check reachability
1651
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1652
      raise errors.OpPrereqError("Node not reachable by ping")
1653

    
1654
    if not newbie_singlehomed:
1655
      # check reachability from my secondary ip to newbie's secondary ip
1656
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1657
                           source=myself.secondary_ip):
1658
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1659
                                   " based ping to noded port")
1660

    
1661
    self.new_node = objects.Node(name=node,
1662
                                 primary_ip=primary_ip,
1663
                                 secondary_ip=secondary_ip)
1664

    
1665
  def Exec(self, feedback_fn):
1666
    """Adds the new node to the cluster.
1667

1668
    """
1669
    new_node = self.new_node
1670
    node = new_node.name
1671

    
1672
    # check connectivity
1673
    result = rpc.call_version([node])[node]
1674
    if result:
1675
      if constants.PROTOCOL_VERSION == result:
1676
        logger.Info("communication to node %s fine, sw version %s match" %
1677
                    (node, result))
1678
      else:
1679
        raise errors.OpExecError("Version mismatch master version %s,"
1680
                                 " node version %s" %
1681
                                 (constants.PROTOCOL_VERSION, result))
1682
    else:
1683
      raise errors.OpExecError("Cannot get version from the new node")
1684

    
1685
    # setup ssh on node
1686
    logger.Info("copy ssh key to node %s" % node)
1687
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1688
    keyarray = []
1689
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1690
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1691
                priv_key, pub_key]
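    # the files are read in this fixed order; rpc.call_node_add below
    # receives the six key blobs positionally in the same order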
1692

    
1693
    for i in keyfiles:
1694
      f = open(i, 'r')
1695
      try:
1696
        keyarray.append(f.read())
1697
      finally:
1698
        f.close()
1699

    
1700
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1701
                               keyarray[3], keyarray[4], keyarray[5])
1702

    
1703
    if not result:
1704
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1705

    
1706
    # Add node to our /etc/hosts, and add key to known_hosts
1707
    utils.AddHostToEtcHosts(new_node.name)
1708

    
1709
    if new_node.secondary_ip != new_node.primary_ip:
1710
      if not rpc.call_node_tcp_ping(new_node.name,
1711
                                    constants.LOCALHOST_IP_ADDRESS,
1712
                                    new_node.secondary_ip,
1713
                                    constants.DEFAULT_NODED_PORT,
1714
                                    10, False):
1715
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1716
                                 " you gave (%s). Please fix and re-run this"
1717
                                 " command." % new_node.secondary_ip)
1718

    
1719
    node_verify_list = [self.sstore.GetMasterNode()]
1720
    node_verify_param = {
1721
      'nodelist': [node],
1722
      # TODO: do a node-net-test as well?
1723
    }
1724

    
1725
    result = rpc.call_node_verify(node_verify_list, node_verify_param)
1726
    for verifier in node_verify_list:
1727
      if not result[verifier]:
1728
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
1729
                                 " for remote verification" % verifier)
1730
      if result[verifier]['nodelist']:
1731
        for failed in result[verifier]['nodelist']:
1732
          feedback_fn("ssh/hostname verification failed %s -> %s" %
1733
                      (verifier, result[verifier]['nodelist'][failed]))
1734
        raise errors.OpExecError("ssh/hostname verification failed.")
1735

    
1736
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1737
    # including the node just added
1738
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1739
    dist_nodes = self.cfg.GetNodeList()
1740
    if not self.op.readd:
1741
      dist_nodes.append(node)
1742
    if myself.name in dist_nodes:
1743
      dist_nodes.remove(myself.name)
1744

    
1745
    logger.Debug("Copying hosts and known_hosts to all nodes")
1746
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1747
      result = rpc.call_upload_file(dist_nodes, fname)
1748
      for to_node in dist_nodes:
1749
        if not result[to_node]:
1750
          logger.Error("copy of file %s to node %s failed" %
1751
                       (fname, to_node))
1752

    
1753
    to_copy = self.sstore.GetFileList()
1754
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1755
      to_copy.append(constants.VNC_PASSWORD_FILE)
1756
    for fname in to_copy:
1757
      result = rpc.call_upload_file([node], fname)
1758
      if not result[node]:
1759
        logger.Error("could not copy file %s to node %s" % (fname, node))
1760

    
1761
    if self.op.readd:
1762
      self.context.ReaddNode(new_node)
1763
    else:
1764
      self.context.AddNode(new_node)
1765

    
1766

    
1767
class LUQueryClusterInfo(NoHooksLU):
1768
  """Query cluster configuration.
1769

1770
  """
1771
  _OP_REQP = []
1772
  REQ_MASTER = False
1773
  REQ_BGL = False
1774

    
1775
  def ExpandNames(self):
1776
    self.needed_locks = {}
1777

    
1778
  def CheckPrereq(self):
1779
    """No prerequsites needed for this LU.
1780

1781
    """
1782
    pass
1783

    
1784
  def Exec(self, feedback_fn):
1785
    """Return cluster config.
1786

1787
    """
1788
    result = {
1789
      "name": self.sstore.GetClusterName(),
1790
      "software_version": constants.RELEASE_VERSION,
1791
      "protocol_version": constants.PROTOCOL_VERSION,
1792
      "config_version": constants.CONFIG_VERSION,
1793
      "os_api_version": constants.OS_API_VERSION,
1794
      "export_version": constants.EXPORT_VERSION,
1795
      "master": self.sstore.GetMasterNode(),
1796
      "architecture": (platform.architecture()[0], platform.machine()),
1797
      "hypervisor_type": self.sstore.GetHypervisorType(),
1798
      }
1799

    
1800
    return result
1801

    
1802

    
1803
class LUDumpClusterConfig(NoHooksLU):
1804
  """Return a text-representation of the cluster-config.
1805

1806
  """
1807
  _OP_REQP = []
1808
  REQ_BGL = False
1809

    
1810
  def ExpandNames(self):
1811
    self.needed_locks = {}
1812

    
1813
  def CheckPrereq(self):
1814
    """No prerequisites.
1815

1816
    """
1817
    pass
1818

    
1819
  def Exec(self, feedback_fn):
1820
    """Dump a representation of the cluster config to the standard output.
1821

1822
    """
1823
    return self.cfg.DumpConfig()
1824

    
1825

    
1826
class LUActivateInstanceDisks(NoHooksLU):
1827
  """Bring up an instance's disks.
1828

1829
  """
1830
  _OP_REQP = ["instance_name"]
1831

    
1832
  def CheckPrereq(self):
1833
    """Check prerequisites.
1834

1835
    This checks that the instance is in the cluster.
1836

1837
    """
1838
    instance = self.cfg.GetInstanceInfo(
1839
      self.cfg.ExpandInstanceName(self.op.instance_name))
1840
    if instance is None:
1841
      raise errors.OpPrereqError("Instance '%s' not known" %
1842
                                 self.op.instance_name)
1843
    self.instance = instance
1844

    
1845

    
1846
  def Exec(self, feedback_fn):
1847
    """Activate the disks.
1848

1849
    """
1850
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1851
    if not disks_ok:
1852
      raise errors.OpExecError("Cannot activate block devices")
1853

    
1854
    return disks_info
1855

    
1856

    
1857
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1858
  """Prepare the block devices for an instance.
1859

1860
  This sets up the block devices on all nodes.
1861

1862
  Args:
1863
    instance: a ganeti.objects.Instance object
1864
    ignore_secondaries: if true, errors on secondary nodes won't result
1865
                        in an error return from the function
1866

1867
  Returns:
1868
    a tuple (disks_ok, device_info): disks_ok is False if the operation
1869
    failed, and device_info is a list of (host, instance_visible_name,
1870
    node_visible_name) tuples mapping node devices to instance devices
1871
  """
1872
  device_info = []
1873
  disks_ok = True
1874
  iname = instance.name
1875
  # With the two passes mechanism we try to reduce the window of
1876
  # opportunity for the race condition of switching DRBD to primary
1877
  # before handshaking occurred, but we do not eliminate it
1878

    
1879
  # The proper fix would be to wait (with some limits) until the
1880
  # connection has been made and drbd transitions from WFConnection
1881
  # into any other network-connected state (Connected, SyncTarget,
1882
  # SyncSource, etc.)
1883

    
1884
  # 1st pass, assemble on all nodes in secondary mode
1885
  for inst_disk in instance.disks:
1886
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1887
      cfg.SetDiskID(node_disk, node)
1888
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1889
      if not result:
1890
        logger.Error("could not prepare block device %s on node %s"
1891
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1892
        if not ignore_secondaries:
1893
          disks_ok = False
1894

    
1895
  # FIXME: race condition on drbd migration to primary
1896

    
1897
  # 2nd pass, do only the primary node
1898
  for inst_disk in instance.disks:
1899
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1900
      if node != instance.primary_node:
1901
        continue
1902
      cfg.SetDiskID(node_disk, node)
1903
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1904
      if not result:
1905
        logger.Error("could not prepare block device %s on node %s"
1906
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1907
        disks_ok = False
1908
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
1909

    
1910
  # leave the disks configured for the primary node
1911
  # this is a workaround that would be fixed better by
1912
  # improving the logical/physical id handling
1913
  for disk in instance.disks:
1914
    cfg.SetDiskID(disk, instance.primary_node)
1915

    
1916
  return disks_ok, device_info
1917

    
1918

    
1919
def _StartInstanceDisks(cfg, instance, force):
1920
  """Start the disks of an instance.
1921

1922
  """
1923
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1924
                                           ignore_secondaries=force)
1925
  if not disks_ok:
1926
    _ShutdownInstanceDisks(instance, cfg)
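    # force may also be None (e.g. callers such as reinstall/rename pass
    # None); the --force hint is only shown for an explicit force=False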
1927
    if force is not None and not force:
1928
      logger.Error("If the message above refers to a secondary node,"
1929
                   " you can retry the operation using '--force'.")
1930
    raise errors.OpExecError("Disk consistency error")
1931

    
1932

    
1933
class LUDeactivateInstanceDisks(NoHooksLU):
1934
  """Shutdown an instance's disks.
1935

1936
  """
1937
  _OP_REQP = ["instance_name"]
1938

    
1939
  def CheckPrereq(self):
1940
    """Check prerequisites.
1941

1942
    This checks that the instance is in the cluster.
1943

1944
    """
1945
    instance = self.cfg.GetInstanceInfo(
1946
      self.cfg.ExpandInstanceName(self.op.instance_name))
1947
    if instance is None:
1948
      raise errors.OpPrereqError("Instance '%s' not known" %
1949
                                 self.op.instance_name)
1950
    self.instance = instance
1951

    
1952
  def Exec(self, feedback_fn):
1953
    """Deactivate the disks
1954

1955
    """
1956
    instance = self.instance
1957
    ins_l = rpc.call_instance_list([instance.primary_node])
1958
    ins_l = ins_l[instance.primary_node]
1959
    if not type(ins_l) is list:
1960
      raise errors.OpExecError("Can't contact node '%s'" %
1961
                               instance.primary_node)
1962

    
1963
    if self.instance.name in ins_l:
1964
      raise errors.OpExecError("Instance is running, can't shutdown"
1965
                               " block devices.")
1966

    
1967
    _ShutdownInstanceDisks(instance, self.cfg)
1968

    
1969

    
1970
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1971
  """Shutdown block devices of an instance.
1972

1973
  This does the shutdown on all nodes of the instance.
1974

1975
  If ignore_primary is true, errors on the primary node are
1976
  ignored.
1977

1978
  """
1979
  result = True
1980
  for disk in instance.disks:
1981
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1982
      cfg.SetDiskID(top_disk, node)
1983
      if not rpc.call_blockdev_shutdown(node, top_disk):
1984
        logger.Error("could not shutdown block device %s on node %s" %
1985
                     (disk.iv_name, node))
1986
        if not ignore_primary or node != instance.primary_node:
1987
          result = False
1988
  return result
1989

    
1990

    
1991
def _CheckNodeFreeMemory(cfg, node, reason, requested):
1992
  """Checks if a node has enough free memory.
1993

1994
  This function checks if a given node has the needed amount of free
1995
  memory. In case the node has less memory or we cannot get the
1996
  information from the node, this function raises an OpPrereqError
1997
  exception.
1998

1999
  Args:
2000
    - cfg: a ConfigWriter instance
2001
    - node: the node name
2002
    - reason: string to use in the error message
2003
    - requested: the amount of memory in MiB
2004

2005
  """
2006
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2007
  if not nodeinfo or not isinstance(nodeinfo, dict):
2008
    raise errors.OpPrereqError("Could not contact node %s for resource"
2009
                             " information" % (node,))
2010

    
2011
  free_mem = nodeinfo[node].get('memory_free')
2012
  if not isinstance(free_mem, int):
2013
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2014
                             " was '%s'" % (node, free_mem))
2015
  if requested > free_mem:
2016
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2017
                             " needed %s MiB, available %s MiB" %
2018
                             (node, reason, requested, free_mem))
2019

    
2020

    
2021
class LUStartupInstance(LogicalUnit):
2022
  """Starts an instance.
2023

2024
  """
2025
  HPATH = "instance-start"
2026
  HTYPE = constants.HTYPE_INSTANCE
2027
  _OP_REQP = ["instance_name", "force"]
2028
  REQ_BGL = False
2029

    
2030
  def ExpandNames(self):
2031
    self._ExpandAndLockInstance()
2032
    self.needed_locks[locking.LEVEL_NODE] = []
2033
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2034

    
2035
  def DeclareLocks(self, level):
2036
    if level == locking.LEVEL_NODE:
2037
      self._LockInstancesNodes()
2038

    
2039
  def BuildHooksEnv(self):
2040
    """Build hooks env.
2041

2042
    This runs on master, primary and secondary nodes of the instance.
2043

2044
    """
2045
    env = {
2046
      "FORCE": self.op.force,
2047
      }
2048
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2049
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2050
          list(self.instance.secondary_nodes))
2051
    return env, nl, nl
2052

    
2053
  def CheckPrereq(self):
2054
    """Check prerequisites.
2055

2056
    This checks that the instance is in the cluster.
2057

2058
    """
2059
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2060
    assert self.instance is not None, \
2061
      "Cannot retrieve locked instance %s" % self.op.instance_name
2062

    
2063
    # check bridges existence
2064
    _CheckInstanceBridgesExist(instance)
2065

    
2066
    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2067
                         "starting instance %s" % instance.name,
2068
                         instance.memory)
2069

    
2070
  def Exec(self, feedback_fn):
2071
    """Start the instance.
2072

2073
    """
2074
    instance = self.instance
2075
    force = self.op.force
2076
    extra_args = getattr(self.op, "extra_args", "")
2077

    
2078
    self.cfg.MarkInstanceUp(instance.name)
2079

    
2080
    node_current = instance.primary_node
2081

    
2082
    _StartInstanceDisks(self.cfg, instance, force)
2083

    
2084
    if not rpc.call_instance_start(node_current, instance, extra_args):
2085
      _ShutdownInstanceDisks(instance, self.cfg)
2086
      raise errors.OpExecError("Could not start instance")
2087

    
2088

    
2089
class LURebootInstance(LogicalUnit):
2090
  """Reboot an instance.
2091

2092
  """
2093
  HPATH = "instance-reboot"
2094
  HTYPE = constants.HTYPE_INSTANCE
2095
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2096
  REQ_BGL = False
2097

    
2098
  def ExpandNames(self):
2099
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2100
                                   constants.INSTANCE_REBOOT_HARD,
2101
                                   constants.INSTANCE_REBOOT_FULL]:
2102
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2103
                                  (constants.INSTANCE_REBOOT_SOFT,
2104
                                   constants.INSTANCE_REBOOT_HARD,
2105
                                   constants.INSTANCE_REBOOT_FULL))
2106
    self._ExpandAndLockInstance()
2107
    self.needed_locks[locking.LEVEL_NODE] = []
2108
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2109

    
2110
  def DeclareLocks(self, level):
2111
    if level == locking.LEVEL_NODE:
2112
      # FIXME: lock only primary on (not constants.INSTANCE_REBOOT_FULL)
2113
      self._LockInstancesNodes()
2114

    
2115
  def BuildHooksEnv(self):
2116
    """Build hooks env.
2117

2118
    This runs on master, primary and secondary nodes of the instance.
2119

2120
    """
2121
    env = {
2122
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2123
      }
2124
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2125
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2126
          list(self.instance.secondary_nodes))
2127
    return env, nl, nl
2128

    
2129
  def CheckPrereq(self):
2130
    """Check prerequisites.
2131

2132
    This checks that the instance is in the cluster.
2133

2134
    """
2135
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2136
    assert self.instance is not None, \
2137
      "Cannot retrieve locked instance %s" % self.op.instance_name
2138

    
2139
    # check bridges existence
2140
    _CheckInstanceBridgesExist(instance)
2141

    
2142
  def Exec(self, feedback_fn):
2143
    """Reboot the instance.
2144

2145
    """
2146
    instance = self.instance
2147
    ignore_secondaries = self.op.ignore_secondaries
2148
    reboot_type = self.op.reboot_type
2149
    extra_args = getattr(self.op, "extra_args", "")
2150

    
2151
    node_current = instance.primary_node
2152

    
2153
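    # soft and hard reboots are delegated to the node daemon; a full
    # reboot is emulated below as shutdown + disk restart + start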
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2154
                       constants.INSTANCE_REBOOT_HARD]:
2155
      if not rpc.call_instance_reboot(node_current, instance,
2156
                                      reboot_type, extra_args):
2157
        raise errors.OpExecError("Could not reboot instance")
2158
    else:
2159
      if not rpc.call_instance_shutdown(node_current, instance):
2160
        raise errors.OpExecError("could not shutdown instance for full reboot")
2161
      _ShutdownInstanceDisks(instance, self.cfg)
2162
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2163
      if not rpc.call_instance_start(node_current, instance, extra_args):
2164
        _ShutdownInstanceDisks(instance, self.cfg)
2165
        raise errors.OpExecError("Could not start instance for full reboot")
2166

    
2167
    self.cfg.MarkInstanceUp(instance.name)
2168

    
2169

    
2170
class LUShutdownInstance(LogicalUnit):
2171
  """Shutdown an instance.
2172

2173
  """
2174
  HPATH = "instance-stop"
2175
  HTYPE = constants.HTYPE_INSTANCE
2176
  _OP_REQP = ["instance_name"]
2177
  REQ_BGL = False
2178

    
2179
  def ExpandNames(self):
2180
    self._ExpandAndLockInstance()
2181
    self.needed_locks[locking.LEVEL_NODE] = []
2182
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2183

    
2184
  def DeclareLocks(self, level):
2185
    if level == locking.LEVEL_NODE:
2186
      self._LockInstancesNodes()
2187

    
2188
  def BuildHooksEnv(self):
2189
    """Build hooks env.
2190

2191
    This runs on master, primary and secondary nodes of the instance.
2192

2193
    """
2194
    env = _BuildInstanceHookEnvByObject(self.instance)
2195
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2196
          list(self.instance.secondary_nodes))
2197
    return env, nl, nl
2198

    
2199
  def CheckPrereq(self):
2200
    """Check prerequisites.
2201

2202
    This checks that the instance is in the cluster.
2203

2204
    """
2205
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2206
    assert self.instance is not None, \
2207
      "Cannot retrieve locked instance %s" % self.op.instance_name
2208

    
2209
  def Exec(self, feedback_fn):
2210
    """Shutdown the instance.
2211

2212
    """
2213
    instance = self.instance
2214
    node_current = instance.primary_node
2215
    self.cfg.MarkInstanceDown(instance.name)
2216
    if not rpc.call_instance_shutdown(node_current, instance):
2217
      logger.Error("could not shutdown instance")
2218

    
2219
    _ShutdownInstanceDisks(instance, self.cfg)
2220

    
2221

    
2222
class LUReinstallInstance(LogicalUnit):
2223
  """Reinstall an instance.
2224

2225
  """
2226
  HPATH = "instance-reinstall"
2227
  HTYPE = constants.HTYPE_INSTANCE
2228
  _OP_REQP = ["instance_name"]
2229
  REQ_BGL = False
2230

    
2231
  def ExpandNames(self):
2232
    self._ExpandAndLockInstance()
2233
    self.needed_locks[locking.LEVEL_NODE] = []
2234
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2235

    
2236
  def DeclareLocks(self, level):
2237
    if level == locking.LEVEL_NODE:
2238
      self._LockInstancesNodes()
2239

    
2240
  def BuildHooksEnv(self):
2241
    """Build hooks env.
2242

2243
    This runs on master, primary and secondary nodes of the instance.
2244

2245
    """
2246
    env = _BuildInstanceHookEnvByObject(self.instance)
2247
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2248
          list(self.instance.secondary_nodes))
2249
    return env, nl, nl
2250

    
2251
  def CheckPrereq(self):
2252
    """Check prerequisites.
2253

2254
    This checks that the instance is in the cluster and is not running.
2255

2256
    """
2257
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2258
    assert instance is not None, \
2259
      "Cannot retrieve locked instance %s" % self.op.instance_name
2260

    
2261
    if instance.disk_template == constants.DT_DISKLESS:
2262
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2263
                                 self.op.instance_name)
2264
    if instance.status != "down":
2265
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2266
                                 self.op.instance_name)
2267
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2268
    if remote_info:
2269
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2270
                                 (self.op.instance_name,
2271
                                  instance.primary_node))
2272

    
2273
    self.op.os_type = getattr(self.op, "os_type", None)
2274
    if self.op.os_type is not None:
2275
      # OS verification
2276
      pnode = self.cfg.GetNodeInfo(
2277
        self.cfg.ExpandNodeName(instance.primary_node))
2278
      if pnode is None:
2279
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2280
                                   self.op.pnode)
2281
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2282
      if not os_obj:
2283
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2284
                                   " primary node"  % self.op.os_type)
2285

    
2286
    self.instance = instance
2287

    
2288
  def Exec(self, feedback_fn):
2289
    """Reinstall the instance.
2290

2291
    """
2292
    inst = self.instance
2293

    
2294
    if self.op.os_type is not None:
2295
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2296
      inst.os = self.op.os_type
2297
      self.cfg.AddInstance(inst)
2298

    
2299
    _StartInstanceDisks(self.cfg, inst, None)
2300
    try:
2301
      feedback_fn("Running the instance OS create scripts...")
2302
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2303
        raise errors.OpExecError("Could not install OS for instance %s"
2304
                                 " on node %s" %
2305
                                 (inst.name, inst.primary_node))
2306
    finally:
2307
      _ShutdownInstanceDisks(inst, self.cfg)
2308

    
2309

    
2310
class LURenameInstance(LogicalUnit):
2311
  """Rename an instance.
2312

2313
  """
2314
  HPATH = "instance-rename"
2315
  HTYPE = constants.HTYPE_INSTANCE
2316
  _OP_REQP = ["instance_name", "new_name"]
2317

    
2318
  def BuildHooksEnv(self):
2319
    """Build hooks env.
2320

2321
    This runs on master, primary and secondary nodes of the instance.
2322

2323
    """
2324
    env = _BuildInstanceHookEnvByObject(self.instance)
2325
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2326
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2327
          list(self.instance.secondary_nodes))
2328
    return env, nl, nl
2329

    
2330
  def CheckPrereq(self):
2331
    """Check prerequisites.
2332

2333
    This checks that the instance is in the cluster and is not running.
2334

2335
    """
2336
    instance = self.cfg.GetInstanceInfo(
2337
      self.cfg.ExpandInstanceName(self.op.instance_name))
2338
    if instance is None:
2339
      raise errors.OpPrereqError("Instance '%s' not known" %
2340
                                 self.op.instance_name)
2341
    if instance.status != "down":
2342
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2343
                                 self.op.instance_name)
2344
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2345
    if remote_info:
2346
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2347
                                 (self.op.instance_name,
2348
                                  instance.primary_node))
2349
    self.instance = instance
2350

    
2351
    # new name verification
2352
    name_info = utils.HostInfo(self.op.new_name)
2353

    
2354
    self.op.new_name = new_name = name_info.name
2355
    instance_list = self.cfg.GetInstanceList()
2356
    if new_name in instance_list:
2357
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2358
                                 new_name)
2359

    
2360
    if not getattr(self.op, "ignore_ip", False):
2361
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2362
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2363
                                   (name_info.ip, new_name))
2364

    
2365

    
2366
  def Exec(self, feedback_fn):
2367
    """Reinstall the instance.
2368

2369
    """
2370
    inst = self.instance
2371
    old_name = inst.name
2372

    
2373
    if inst.disk_template == constants.DT_FILE:
2374
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2375

    
2376
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2377
    # Change the instance lock. This is definitely safe while we hold the BGL
2378
    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2379
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2380

    
2381
    # re-read the instance from the configuration after rename
2382
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2383

    
2384
    if inst.disk_template == constants.DT_FILE:
2385
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2386
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
2387
                                                old_file_storage_dir,
2388
                                                new_file_storage_dir)
2389

    
2390
      if not result:
2391
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2392
                                 " directory '%s' to '%s' (but the instance"
2393
                                 " has been renamed in Ganeti)" % (
2394
                                 inst.primary_node, old_file_storage_dir,
2395
                                 new_file_storage_dir))
2396

    
2397
      if not result[0]:
2398
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2399
                                 " (but the instance has been renamed in"
2400
                                 " Ganeti)" % (old_file_storage_dir,
2401
                                               new_file_storage_dir))
2402

    
2403
    _StartInstanceDisks(self.cfg, inst, None)
2404
    try:
2405
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2406
                                          "sda", "sdb"):
2407
        msg = ("Could not run OS rename script for instance %s on node %s"
2408
               " (but the instance has been renamed in Ganeti)" %
2409
               (inst.name, inst.primary_node))
2410
        logger.Error(msg)
2411
    finally:
2412
      _ShutdownInstanceDisks(inst, self.cfg)
2413

    
2414

    
2415
class LURemoveInstance(LogicalUnit):
2416
  """Remove an instance.
2417

2418
  """
2419
  HPATH = "instance-remove"
2420
  HTYPE = constants.HTYPE_INSTANCE
2421
  _OP_REQP = ["instance_name", "ignore_failures"]
2422

    
2423
  def BuildHooksEnv(self):
2424
    """Build hooks env.
2425

2426
    This runs on master, primary and secondary nodes of the instance.
2427

2428
    """
2429
    env = _BuildInstanceHookEnvByObject(self.instance)
2430
    nl = [self.sstore.GetMasterNode()]
2431
    return env, nl, nl
2432

    
2433
  def CheckPrereq(self):
2434
    """Check prerequisites.
2435

2436
    This checks that the instance is in the cluster.
2437

2438
    """
2439
    instance = self.cfg.GetInstanceInfo(
2440
      self.cfg.ExpandInstanceName(self.op.instance_name))
2441
    if instance is None:
2442
      raise errors.OpPrereqError("Instance '%s' not known" %
2443
                                 self.op.instance_name)
2444
    self.instance = instance
2445

    
2446
  def Exec(self, feedback_fn):
2447
    """Remove the instance.
2448

2449
    """
2450
    instance = self.instance
2451
    logger.Info("shutting down instance %s on node %s" %
2452
                (instance.name, instance.primary_node))
2453

    
2454
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2455
      if self.op.ignore_failures:
2456
        feedback_fn("Warning: can't shutdown instance")
2457
      else:
2458
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2459
                                 (instance.name, instance.primary_node))
2460

    
2461
    logger.Info("removing block devices for instance %s" % instance.name)
2462

    
2463
    if not _RemoveDisks(instance, self.cfg):
2464
      if self.op.ignore_failures:
2465
        feedback_fn("Warning: can't remove instance's disks")
2466
      else:
2467
        raise errors.OpExecError("Can't remove instance's disks")
2468

    
2469
    logger.Info("removing instance %s out of cluster config" % instance.name)
2470

    
2471
    self.cfg.RemoveInstance(instance.name)
2472
    # Remove the new instance from the Ganeti Lock Manager
2473
    self.context.glm.remove(locking.LEVEL_INSTANCE, instance.name)
2474

    
2475

    
2476
class LUQueryInstances(NoHooksLU):
2477
  """Logical unit for querying instances.
2478

2479
  """
2480
  _OP_REQP = ["output_fields", "names"]
2481
  REQ_BGL = False
2482

    
2483
  def ExpandNames(self):
2484
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2485
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2486
                               "admin_state", "admin_ram",
2487
                               "disk_template", "ip", "mac", "bridge",
2488
                               "sda_size", "sdb_size", "vcpus", "tags",
2489
                               "auto_balance",
2490
                               "network_port", "kernel_path", "initrd_path",
2491
                               "hvm_boot_order", "hvm_acpi", "hvm_pae",
2492
                               "hvm_cdrom_image_path", "hvm_nic_type",
2493
                               "hvm_disk_type", "vnc_bind_address"],
2494
                       dynamic=self.dynamic_fields,
2495
                       selected=self.op.output_fields)
2496

    
2497
    self.needed_locks = {}
2498
    self.share_locks[locking.LEVEL_INSTANCE] = 1
2499
    self.share_locks[locking.LEVEL_NODE] = 1
2500

    
2501
    # TODO: we could lock instances (and nodes) only if the user asked for
2502
    # dynamic fields. For that we need atomic ways to get info for a group of
2503
    # instances from the config, though.
2504
    if not self.op.names:
2505
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
2506
    else:
2507
      self.needed_locks[locking.LEVEL_INSTANCE] = \
2508
        _GetWantedInstances(self, self.op.names)
2509

    
2510
    self.needed_locks[locking.LEVEL_NODE] = []
2511
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2512

    
2513
  def DeclareLocks(self, level):
2514
    # TODO: locking of nodes could be avoided when not querying them
2515
    if level == locking.LEVEL_NODE:
2516
      self._LockInstancesNodes()
2517

    
2518
  def CheckPrereq(self):
2519
    """Check prerequisites.
2520

2521
    """
2522
    # This of course is valid only if we locked the instances
2523
    self.wanted = self.acquired_locks[locking.LEVEL_INSTANCE]
2524

    
2525
  def Exec(self, feedback_fn):
2526
    """Computes the list of nodes and their attributes.
2527

2528
    """
2529
    instance_names = self.wanted
2530
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2531
                     in instance_names]
2532

    
2533
    # begin data gathering
2534

    
2535
    nodes = frozenset([inst.primary_node for inst in instance_list])
2536

    
2537
    bad_nodes = []
2538
    if self.dynamic_fields.intersection(self.op.output_fields):
2539
      live_data = {}
2540
      node_data = rpc.call_all_instances_info(nodes)
2541
      for name in nodes:
2542
        result = node_data[name]
2543
        if result:
2544
          live_data.update(result)
2545
        elif result == False:
2546
          bad_nodes.append(name)
2547
        # else no instance is alive
2548
    else:
2549
      live_data = dict([(name, {}) for name in instance_names])
2550

    
2551
    # end data gathering
2552

    
2553
    output = []
2554
    for instance in instance_list:
2555
      iout = []
2556
      for field in self.op.output_fields:
2557
        if field == "name":
2558
          val = instance.name
2559
        elif field == "os":
2560
          val = instance.os
2561
        elif field == "pnode":
2562
          val = instance.primary_node
2563
        elif field == "snodes":
2564
          val = list(instance.secondary_nodes)
2565
        elif field == "admin_state":
2566
          val = (instance.status != "down")
2567
        elif field == "oper_state":
2568
          if instance.primary_node in bad_nodes:
2569
            val = None
2570
          else:
2571
            val = bool(live_data.get(instance.name))
2572
        elif field == "status":
2573
          if instance.primary_node in bad_nodes:
2574
            val = "ERROR_nodedown"
2575
          else:
2576
            running = bool(live_data.get(instance.name))
2577
            if running:
2578
              if instance.status != "down":
2579
                val = "running"
2580
              else:
2581
                val = "ERROR_up"
2582
            else:
2583
              if instance.status != "down":
2584
                val = "ERROR_down"
2585
              else:
2586
                val = "ADMIN_down"
2587
        elif field == "admin_ram":
2588
          val = instance.memory
2589
        elif field == "oper_ram":
2590
          if instance.primary_node in bad_nodes:
2591
            val = None
2592
          elif instance.name in live_data:
2593
            val = live_data[instance.name].get("memory", "?")
2594
          else:
2595
            val = "-"
2596
        elif field == "disk_template":
2597
          val = instance.disk_template
2598
        elif field == "ip":
2599
          val = instance.nics[0].ip
2600
        elif field == "bridge":
2601
          val = instance.nics[0].bridge
2602
        elif field == "mac":
2603
          val = instance.nics[0].mac
2604
        elif field == "sda_size" or field == "sdb_size":
2605
          disk = instance.FindDisk(field[:3])
2606
          if disk is None:
2607
            val = None
2608
          else:
2609
            val = disk.size
2610
        elif field == "vcpus":
2611
          val = instance.vcpus
2612
        elif field == "tags":
2613
          val = list(instance.GetTags())
2614
        elif field in ("network_port", "kernel_path", "initrd_path",
2615
                       "hvm_boot_order", "hvm_acpi", "hvm_pae",
2616
                       "hvm_cdrom_image_path", "hvm_nic_type",
2617
                       "hvm_disk_type", "vnc_bind_address"):
2618
          val = getattr(instance, field, None)
2619
          if val is not None:
2620
            pass
2621
          elif field in ("hvm_nic_type", "hvm_disk_type",
2622
                         "kernel_path", "initrd_path"):
2623
            val = "default"
2624
          else:
2625
            val = "-"
2626
        else:
2627
          raise errors.ParameterError(field)
2628
        iout.append(val)
2629
      output.append(iout)
2630

    
2631
    return output
2632

    
2633

    
2634
class LUFailoverInstance(LogicalUnit):
2635
  """Failover an instance.
2636

2637
  """
2638
  HPATH = "instance-failover"
2639
  HTYPE = constants.HTYPE_INSTANCE
2640
  _OP_REQP = ["instance_name", "ignore_consistency"]
2641
  REQ_BGL = False
2642

    
2643
  def ExpandNames(self):
2644
    self._ExpandAndLockInstance()
2645
    self.needed_locks[locking.LEVEL_NODE] = []
2646
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2647

    
2648
  def DeclareLocks(self, level):
2649
    if level == locking.LEVEL_NODE:
2650
      self._LockInstancesNodes()
2651

    
2652
  def BuildHooksEnv(self):
2653
    """Build hooks env.
2654

2655
    This runs on master, primary and secondary nodes of the instance.
2656

2657
    """
2658
    env = {
2659
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2660
      }
2661
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2662
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2663
    return env, nl, nl
2664

    
2665
  def CheckPrereq(self):
2666
    """Check prerequisites.
2667

2668
    This checks that the instance is in the cluster.
2669

2670
    """
2671
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2672
    assert self.instance is not None, \
2673
      "Cannot retrieve locked instance %s" % self.op.instance_name
2674

    
2675
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2676
      raise errors.OpPrereqError("Instance's disk layout is not"
2677
                                 " network mirrored, cannot failover.")
2678

    
2679
    secondary_nodes = instance.secondary_nodes
2680
    if not secondary_nodes:
2681
      raise errors.ProgrammerError("no secondary node but using "
2682
                                   "a mirrored disk template")
2683

    
2684
    target_node = secondary_nodes[0]
2685
    # check memory requirements on the secondary node
2686
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2687
                         instance.name, instance.memory)
2688

    
2689
    # check bridge existence
2690
    brlist = [nic.bridge for nic in instance.nics]
2691
    if not rpc.call_bridges_exist(target_node, brlist):
2692
      raise errors.OpPrereqError("One or more target bridges %s does not"
2693
                                 " exist on destination node '%s'" %
2694
                                 (brlist, target_node))
2695

    
2696
  def Exec(self, feedback_fn):
2697
    """Failover an instance.
2698

2699
    The failover is done by shutting it down on its present node and
2700
    starting it on the secondary.
2701

2702
    """
2703
    instance = self.instance
2704

    
2705
    source_node = instance.primary_node
2706
    target_node = instance.secondary_nodes[0]
2707

    
2708
    feedback_fn("* checking disk consistency between source and target")
2709
    for dev in instance.disks:
2710
      # for drbd, these are drbd over lvm
2711
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2712
        if instance.status == "up" and not self.op.ignore_consistency:
2713
          raise errors.OpExecError("Disk %s is degraded on target node,"
2714
                                   " aborting failover." % dev.iv_name)
2715

    
2716
    feedback_fn("* shutting down instance on source node")
2717
    logger.Info("Shutting down instance %s on node %s" %
2718
                (instance.name, source_node))
2719

    
2720
    if not rpc.call_instance_shutdown(source_node, instance):
2721
      if self.op.ignore_consistency:
2722
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2723
                     " anyway. Please make sure node %s is down"  %
2724
                     (instance.name, source_node, source_node))
2725
      else:
2726
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2727
                                 (instance.name, source_node))
2728

    
2729
    feedback_fn("* deactivating the instance's disks on source node")
2730
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2731
      raise errors.OpExecError("Can't shut down the instance's disks.")
2732

    
2733
    instance.primary_node = target_node
2734
    # distribute new instance config to the other nodes
2735
    self.cfg.Update(instance)
2736

    
2737
    # Only start the instance if it's marked as up
2738
    if instance.status == "up":
2739
      feedback_fn("* activating the instance's disks on target node")
2740
      logger.Info("Starting instance %s on node %s" %
2741
                  (instance.name, target_node))
2742

    
2743
      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2744
                                               ignore_secondaries=True)
2745
      if not disks_ok:
2746
        _ShutdownInstanceDisks(instance, self.cfg)
2747
        raise errors.OpExecError("Can't activate the instance's disks")
2748

    
2749
      feedback_fn("* starting the instance on the target node")
2750
      if not rpc.call_instance_start(target_node, instance, None):
2751
        _ShutdownInstanceDisks(instance, self.cfg)
2752
        raise errors.OpExecError("Could not start instance %s on node %s." %
2753
                                 (instance.name, target_node))
2754

    
2755

    
2756
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2757
  """Create a tree of block devices on the primary node.
2758

2759
  This always creates all devices.
2760

2761
  """
2762
  if device.children:
2763
    for child in device.children:
2764
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2765
        return False
2766

    
2767
  cfg.SetDiskID(device, node)
2768
  new_id = rpc.call_blockdev_create(node, device, device.size,
2769
                                    instance.name, True, info)
2770
  if not new_id:
2771
    return False
2772
  if device.physical_id is None:
2773
    device.physical_id = new_id
2774
  return True
2775

    
2776

    
2777
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2778
  """Create a tree of block devices on a secondary node.
2779

2780
  If this device type has to be created on secondaries, create it and
2781
  all its children.
2782

2783
  If not, just recurse to children keeping the same 'force' value.
2784

2785
  """
2786
  if device.CreateOnSecondary():
2787
    force = True
2788
  if device.children:
2789
    for child in device.children:
2790
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
2791
                                        child, force, info):
2792
        return False
2793

    
2794
  if not force:
2795
    return True
2796
  cfg.SetDiskID(device, node)
2797
  new_id = rpc.call_blockdev_create(node, device, device.size,
2798
                                    instance.name, False, info)
2799
  if not new_id:
2800
    return False
2801
  if device.physical_id is None:
2802
    device.physical_id = new_id
2803
  return True
2804

    
2805

    
2806
def _GenerateUniqueNames(cfg, exts):
2807
  """Generate a suitable LV name.
2808

2809
  This will generate a unique logical volume name for each given suffix.
2810

2811
  """
2812
  results = []
2813
  for val in exts:
2814
    new_id = cfg.GenerateUniqueID()
2815
    results.append("%s%s" % (new_id, val))
2816
  return results
2817

    
2818

    
2819
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2820
  """Generate a drbd8 device complete with its children.
2821

2822
  """
2823
  port = cfg.AllocatePort()
2824
  vgname = cfg.GetVGName()
2825
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2826
                          logical_id=(vgname, names[0]))
2827
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2828
                          logical_id=(vgname, names[1]))
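  # the 128 MiB LV above holds the DRBD metadata for this device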
2829
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2830
                          logical_id = (primary, secondary, port),
2831
                          children = [dev_data, dev_meta],
2832
                          iv_name=iv_name)
2833
  return drbd_dev
2834

    
2835

    
2836
def _GenerateDiskTemplate(cfg, template_name,
2837
                          instance_name, primary_node,
2838
                          secondary_nodes, disk_sz, swap_sz,
2839
                          file_storage_dir, file_driver):
2840
  """Generate the entire disk layout for a given template type.
2841

2842
  """
2843
  #TODO: compute space requirements
2844

    
2845
  vgname = cfg.GetVGName()
2846
  if template_name == constants.DT_DISKLESS:
2847
    disks = []
2848
  elif template_name == constants.DT_PLAIN:
2849
    if len(secondary_nodes) != 0:
2850
      raise errors.ProgrammerError("Wrong template configuration")
2851

    
2852
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2853
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2854
                           logical_id=(vgname, names[0]),
2855
                           iv_name = "sda")
2856
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2857
                           logical_id=(vgname, names[1]),
2858
                           iv_name = "sdb")
2859
    disks = [sda_dev, sdb_dev]
2860
  elif template_name == constants.DT_DRBD8:
2861
    if len(secondary_nodes) != 1:
2862
      raise errors.ProgrammerError("Wrong template configuration")
2863
    remote_node = secondary_nodes[0]
2864
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2865
                                       ".sdb_data", ".sdb_meta"])
2866
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2867
                                         disk_sz, names[0:2], "sda")
2868
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2869
                                         swap_sz, names[2:4], "sdb")
2870
    disks = [drbd_sda_dev, drbd_sdb_dev]
2871
  elif template_name == constants.DT_FILE:
2872
    if len(secondary_nodes) != 0:
2873
      raise errors.ProgrammerError("Wrong template configuration")
2874

    
2875
    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
2876
                                iv_name="sda", logical_id=(file_driver,
2877
                                "%s/sda" % file_storage_dir))
2878
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
2879
                                iv_name="sdb", logical_id=(file_driver,
2880
                                "%s/sdb" % file_storage_dir))
2881
    disks = [file_sda_dev, file_sdb_dev]
2882
  else:
2883
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2884
  return disks
2885

    
2886

    
2887
def _GetInstanceInfoText(instance):
2888
  """Compute that text that should be added to the disk's metadata.
2889

2890
  """
2891
  return "originstname+%s" % instance.name
2892

    
2893

    
2894
def _CreateDisks(cfg, instance):
2895
  """Create all disks for an instance.
2896

2897
  This abstracts away some work from AddInstance.
2898

2899
  Args:
2900
    instance: the instance object
2901

2902
  Returns:
2903
    True or False showing the success of the creation process
2904

2905
  """
2906
  info = _GetInstanceInfoText(instance)
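  # for file-based instances the storage directory must be created on the
  # primary node before the individual disk files can be created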
2907

    
2908
  if instance.disk_template == constants.DT_FILE:
2909
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2910
    result = rpc.call_file_storage_dir_create(instance.primary_node,
2911
                                              file_storage_dir)
2912

    
2913
    if not result:
2914
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
2915
      return False
2916

    
2917
    if not result[0]:
2918
      logger.Error("failed to create directory '%s'" % file_storage_dir)
2919
      return False
2920

    
2921
  for device in instance.disks:
2922
    logger.Info("creating volume %s for instance %s" %
2923
                (device.iv_name, instance.name))
2924
    #HARDCODE
2925
    for secondary_node in instance.secondary_nodes:
2926
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2927
                                        device, False, info):
2928
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2929
                     (device.iv_name, device, secondary_node))
2930
        return False
2931
    #HARDCODE
2932
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2933
                                    instance, device, info):
2934
      logger.Error("failed to create volume %s on primary!" %
2935
                   device.iv_name)
2936
      return False
2937

    
2938
  return True
2939

    
2940

    
2941
def _RemoveDisks(instance, cfg):
2942
  """Remove all disks for an instance.
2943

2944
  This abstracts away some work from `AddInstance()` and
2945
  `RemoveInstance()`. Note that in case some of the devices couldn't
2946
  be removed, the removal will continue with the other ones (compare
2947
  with `_CreateDisks()`).
2948

2949
  Args:
2950
    instance: the instance object
2951

2952
  Returns:
2953
    True or False showing the success of the removal process
2954

2955
  """
2956
  logger.Info("removing block devices for instance %s" % instance.name)
2957

    
2958
  result = True
2959
  for device in instance.disks:
2960
    for node, disk in device.ComputeNodeTree(instance.primary_node):
2961
      cfg.SetDiskID(disk, node)
2962
      if not rpc.call_blockdev_remove(node, disk):
2963
        logger.Error("could not remove block device %s on node %s,"
2964
                     " continuing anyway" %
2965
                     (device.iv_name, node))
2966
        result = False
2967

    
2968
  if instance.disk_template == constants.DT_FILE:
2969
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2970
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
2971
                                            file_storage_dir):
2972
      logger.Error("could not remove directory '%s'" % file_storage_dir)
2973
      result = False
2974

    
2975
  return result
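# Illustrative usage sketch (simplified, error handling abbreviated): the two
# helpers above are used as a create-or-rollback pair, as LUCreateInstance.Exec
# does further down:
#   if not _CreateDisks(cfg, iobj):
#     _RemoveDisks(iobj, cfg)      # best-effort cleanup of partial creation
#     raise errors.OpExecError("Device creation failed, reverting...")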
2976

    
2977

    
2978
def _ComputeDiskSize(disk_template, disk_size, swap_size):
2979
  """Compute disk size requirements in the volume group
2980

2981
  This is currently hard-coded for the two-drive layout.
2982

2983
  """
2984
  # Required free disk space as a function of disk and swap space
2985
  req_size_dict = {
2986
    constants.DT_DISKLESS: None,
2987
    constants.DT_PLAIN: disk_size + swap_size,
2988
    # 256 MB are added for drbd metadata, 128 MB for each drbd device
2989
    constants.DT_DRBD8: disk_size + swap_size + 256,
2990
    constants.DT_FILE: None,
2991
  }
2992

    
2993
  if disk_template not in req_size_dict:
2994
    raise errors.ProgrammerError("Disk template '%s' size requirement"
2995
                                 " is unknown" %  disk_template)
2996

    
2997
  return req_size_dict[disk_template]
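# Worked example (hypothetical sizes): for disk_size=10240 and swap_size=4096,
# DT_PLAIN reserves 10240 + 4096 = 14336 MB in the volume group, while
# DT_DRBD8 reserves 10240 + 4096 + 256 = 14592 MB (the extra 256 MB covering
# the DRBD metadata volumes); DT_DISKLESS and DT_FILE need no VG space.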
2998

    
2999

    
3000
class LUCreateInstance(LogicalUnit):
3001
  """Create an instance.
3002

3003
  """
3004
  HPATH = "instance-add"
3005
  HTYPE = constants.HTYPE_INSTANCE
3006
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
3007
              "disk_template", "swap_size", "mode", "start", "vcpus",
3008
              "wait_for_sync", "ip_check", "mac"]
3009

    
3010
  def _RunAllocator(self):
3011
    """Run the allocator based on input opcode.
3012

3013
    """
3014
    disks = [{"size": self.op.disk_size, "mode": "w"},
3015
             {"size": self.op.swap_size, "mode": "w"}]
3016
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3017
             "bridge": self.op.bridge}]
3018
    ial = IAllocator(self.cfg, self.sstore,
3019
                     mode=constants.IALLOCATOR_MODE_ALLOC,
3020
                     name=self.op.instance_name,
3021
                     disk_template=self.op.disk_template,
3022
                     tags=[],
3023
                     os=self.op.os_type,
3024
                     vcpus=self.op.vcpus,
3025
                     mem_size=self.op.mem_size,
3026
                     disks=disks,
3027
                     nics=nics,
3028
                     )
3029

    
3030
    ial.Run(self.op.iallocator)
3031

    
3032
    if not ial.success:
3033
      raise errors.OpPrereqError("Can't compute nodes using"
3034
                                 " iallocator '%s': %s" % (self.op.iallocator,
3035
                                                           ial.info))
3036
    if len(ial.nodes) != ial.required_nodes:
3037
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3038
                                 " of nodes (%s), required %s" %
3039
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
3040
    self.op.pnode = ial.nodes[0]
3041
    logger.ToStdout("Selected nodes for the instance: %s" %
3042
                    (", ".join(ial.nodes),))
3043
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3044
                (self.op.instance_name, self.op.iallocator, ial.nodes))
3045
    if ial.required_nodes == 2:
3046
      self.op.snode = ial.nodes[1]
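    # Illustrative request shape (hypothetical values): for disk_size=10240,
    # swap_size=4096, mac="auto" and bridge "xen-br0", the IAllocator above
    # is fed:
    #   disks = [{"size": 10240, "mode": "w"}, {"size": 4096, "mode": "w"}]
    #   nics  = [{"mac": "auto", "ip": None, "bridge": "xen-br0"}]
    # and on success ial.nodes[0] becomes the primary node, ial.nodes[1]
    # (when required_nodes == 2) the secondary.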
3047

    
3048
  def BuildHooksEnv(self):
3049
    """Build hooks env.
3050

3051
    This runs on master, primary and secondary nodes of the instance.
3052

3053
    """
3054
    env = {
3055
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3056
      "INSTANCE_DISK_SIZE": self.op.disk_size,
3057
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
3058
      "INSTANCE_ADD_MODE": self.op.mode,
3059
      }
3060
    if self.op.mode == constants.INSTANCE_IMPORT:
3061
      env["INSTANCE_SRC_NODE"] = self.op.src_node
3062
      env["INSTANCE_SRC_PATH"] = self.op.src_path
3063
      env["INSTANCE_SRC_IMAGE"] = self.src_image
3064

    
3065
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3066
      primary_node=self.op.pnode,
3067
      secondary_nodes=self.secondaries,
3068
      status=self.instance_status,
3069
      os_type=self.op.os_type,
3070
      memory=self.op.mem_size,
3071
      vcpus=self.op.vcpus,
3072
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3073
    ))
3074

    
3075
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3076
          self.secondaries)
3077
    return env, nl, nl
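    # Illustrative environment (hypothetical values) assembled above for the
    # instance-add hooks:
    #   {"INSTANCE_DISK_TEMPLATE": "drbd", "INSTANCE_DISK_SIZE": 10240,
    #    "INSTANCE_SWAP_SIZE": 4096, "INSTANCE_ADD_MODE": "create", ...}
    # merged with the generic keys from _BuildInstanceHookEnv and delivered
    # to the master, the primary node and any secondaries (the nl list).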
3078

    
3079

    
3080
  def CheckPrereq(self):
3081
    """Check prerequisites.
3082

3083
    """
3084
    # set optional parameters to none if they don't exist
3085
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3086
                 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
3087
                 "hvm_nic_type", "hvm_disk_type", "vnc_bind_address"]:
3088
      if not hasattr(self.op, attr):
3089
        setattr(self.op, attr, None)
3090

    
3091
    if self.op.mode not in (constants.INSTANCE_CREATE,
3092
                            constants.INSTANCE_IMPORT):
3093
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3094
                                 self.op.mode)
3095

    
3096
    if (not self.cfg.GetVGName() and
3097
        self.op.disk_template not in constants.DTS_NOT_LVM):
3098
      raise errors.OpPrereqError("Cluster does not support lvm-based"
3099
                                 " instances")
3100

    
3101
    if self.op.mode == constants.INSTANCE_IMPORT:
3102
      src_node = getattr(self.op, "src_node", None)
3103
      src_path = getattr(self.op, "src_path", None)
3104
      if src_node is None or src_path is None:
3105
        raise errors.OpPrereqError("Importing an instance requires source"
3106
                                   " node and path options")
3107
      src_node_full = self.cfg.ExpandNodeName(src_node)
3108
      if src_node_full is None:
3109
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3110
      self.op.src_node = src_node = src_node_full
3111

    
3112
      if not os.path.isabs(src_path):
3113
        raise errors.OpPrereqError("The source path must be absolute")
3114

    
3115
      export_info = rpc.call_export_info(src_node, src_path)
3116

    
3117
      if not export_info:
3118
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
3119

    
3120
      if not export_info.has_section(constants.INISECT_EXP):
3121
        raise errors.ProgrammerError("Corrupted export config")
3122

    
3123
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
3124
      if (int(ei_version) != constants.EXPORT_VERSION):
3125
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3126
                                   (ei_version, constants.EXPORT_VERSION))
3127

    
3128
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3129
        raise errors.OpPrereqError("Can't import instance with more than"
3130
                                   " one data disk")
3131

    
3132
      # FIXME: are the old os-es, disk sizes, etc. useful?
3133
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3134
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3135
                                                         'disk0_dump'))
3136
      self.src_image = diskimage
3137
    else: # INSTANCE_CREATE
3138
      if getattr(self.op, "os_type", None) is None:
3139
        raise errors.OpPrereqError("No guest OS specified")
3140

    
3141
    #### instance parameters check
3142

    
3143
    # disk template and mirror node verification
3144
    if self.op.disk_template not in constants.DISK_TEMPLATES:
3145
      raise errors.OpPrereqError("Invalid disk template name")
3146

    
3147
    # instance name verification
3148
    hostname1 = utils.HostInfo(self.op.instance_name)
3149

    
3150
    self.op.instance_name = instance_name = hostname1.name
3151
    instance_list = self.cfg.GetInstanceList()
3152
    if instance_name in instance_list:
3153
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3154
                                 instance_name)
3155

    
3156
    # ip validity checks
3157
    ip = getattr(self.op, "ip", None)
3158
    if ip is None or ip.lower() == "none":
3159
      inst_ip = None
3160
    elif ip.lower() == "auto":
3161
      inst_ip = hostname1.ip
3162
    else:
3163
      if not utils.IsValidIP(ip):
3164
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
3165
                                   " like a valid IP" % ip)
3166
      inst_ip = ip
3167
    self.inst_ip = self.op.ip = inst_ip
3168

    
3169
    if self.op.start and not self.op.ip_check:
3170
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3171
                                 " adding an instance in start mode")
3172

    
3173
    if self.op.ip_check:
3174
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3175
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3176
                                   (hostname1.ip, instance_name))
3177

    
3178
    # MAC address verification
3179
    if self.op.mac != "auto":
3180
      if not utils.IsValidMac(self.op.mac.lower()):
3181
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
3182
                                   self.op.mac)
3183

    
3184
    # bridge verification
3185
    bridge = getattr(self.op, "bridge", None)
3186
    if bridge is None:
3187
      self.op.bridge = self.cfg.GetDefBridge()
3188
    else:
3189
      self.op.bridge = bridge
3190

    
3191
    # boot order verification
3192
    if self.op.hvm_boot_order is not None:
3193
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3194
        raise errors.OpPrereqError("invalid boot order specified,"
3195
                                   " must be one or more of [acdn]")
3196
    # file storage checks
3197
    if (self.op.file_driver and
3198
        not self.op.file_driver in constants.FILE_DRIVER):
3199
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
3200
                                 self.op.file_driver)
3201

    
3202
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3203
      raise errors.OpPrereqError("File storage directory not a relative"
3204
                                 " path")
3205
    #### allocator run
3206

    
3207
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
3208
      raise errors.OpPrereqError("One and only one of iallocator and primary"
3209
                                 " node must be given")
3210

    
3211
    if self.op.iallocator is not None:
3212
      self._RunAllocator()
3213

    
3214
    #### node related checks
3215

    
3216
    # check primary node
3217
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3218
    if pnode is None:
3219
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
3220
                                 self.op.pnode)
3221
    self.op.pnode = pnode.name
3222
    self.pnode = pnode
3223
    self.secondaries = []
3224

    
3225
    # mirror node verification
3226
    if self.op.disk_template in constants.DTS_NET_MIRROR:
3227
      if getattr(self.op, "snode", None) is None:
3228
        raise errors.OpPrereqError("The networked disk templates need"
3229
                                   " a mirror node")
3230

    
3231
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
3232
      if snode_name is None:
3233
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
3234
                                   self.op.snode)
3235
      elif snode_name == pnode.name:
3236
        raise errors.OpPrereqError("The secondary node cannot be"
3237
                                   " the primary node.")
3238
      self.secondaries.append(snode_name)
3239

    
3240
    req_size = _ComputeDiskSize(self.op.disk_template,
3241
                                self.op.disk_size, self.op.swap_size)
3242

    
3243
    # Check lv size requirements
3244
    if req_size is not None:
3245
      nodenames = [pnode.name] + self.secondaries
3246
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3247
      for node in nodenames:
3248
        info = nodeinfo.get(node, None)
3249
        if not info:
3250
          raise errors.OpPrereqError("Cannot get current information"
3251
                                     " from node '%s'" % node)
3252
        vg_free = info.get('vg_free', None)
3253
        if not isinstance(vg_free, int):
3254
          raise errors.OpPrereqError("Can't compute free disk space on"
3255
                                     " node %s" % node)
3256
        if req_size > info['vg_free']:
3257
          raise errors.OpPrereqError("Not enough disk space on target node %s."
3258
                                     " %d MB available, %d MB required" %
3259
                                     (node, info['vg_free'], req_size))
3260

    
3261
    # os verification
3262
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3263
    if not os_obj:
3264
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
3265
                                 " primary node"  % self.op.os_type)
3266

    
3267
    if self.op.kernel_path == constants.VALUE_NONE:
3268
      raise errors.OpPrereqError("Can't set instance kernel to none")
3269

    
3270

    
3271
    # bridge check on primary node
3272
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3273
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
3274
                                 " destination node '%s'" %
3275
                                 (self.op.bridge, pnode.name))
3276

    
3277
    # memory check on primary node
3278
    if self.op.start:
3279
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3280
                           "creating instance %s" % self.op.instance_name,
3281
                           self.op.mem_size)
3282

    
3283
    # hvm_cdrom_image_path verification
3284
    if self.op.hvm_cdrom_image_path is not None:
3285
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
3286
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
3287
                                   " be an absolute path or None, not %s" %
3288
                                   self.op.hvm_cdrom_image_path)
3289
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
3290
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
3291
                                   " regular file or a symlink pointing to"
3292
                                   " an existing regular file, not %s" %
3293
                                   self.op.hvm_cdrom_image_path)
3294

    
3295
    # vnc_bind_address verification
3296
    if self.op.vnc_bind_address is not None:
3297
      if not utils.IsValidIP(self.op.vnc_bind_address):
3298
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3299
                                   " like a valid IP address" %
3300
                                   self.op.vnc_bind_address)
3301

    
3302
    # Xen HVM device type checks
3303
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
3304
      if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
3305
        raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
3306
                                   " hypervisor" % self.op.hvm_nic_type)
3307
      if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
3308
        raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
3309
                                   " hypervisor" % self.op.hvm_disk_type)
3310

    
3311
    if self.op.start:
3312
      self.instance_status = 'up'
3313
    else:
3314
      self.instance_status = 'down'
3315

    
3316
  def Exec(self, feedback_fn):
3317
    """Create and add the instance to the cluster.
3318

3319
    """
3320
    instance = self.op.instance_name
3321
    pnode_name = self.pnode.name
3322

    
3323
    if self.op.mac == "auto":
3324
      mac_address = self.cfg.GenerateMAC()
3325
    else:
3326
      mac_address = self.op.mac
3327

    
3328
    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3329
    if self.inst_ip is not None:
3330
      nic.ip = self.inst_ip
3331

    
3332
    ht_kind = self.sstore.GetHypervisorType()
3333
    if ht_kind in constants.HTS_REQ_PORT:
3334
      network_port = self.cfg.AllocatePort()
3335
    else:
3336
      network_port = None
3337

    
3338
    if self.op.vnc_bind_address is None:
3339
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3340

    
3341
    # this is needed because os.path.join does not accept None arguments
3342
    if self.op.file_storage_dir is None:
3343
      string_file_storage_dir = ""
3344
    else:
3345
      string_file_storage_dir = self.op.file_storage_dir
3346

    
3347
    # build the full file storage dir path
3348
    file_storage_dir = os.path.normpath(os.path.join(
3349
                                        self.sstore.GetFileStorageDir(),
3350
                                        string_file_storage_dir, instance))
3351

    
3352

    
3353
    disks = _GenerateDiskTemplate(self.cfg,
3354
                                  self.op.disk_template,
3355
                                  instance, pnode_name,
3356
                                  self.secondaries, self.op.disk_size,
3357
                                  self.op.swap_size,
3358
                                  file_storage_dir,
3359
                                  self.op.file_driver)
3360

    
3361
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3362
                            primary_node=pnode_name,
3363
                            memory=self.op.mem_size,
3364
                            vcpus=self.op.vcpus,
3365
                            nics=[nic], disks=disks,
3366
                            disk_template=self.op.disk_template,
3367
                            status=self.instance_status,
3368
                            network_port=network_port,
3369
                            kernel_path=self.op.kernel_path,
3370
                            initrd_path=self.op.initrd_path,
3371
                            hvm_boot_order=self.op.hvm_boot_order,
3372
                            hvm_acpi=self.op.hvm_acpi,
3373
                            hvm_pae=self.op.hvm_pae,
3374
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3375
                            vnc_bind_address=self.op.vnc_bind_address,
3376
                            hvm_nic_type=self.op.hvm_nic_type,
3377
                            hvm_disk_type=self.op.hvm_disk_type,
3378
                            )
3379

    
3380
    feedback_fn("* creating instance disks...")
3381
    if not _CreateDisks(self.cfg, iobj):
3382
      _RemoveDisks(iobj, self.cfg)
3383
      raise errors.OpExecError("Device creation failed, reverting...")
3384

    
3385
    feedback_fn("adding instance %s to cluster config" % instance)
3386

    
3387
    self.cfg.AddInstance(iobj)
3388
    # Add the new instance to the Ganeti Lock Manager
3389
    self.context.glm.add(locking.LEVEL_INSTANCE, instance)
3390

    
3391
    if self.op.wait_for_sync:
3392
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3393
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3394
      # make sure the disks are not degraded (still sync-ing is ok)
3395
      time.sleep(15)
3396
      feedback_fn("* checking mirrors status")
3397
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3398
    else:
3399
      disk_abort = False
3400

    
3401
    if disk_abort:
3402
      _RemoveDisks(iobj, self.cfg)
3403
      self.cfg.RemoveInstance(iobj.name)
3404
      # Remove the new instance from the Ganeti Lock Manager
3405
      self.context.glm.remove(locking.LEVEL_INSTANCE, iobj.name)
3406
      raise errors.OpExecError("There are some degraded disks for"
3407
                               " this instance")
3408

    
3409
    feedback_fn("creating os for instance %s on node %s" %
3410
                (instance, pnode_name))
3411

    
3412
    if iobj.disk_template != constants.DT_DISKLESS:
3413
      if self.op.mode == constants.INSTANCE_CREATE:
3414
        feedback_fn("* running the instance OS create scripts...")
3415
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3416
          raise errors.OpExecError("could not add os for instance %s"
3417
                                   " on node %s" %
3418
                                   (instance, pnode_name))
3419

    
3420
      elif self.op.mode == constants.INSTANCE_IMPORT:
3421
        feedback_fn("* running the instance OS import scripts...")
3422
        src_node = self.op.src_node
3423
        src_image = self.src_image
3424
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3425
                                                src_node, src_image):
3426
          raise errors.OpExecError("Could not import os for instance"
3427
                                   " %s on node %s" %
3428
                                   (instance, pnode_name))
3429
      else:
3430
        # also checked in the prereq part
3431
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3432
                                     % self.op.mode)
3433

    
3434
    if self.op.start:
3435
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3436
      feedback_fn("* starting instance...")
3437
      if not rpc.call_instance_start(pnode_name, iobj, None):
3438
        raise errors.OpExecError("Could not start instance")
3439

    
3440

    
3441
class LUConnectConsole(NoHooksLU):
3442
  """Connect to an instance's console.
3443

3444
  This is somewhat special in that it returns the command line that
3445
  you need to run on the master node in order to connect to the
3446
  console.
3447

3448
  """
3449
  _OP_REQP = ["instance_name"]
3450
  REQ_BGL = False
3451

    
3452
  def ExpandNames(self):
3453
    self._ExpandAndLockInstance()
3454

    
3455
  def CheckPrereq(self):
3456
    """Check prerequisites.
3457

3458
    This checks that the instance is in the cluster.
3459

3460
    """
3461
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3462
    assert self.instance is not None, \
3463
      "Cannot retrieve locked instance %s" % self.op.instance_name
3464

    
3465
  def Exec(self, feedback_fn):
3466
    """Connect to the console of an instance
3467

3468
    """
3469
    instance = self.instance
3470
    node = instance.primary_node
3471

    
3472
    node_insts = rpc.call_instance_list([node])[node]
3473
    if node_insts is False:
3474
      raise errors.OpExecError("Can't connect to node %s." % node)
3475

    
3476
    if instance.name not in node_insts:
3477
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3478

    
3479
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3480

    
3481
    hyper = hypervisor.GetHypervisor()
3482
    console_cmd = hyper.GetShellCommandForConsole(instance)
3483

    
3484
    # build ssh cmdline
3485
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
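    # Illustrative result (assumption: Xen hypervisor, hypothetical names):
    # the returned value is an ssh invocation wrapping the hypervisor's
    # console command, conceptually something like
    #   ssh -t root@node1.example.com 'xm console inst1.example.com'
    # which the caller is expected to run on the master node.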
3486

    
3487

    
3488
class LUReplaceDisks(LogicalUnit):
3489
  """Replace the disks of an instance.
3490

3491
  """
3492
  HPATH = "mirrors-replace"
3493
  HTYPE = constants.HTYPE_INSTANCE
3494
  _OP_REQP = ["instance_name", "mode", "disks"]
3495

    
3496
  def _RunAllocator(self):
3497
    """Compute a new secondary node using an IAllocator.
3498

3499
    """
3500
    ial = IAllocator(self.cfg, self.sstore,
3501
                     mode=constants.IALLOCATOR_MODE_RELOC,
3502
                     name=self.op.instance_name,
3503
                     relocate_from=[self.sec_node])
3504

    
3505
    ial.Run(self.op.iallocator)
3506

    
3507
    if not ial.success:
3508
      raise errors.OpPrereqError("Can't compute nodes using"
3509
                                 " iallocator '%s': %s" % (self.op.iallocator,
3510
                                                           ial.info))
3511
    if len(ial.nodes) != ial.required_nodes:
3512
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3513
                                 " of nodes (%s), required %s" %
3514
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
3515
    self.op.remote_node = ial.nodes[0]
3516
    logger.ToStdout("Selected new secondary for the instance: %s" %
3517
                    self.op.remote_node)
3518

    
3519
  def BuildHooksEnv(self):
3520
    """Build hooks env.
3521

3522
    This runs on the master, the primary and all the secondaries.
3523

3524
    """
3525
    env = {
3526
      "MODE": self.op.mode,
3527
      "NEW_SECONDARY": self.op.remote_node,
3528
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3529
      }
3530
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3531
    nl = [
3532
      self.sstore.GetMasterNode(),
3533
      self.instance.primary_node,
3534
      ]
3535
    if self.op.remote_node is not None:
3536
      nl.append(self.op.remote_node)
3537
    return env, nl, nl
3538

    
3539
  def CheckPrereq(self):
3540
    """Check prerequisites.
3541

3542
    This checks that the instance is in the cluster.
3543

3544
    """
3545
    if not hasattr(self.op, "remote_node"):
3546
      self.op.remote_node = None
3547

    
3548
    instance = self.cfg.GetInstanceInfo(
3549
      self.cfg.ExpandInstanceName(self.op.instance_name))
3550
    if instance is None:
3551
      raise errors.OpPrereqError("Instance '%s' not known" %
3552
                                 self.op.instance_name)
3553
    self.instance = instance
3554
    self.op.instance_name = instance.name
3555

    
3556
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3557
      raise errors.OpPrereqError("Instance's disk layout is not"
3558
                                 " network mirrored.")
3559

    
3560
    if len(instance.secondary_nodes) != 1:
3561
      raise errors.OpPrereqError("The instance has a strange layout,"
3562
                                 " expected one secondary but found %d" %
3563
                                 len(instance.secondary_nodes))
3564

    
3565
    self.sec_node = instance.secondary_nodes[0]
3566

    
3567
    ia_name = getattr(self.op, "iallocator", None)
3568
    if ia_name is not None:
3569
      if self.op.remote_node is not None:
3570
        raise errors.OpPrereqError("Give either the iallocator or the new"
3571
                                   " secondary, not both")
3572
      self.op.remote_node = self._RunAllocator()
3573

    
3574
    remote_node = self.op.remote_node
3575
    if remote_node is not None:
3576
      remote_node = self.cfg.ExpandNodeName(remote_node)
3577
      if remote_node is None:
3578
        raise errors.OpPrereqError("Node '%s' not known" %
3579
                                   self.op.remote_node)
3580
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3581
    else:
3582
      self.remote_node_info = None
3583
    if remote_node == instance.primary_node:
3584
      raise errors.OpPrereqError("The specified node is the primary node of"
3585
                                 " the instance.")
3586
    elif remote_node == self.sec_node:
3587
      if self.op.mode == constants.REPLACE_DISK_SEC:
3588
        # this is for DRBD8, where we can't execute the same mode of
3589
        # replacement as for drbd7 (no different port allocated)
3590
        raise errors.OpPrereqError("Same secondary given, cannot execute"
3591
                                   " replacement")
3592
    if instance.disk_template == constants.DT_DRBD8:
3593
      if (self.op.mode == constants.REPLACE_DISK_ALL and
3594
          remote_node is not None):
3595
        # switch to replace secondary mode
3596
        self.op.mode = constants.REPLACE_DISK_SEC
3597

    
3598
      if self.op.mode == constants.REPLACE_DISK_ALL:
3599
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3600
                                   " secondary disk replacement, not"
3601
                                   " both at once")
3602
      elif self.op.mode == constants.REPLACE_DISK_PRI:
3603
        if remote_node is not None:
3604
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3605
                                     " the secondary while doing a primary"
3606
                                     " node disk replacement")
3607
        self.tgt_node = instance.primary_node
3608
        self.oth_node = instance.secondary_nodes[0]
3609
      elif self.op.mode == constants.REPLACE_DISK_SEC:
3610
        self.new_node = remote_node # this can be None, in which case
3611
                                    # we don't change the secondary
3612
        self.tgt_node = instance.secondary_nodes[0]
3613
        self.oth_node = instance.primary_node
3614
      else:
3615
        raise errors.ProgrammerError("Unhandled disk replace mode")
3616

    
3617
    for name in self.op.disks:
3618
      if instance.FindDisk(name) is None:
3619
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3620
                                   (name, instance.name))
3621
    self.op.remote_node = remote_node
3622

    
3623
  def _ExecD8DiskOnly(self, feedback_fn):
3624
    """Replace a disk on the primary or secondary for dbrd8.
3625

3626
    The algorithm for replace is quite complicated:
3627
      - for each disk to be replaced:
3628
        - create new LVs on the target node with unique names
3629
        - detach old LVs from the drbd device
3630
        - rename old LVs to name_replaced.<time_t>
3631
        - rename new LVs to old LVs
3632
        - attach the new LVs (with the old names now) to the drbd device
3633
      - wait for sync across all devices
3634
      - for each modified disk:
3635
        - remove old LVs (which have the name name_replaced.<time_t>)
3636

3637
    Failures are not very well handled.
3638

3639
    """
3640
    steps_total = 6
3641
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3642
    instance = self.instance
3643
    iv_names = {}
3644
    vgname = self.cfg.GetVGName()
3645
    # start of work
3646
    cfg = self.cfg
3647
    tgt_node = self.tgt_node
3648
    oth_node = self.oth_node
3649

    
3650
    # Step: check device activation
3651
    self.proc.LogStep(1, steps_total, "check device existence")
3652
    info("checking volume groups")
3653
    my_vg = cfg.GetVGName()
3654
    results = rpc.call_vg_list([oth_node, tgt_node])
3655
    if not results:
3656
      raise errors.OpExecError("Can't list volume groups on the nodes")
3657
    for node in oth_node, tgt_node:
3658
      res = results.get(node, False)
3659
      if not res or my_vg not in res:
3660
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3661
                                 (my_vg, node))
3662
    for dev in instance.disks:
3663
      if not dev.iv_name in self.op.disks:
3664
        continue
3665
      for node in tgt_node, oth_node:
3666
        info("checking %s on %s" % (dev.iv_name, node))
3667
        cfg.SetDiskID(dev, node)
3668
        if not rpc.call_blockdev_find(node, dev):
3669
          raise errors.OpExecError("Can't find device %s on node %s" %
3670
                                   (dev.iv_name, node))
3671

    
3672
    # Step: check other node consistency
3673
    self.proc.LogStep(2, steps_total, "check peer consistency")
3674
    for dev in instance.disks:
3675
      if not dev.iv_name in self.op.disks:
3676
        continue
3677
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3678
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3679
                                   oth_node==instance.primary_node):
3680
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3681
                                 " to replace disks on this node (%s)" %
3682
                                 (oth_node, tgt_node))
3683

    
3684
    # Step: create new storage
3685
    self.proc.LogStep(3, steps_total, "allocate new storage")
3686
    for dev in instance.disks:
3687
      if not dev.iv_name in self.op.disks:
3688
        continue
3689
      size = dev.size
3690
      cfg.SetDiskID(dev, tgt_node)
3691
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3692
      names = _GenerateUniqueNames(cfg, lv_names)
3693
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3694
                             logical_id=(vgname, names[0]))
3695
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3696
                             logical_id=(vgname, names[1]))
3697
      new_lvs = [lv_data, lv_meta]
3698
      old_lvs = dev.children
3699
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3700
      info("creating new local storage on %s for %s" %
3701
           (tgt_node, dev.iv_name))
3702
      # since we *always* want to create this LV, we use the
3703
      # _Create...OnPrimary (which forces the creation), even if we
3704
      # are talking about the secondary node
3705
      for new_lv in new_lvs:
3706
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3707
                                        _GetInstanceInfoText(instance)):
3708
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3709
                                   " node '%s'" %
3710
                                   (new_lv.logical_id[1], tgt_node))
3711

    
3712
    # Step: for each lv, detach+rename*2+attach
3713
    self.proc.LogStep(4, steps_total, "change drbd configuration")
3714
    for dev, old_lvs, new_lvs in iv_names.itervalues():
3715
      info("detaching %s drbd from local storage" % dev.iv_name)
3716
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3717
        raise errors.OpExecError("Can't detach drbd from local storage on node"
3718
                                 " %s for device %s" % (tgt_node, dev.iv_name))
3719
      #dev.children = []
3720
      #cfg.Update(instance)
3721

    
3722
      # ok, we created the new LVs, so now we know we have the needed
3723
      # storage; as such, we proceed on the target node to rename
3724
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3725
      # using the assumption that logical_id == physical_id (which in
3726
      # turn is the unique_id on that node)
3727

    
3728
      # FIXME(iustin): use a better name for the replaced LVs
3729
      temp_suffix = int(time.time())
3730
      ren_fn = lambda d, suff: (d.physical_id[0],
3731
                                d.physical_id[1] + "_replaced-%s" % suff)
3732
      # build the rename list based on what LVs exist on the node
3733
      rlist = []
3734
      for to_ren in old_lvs:
3735
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3736
        if find_res is not None: # device exists
3737
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3738

    
3739
      info("renaming the old LVs on the target node")
3740
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3741
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3742
      # now we rename the new LVs to the old LVs
3743
      info("renaming the new LVs on the target node")
3744
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3745
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3746
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3747

    
3748
      for old, new in zip(old_lvs, new_lvs):
3749
        new.logical_id = old.logical_id
3750
        cfg.SetDiskID(new, tgt_node)
3751

    
3752
      for disk in old_lvs:
3753
        disk.logical_id = ren_fn(disk, temp_suffix)
3754
        cfg.SetDiskID(disk, tgt_node)
3755

    
3756
      # now that the new lvs have the old name, we can add them to the device
3757
      info("adding new mirror component on %s" % tgt_node)
3758
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3759
        for new_lv in new_lvs:
3760
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
3761
            warning("Can't rollback device %s", hint="manually cleanup unused"
3762
                    " logical volumes")
3763
        raise errors.OpExecError("Can't add local storage to drbd")
3764

    
3765
      dev.children = new_lvs
3766
      cfg.Update(instance)
3767

    
3768
    # Step: wait for sync
3769

    
3770
    # this can fail as the old devices are degraded and _WaitForSync
3771
    # does a combined result over all disks, so we don't check its
3772
    # return value
3773
    self.proc.LogStep(5, steps_total, "sync devices")
3774
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3775

    
3776
    # so check manually all the devices
3777
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3778
      cfg.SetDiskID(dev, instance.primary_node)
3779
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3780
      if is_degr:
3781
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3782

    
3783
    # Step: remove old storage
3784
    self.proc.LogStep(6, steps_total, "removing old storage")
3785
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3786
      info("remove logical volumes for %s" % name)
3787
      for lv in old_lvs:
3788
        cfg.SetDiskID(lv, tgt_node)
3789
        if not rpc.call_blockdev_remove(tgt_node, lv):
3790
          warning("Can't remove old LV", hint="manually remove unused LVs")
3791
          continue
3792

    
3793
  def _ExecD8Secondary(self, feedback_fn):
3794
    """Replace the secondary node for drbd8.
3795

3796
    The algorithm for replace is quite complicated:
3797
      - for all disks of the instance:
3798
        - create new LVs on the new node with same names
3799
        - shutdown the drbd device on the old secondary
3800
        - disconnect the drbd network on the primary
3801
        - create the drbd device on the new secondary
3802
        - network attach the drbd on the primary, using an artifice:
3803
          the drbd code for Attach() will connect to the network if it
3804
          finds a device which is connected to the correct local disks but
3805
          not network enabled
3806
      - wait for sync across all devices
3807
      - remove all disks from the old secondary
3808

3809
    Failures are not very well handled.
3810

3811
    """
3812
    steps_total = 6
3813
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3814
    instance = self.instance
3815
    iv_names = {}
3816
    vgname = self.cfg.GetVGName()
3817
    # start of work
3818
    cfg = self.cfg
3819
    old_node = self.tgt_node
3820
    new_node = self.new_node
3821
    pri_node = instance.primary_node
3822

    
3823
    # Step: check device activation
3824
    self.proc.LogStep(1, steps_total, "check device existence")
3825
    info("checking volume groups")
3826
    my_vg = cfg.GetVGName()
3827
    results = rpc.call_vg_list([pri_node, new_node])
3828
    if not results:
3829
      raise errors.OpExecError("Can't list volume groups on the nodes")
3830
    for node in pri_node, new_node:
3831
      res = results.get(node, False)
3832
      if not res or my_vg not in res:
3833
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3834
                                 (my_vg, node))
3835
    for dev in instance.disks:
3836
      if not dev.iv_name in self.op.disks:
3837
        continue
3838
      info("checking %s on %s" % (dev.iv_name, pri_node))
3839
      cfg.SetDiskID(dev, pri_node)
3840
      if not rpc.call_blockdev_find(pri_node, dev):
3841
        raise errors.OpExecError("Can't find device %s on node %s" %
3842
                                 (dev.iv_name, pri_node))
3843

    
3844
    # Step: check other node consistency
3845
    self.proc.LogStep(2, steps_total, "check peer consistency")
3846
    for dev in instance.disks:
3847
      if not dev.iv_name in self.op.disks:
3848
        continue
3849
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3850
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3851
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
3852
                                 " unsafe to replace the secondary" %
3853
                                 pri_node)
3854

    
3855
    # Step: create new storage
3856
    self.proc.LogStep(3, steps_total, "allocate new storage")
3857
    for dev in instance.disks:
3858
      size = dev.size
3859
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3860
      # since we *always* want to create this LV, we use the
3861
      # _Create...OnPrimary (which forces the creation), even if we
3862
      # are talking about the secondary node
3863
      for new_lv in dev.children:
3864
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3865
                                        _GetInstanceInfoText(instance)):
3866
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3867
                                   " node '%s'" %
3868
                                   (new_lv.logical_id[1], new_node))
3869

    
3870
      iv_names[dev.iv_name] = (dev, dev.children)
3871

    
3872
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
3873
    for dev in instance.disks:
3874
      size = dev.size
3875
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3876
      # create new devices on new_node
3877
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3878
                              logical_id=(pri_node, new_node,
3879
                                          dev.logical_id[2]),
3880
                              children=dev.children)
3881
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3882
                                        new_drbd, False,
3883
                                      _GetInstanceInfoText(instance)):
3884
        raise errors.OpExecError("Failed to create new DRBD on"
3885
                                 " node '%s'" % new_node)
3886

    
3887
    for dev in instance.disks:
3888
      # we have new devices, shutdown the drbd on the old secondary
3889
      info("shutting down drbd for %s on old node" % dev.iv_name)
3890
      cfg.SetDiskID(dev, old_node)
3891
      if not rpc.call_blockdev_shutdown(old_node, dev):
3892
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3893
                hint="Please cleanup this device manually as soon as possible")
3894

    
3895
    info("detaching primary drbds from the network (=> standalone)")
3896
    done = 0
3897
    for dev in instance.disks:
3898
      cfg.SetDiskID(dev, pri_node)
3899
      # set the physical (unique in bdev terms) id to None, meaning
3900
      # detach from network
3901
      dev.physical_id = (None,) * len(dev.physical_id)
3902
      # and 'find' the device, which will 'fix' it to match the
3903
      # standalone state
3904
      if rpc.call_blockdev_find(pri_node, dev):
3905
        done += 1
3906
      else:
3907
        warning("Failed to detach drbd %s from network, unusual case" %
3908
                dev.iv_name)
3909

    
3910
    if not done:
3911
      # no detaches succeeded (very unlikely)
3912
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
3913

    
3914
    # if we managed to detach at least one, we update all the disks of
3915
    # the instance to point to the new secondary
3916
    info("updating instance configuration")
3917
    for dev in instance.disks:
3918
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3919
      cfg.SetDiskID(dev, pri_node)
3920
    cfg.Update(instance)
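    # Illustrative example (hypothetical node names and port): a disk whose
    # logical_id was ('node1.example.com', 'node2.example.com', 11000) is
    # rewritten above to ('node1.example.com', 'node3.example.com', 11000)
    # when node3 replaces node2 as the secondary; only the first two slots
    # change, the rest of the logical_id is preserved.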
3921

    
3922
    # and now perform the drbd attach
3923
    info("attaching primary drbds to new secondary (standalone => connected)")
3924
    failures = []
3925
    for dev in instance.disks:
3926
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3927
      # since the attach is smart, it's enough to 'find' the device,
3928
      # it will automatically activate the network, if the physical_id
3929
      # is correct
3930
      cfg.SetDiskID(dev, pri_node)
3931
      if not rpc.call_blockdev_find(pri_node, dev):
3932
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3933
                "please do a gnt-instance info to see the status of disks")
3934

    
3935
    # this can fail as the old devices are degraded and _WaitForSync
3936
    # does a combined result over all disks, so we don't check its
3937
    # return value
3938
    self.proc.LogStep(5, steps_total, "sync devices")
3939
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3940

    
3941
    # so check manually all the devices
3942
    for name, (dev, old_lvs) in iv_names.iteritems():
3943
      cfg.SetDiskID(dev, pri_node)
3944
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3945
      if is_degr:
3946
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3947

    
3948
    self.proc.LogStep(6, steps_total, "removing old storage")
3949
    for name, (dev, old_lvs) in iv_names.iteritems():
3950
      info("remove logical volumes for %s" % name)
3951
      for lv in old_lvs:
3952
        cfg.SetDiskID(lv, old_node)
3953
        if not rpc.call_blockdev_remove(old_node, lv):
3954
          warning("Can't remove LV on old secondary",
3955
                  hint="Cleanup stale volumes by hand")
3956

    
3957
  def Exec(self, feedback_fn):
3958
    """Execute disk replacement.
3959

3960
    This dispatches the disk replacement to the appropriate handler.
3961

3962
    """
3963
    instance = self.instance
3964

    
3965
    # Activate the instance disks if we're replacing them on a down instance
3966
    if instance.status == "down":
3967
      op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
3968
      self.proc.ChainOpCode(op)
3969

    
3970
    if instance.disk_template == constants.DT_DRBD8:
3971
      if self.op.remote_node is None:
3972
        fn = self._ExecD8DiskOnly
3973
      else:
3974
        fn = self._ExecD8Secondary
3975
    else:
3976
      raise errors.ProgrammerError("Unhandled disk replacement case")
3977

    
3978
    ret = fn(feedback_fn)
3979

    
3980
    # Deactivate the instance disks if we're replacing them on a down instance
3981
    if instance.status == "down":
3982
      op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
3983
      self.proc.ChainOpCode(op)
3984

    
3985
    return ret
3986

    
3987

    
3988
class LUGrowDisk(LogicalUnit):
3989
  """Grow a disk of an instance.
3990

3991
  """
3992
  HPATH = "disk-grow"
3993
  HTYPE = constants.HTYPE_INSTANCE
3994
  _OP_REQP = ["instance_name", "disk", "amount"]
3995

    
3996
  def BuildHooksEnv(self):
3997
    """Build hooks env.
3998

3999
    This runs on the master, the primary and all the secondaries.
4000

4001
    """
4002
    env = {
4003
      "DISK": self.op.disk,
4004
      "AMOUNT": self.op.amount,
4005
      }
4006
    env.update(_BuildInstanceHookEnvByObject(self.instance))
4007
    nl = [
4008
      self.sstore.GetMasterNode(),
4009
      self.instance.primary_node,
4010
      ]
4011
    return env, nl, nl
4012

    
4013
  def CheckPrereq(self):
4014
    """Check prerequisites.
4015

4016
    This checks that the instance is in the cluster.
4017

4018
    """
4019
    instance = self.cfg.GetInstanceInfo(
4020
      self.cfg.ExpandInstanceName(self.op.instance_name))
4021
    if instance is None:
4022
      raise errors.OpPrereqError("Instance '%s' not known" %
4023
                                 self.op.instance_name)
4024
    self.instance = instance
4025
    self.op.instance_name = instance.name
4026

    
4027
    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4028
      raise errors.OpPrereqError("Instance's disk layout does not support"
4029
                                 " growing.")
4030

    
4031
    if instance.FindDisk(self.op.disk) is None:
4032
      raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4033
                                 (self.op.disk, instance.name))
4034

    
4035
    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4036
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
4037
    for node in nodenames:
4038
      info = nodeinfo.get(node, None)
4039
      if not info:
4040
        raise errors.OpPrereqError("Cannot get current information"
4041
                                   " from node '%s'" % node)
4042
      vg_free = info.get('vg_free', None)
4043
      if not isinstance(vg_free, int):
4044
        raise errors.OpPrereqError("Can't compute free disk space on"
4045
                                   " node %s" % node)
4046
      if self.op.amount > info['vg_free']:
4047
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
4048
                                   " %d MiB available, %d MiB required" %
4049
                                   (node, info['vg_free'], self.op.amount))
4050

    
4051
  def Exec(self, feedback_fn):
4052
    """Execute disk grow.
4053

4054
    """
4055
    instance = self.instance
4056
    disk = instance.FindDisk(self.op.disk)
4057
    for node in (instance.secondary_nodes + (instance.primary_node,)):
4058
      self.cfg.SetDiskID(disk, node)
4059
      result = rpc.call_blockdev_grow(node, disk, self.op.amount)
4060
      if not result or not isinstance(result, tuple) or len(result) != 2:
4061
        raise errors.OpExecError("grow request failed to node %s" % node)
4062
      elif not result[0]:
4063
        raise errors.OpExecError("grow request failed to node %s: %s" %
4064
                                 (node, result[1]))
4065
    disk.RecordGrow(self.op.amount)
4066
    self.cfg.Update(instance)
4067
    return
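  # Illustrative submission sketch (assumption: the matching opcode is
  # opcodes.OpGrowDisk and amounts are in MiB, as in the checks above):
  #   op = opcodes.OpGrowDisk(instance_name="inst1.example.com",
  #                           disk="sda", amount=1024)
  # would grow sda by 1 GiB on every node holding it and record the new
  # size in the configuration.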
4068

    
4069

    
4070
class LUQueryInstanceData(NoHooksLU):
4071
  """Query runtime instance data.
4072

4073
  """
4074
  _OP_REQP = ["instances"]
4075

    
4076
  def CheckPrereq(self):
4077
    """Check prerequisites.
4078

4079
    This only checks the optional instance list against the existing names.
4080

4081
    """
4082
    if not isinstance(self.op.instances, list):
4083
      raise errors.OpPrereqError("Invalid argument type 'instances'")
4084
    if self.op.instances:
4085
      self.wanted_instances = []
4086
      names = self.op.instances
4087
      for name in names:
4088
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4089
        if instance is None:
4090
          raise errors.OpPrereqError("No such instance name '%s'" % name)
4091
        self.wanted_instances.append(instance)
4092
    else:
4093
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4094
                               in self.cfg.GetInstanceList()]
4095
    return
4096

    
4097

    
4098
  def _ComputeDiskStatus(self, instance, snode, dev):
4099
    """Compute block device status.
4100

4101
    """
4102
    self.cfg.SetDiskID(dev, instance.primary_node)
4103
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4104
    if dev.dev_type in constants.LDS_DRBD:
4105
      # we change the snode then (otherwise we use the one passed in)
4106
      if dev.logical_id[0] == instance.primary_node:
4107
        snode = dev.logical_id[1]
4108
      else:
4109
        snode = dev.logical_id[0]
4110

    
4111
    if snode:
4112
      self.cfg.SetDiskID(dev, snode)
4113
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
4114
    else:
4115
      dev_sstatus = None
4116

    
4117
    if dev.children:
4118
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
4119
                      for child in dev.children]
4120
    else:
4121
      dev_children = []
4122

    
4123
    data = {
4124
      "iv_name": dev.iv_name,
4125
      "dev_type": dev.dev_type,
4126
      "logical_id": dev.logical_id,
4127
      "physical_id": dev.physical_id,
4128
      "pstatus": dev_pstatus,
4129
      "sstatus": dev_sstatus,
4130
      "children": dev_children,
4131
      }
4132

    
4133
    return data
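    # Illustrative return value (hypothetical disk): for a DRBD8 device the
    # dict nests the status of its two LV children, roughly:
    #   {"iv_name": "sda", "dev_type": constants.LD_DRBD8,
    #    "logical_id": (...), "physical_id": (...),
    #    "pstatus": <blockdev_find result on the primary>,
    #    "sstatus": <blockdev_find result on the secondary>,
    #    "children": [<dict for the data LV>, <dict for the meta LV>]}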
4134

    
4135
  def Exec(self, feedback_fn):
4136
    """Gather and return data"""
4137
    result = {}
4138
    for instance in self.wanted_instances:
4139
      remote_info = rpc.call_instance_info(instance.primary_node,
4140
                                                instance.name)
4141
      if remote_info and "state" in remote_info:
4142
        remote_state = "up"
4143
      else:
4144
        remote_state = "down"
4145
      if instance.status == "down":
4146
        config_state = "down"
4147
      else:
4148
        config_state = "up"
4149

    
4150
      disks = [self._ComputeDiskStatus(instance, None, device)
4151
               for device in instance.disks]
4152

    
4153
      idict = {
4154
        "name": instance.name,
4155
        "config_state": config_state,
4156
        "run_state": remote_state,
4157
        "pnode": instance.primary_node,
4158
        "snodes": instance.secondary_nodes,
4159
        "os": instance.os,
4160
        "memory": instance.memory,
4161
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4162
        "disks": disks,
4163
        "vcpus": instance.vcpus,
4164
        }
4165

    
4166
      htkind = self.sstore.GetHypervisorType()
4167
      if htkind == constants.HT_XEN_PVM30:
4168
        idict["kernel_path"] = instance.kernel_path
4169
        idict["initrd_path"] = instance.initrd_path
4170

    
4171
      if htkind == constants.HT_XEN_HVM31:
4172
        idict["hvm_boot_order"] = instance.hvm_boot_order
4173
        idict["hvm_acpi"] = instance.hvm_acpi
4174
        idict["hvm_pae"] = instance.hvm_pae
4175
        idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4176
        idict["hvm_nic_type"] = instance.hvm_nic_type
4177
        idict["hvm_disk_type"] = instance.hvm_disk_type
4178

    
4179
      if htkind in constants.HTS_REQ_PORT:
        if instance.vnc_bind_address is None:
          vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
        else:
          vnc_bind_address = instance.vnc_bind_address
        if instance.network_port is None:
          vnc_console_port = None
        elif vnc_bind_address == constants.BIND_ADDRESS_GLOBAL:
          vnc_console_port = "%s:%s" % (instance.primary_node,
                                        instance.network_port)
        elif vnc_bind_address == constants.LOCALHOST_IP_ADDRESS:
          vnc_console_port = "%s:%s on node %s" % (vnc_bind_address,
                                                   instance.network_port,
                                                   instance.primary_node)
        else:
          vnc_console_port = "%s:%s" % (instance.vnc_bind_address,
                                        instance.network_port)
        idict["vnc_console_port"] = vnc_console_port
        idict["vnc_bind_address"] = vnc_bind_address
        idict["network_port"] = instance.network_port

      result[instance.name] = idict

    return result


class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge or self.mac:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      if self.mac:
        mac = self.mac
      else:
        mac = self.instance.nics[0].mac
      args['nics'] = [(ip, bridge, mac)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    # FIXME: all the parameters could be checked before, in ExpandNames, or in
    # a separate CheckArguments function, if we implement one, so the operation
    # can be aborted without waiting for any lock, should it have an error...
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    self.kernel_path = getattr(self.op, "kernel_path", None)
    self.initrd_path = getattr(self.op, "initrd_path", None)
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
    self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
    self.hvm_pae = getattr(self.op, "hvm_pae", None)
    self.hvm_nic_type = getattr(self.op, "hvm_nic_type", None)
    self.hvm_disk_type = getattr(self.op, "hvm_disk_type", None)
    self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
    self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
    self.force = getattr(self.op, "force", None)
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
                 self.kernel_path, self.initrd_path, self.hvm_boot_order,
                 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
                 self.vnc_bind_address, self.hvm_nic_type, self.hvm_disk_type]
    if all_parms.count(None) == len(all_parms):
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)

    if self.kernel_path is not None:
      self.do_kernel_path = True
      if self.kernel_path == constants.VALUE_NONE:
        raise errors.OpPrereqError("Can't set instance to no kernel")

      if self.kernel_path != constants.VALUE_DEFAULT:
        if not os.path.isabs(self.kernel_path):
          raise errors.OpPrereqError("The kernel path must be an absolute"
                                     " filename")
    else:
      self.do_kernel_path = False

    if self.initrd_path is not None:
      self.do_initrd_path = True
      if self.initrd_path not in (constants.VALUE_NONE,
                                  constants.VALUE_DEFAULT):
        if not os.path.isabs(self.initrd_path):
          raise errors.OpPrereqError("The initrd path must be an absolute"
                                     " filename")
    else:
      self.do_initrd_path = False

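    # Added note: the strip("acdn") check below relies on str.strip removing
    # characters from both ends of the string; the result is empty exactly
    # when every character of the boot order is one of 'a', 'c', 'd' or 'n',
    # so anything left over signals an invalid device letter.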
    # boot order verification
    if self.hvm_boot_order is not None:
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
        if len(self.hvm_boot_order.strip("acdn")) != 0:
          raise errors.OpPrereqError("invalid boot order specified,"
                                     " must be one or more of [acdn]"
                                     " or 'default'")

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      if not (os.path.isabs(self.op.hvm_cdrom_image_path) or
              self.op.hvm_cdrom_image_path.lower() == "none"):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not (os.path.isfile(self.op.hvm_cdrom_image_path) or
              self.op.hvm_cdrom_image_path.lower() == "none"):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.warn = []
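    # Added note: unless the force parameter was given, a memory increase is
    # only accepted if the primary node could still start the instance.  The
    # shortfall is new_mem - currently_used_mem - free_mem on the primary;
    # for example, growing to 2048 MB while the instance uses 512 MB and the
    # node has 1024 MB free leaves 2048 - 512 - 1024 = 512 MB missing.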
    if self.mem is not None and not self.force:
      pnode = self.instance.primary_node
      nodelist = [pnode]
      nodelist.extend(instance.secondary_nodes)
      instance_info = rpc.call_instance_info(pnode, instance.name)
      nodeinfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())

      if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
        # Assume the primary node is unreachable and go ahead
        self.warn.append("Can't get info from primary node %s" % pnode)
      else:
        if instance_info:
          current_mem = instance_info['memory']
        else:
          # Assume instance not running
          # (there is a slight race condition here, but it's not very probable,
          # and we have no other way to check)
          current_mem = 0
        miss_mem = self.mem - current_mem - nodeinfo[pnode]['memory_free']
        if miss_mem > 0:
          raise errors.OpPrereqError("This change will prevent the instance"
                                     " from starting, due to %d MB of memory"
                                     " missing on its primary node" % miss_mem)

      for node in instance.secondary_nodes:
        if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
          self.warn.append("Can't get info from secondary node %s" % node)
        elif self.mem > nodeinfo[node]['memory_free']:
          self.warn.append("Not enough memory to failover instance to"
                           " secondary node %s" % node)

    # Xen HVM device type checks
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      if self.op.hvm_nic_type is not None:
        if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
          raise errors.OpPrereqError("Invalid NIC type %s specified for Xen"
                                     " HVM hypervisor" % self.op.hvm_nic_type)
      if self.op.hvm_disk_type is not None:
        if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
          raise errors.OpPrereqError("Invalid disk type %s specified for Xen"
                                     " HVM hypervisor" % self.op.hvm_disk_type)

    return

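  # Added note: Exec applies only the parameters that were actually submitted
  # and returns them as a list of (name, new_value) pairs, illustratively
  # [("mem", 2048), ("bridge", "xen-br1")]; the new values take effect at the
  # next restart of the instance.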
  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    # Process here the warnings from CheckPrereq, as we don't have a
    # feedback_fn there.
    for warn in self.warn:
      feedback_fn("WARNING: %s" % warn)

    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.do_kernel_path:
      instance.kernel_path = self.kernel_path
      result.append(("kernel_path", self.kernel_path))
    if self.do_initrd_path:
      instance.initrd_path = self.initrd_path
      result.append(("initrd_path", self.initrd_path))
    if self.hvm_boot_order:
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
        instance.hvm_boot_order = None
      else:
        instance.hvm_boot_order = self.hvm_boot_order
      result.append(("hvm_boot_order", self.hvm_boot_order))
    if self.hvm_acpi is not None:
      instance.hvm_acpi = self.hvm_acpi
      result.append(("hvm_acpi", self.hvm_acpi))
    if self.hvm_pae is not None:
      instance.hvm_pae = self.hvm_pae
      result.append(("hvm_pae", self.hvm_pae))
    if self.hvm_nic_type is not None:
      instance.hvm_nic_type = self.hvm_nic_type
      result.append(("hvm_nic_type", self.hvm_nic_type))
    if self.hvm_disk_type is not None:
      instance.hvm_disk_type = self.hvm_disk_type
      result.append(("hvm_disk_type", self.hvm_disk_type))
    if self.hvm_cdrom_image_path:
      if self.hvm_cdrom_image_path == constants.VALUE_NONE:
        instance.hvm_cdrom_image_path = None
      else:
        instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
      result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
    if self.vnc_bind_address:
      instance.vnc_bind_address = self.vnc_bind_address
      result.append(("vnc_bind_address", self.vnc_bind_address))

    self.cfg.Update(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = ['nodes']
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

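    # Added note on the export flow below: an LVM snapshot of the instance's
    # "sda" disk is taken on the source node while the instance is
    # (optionally) shut down; the instance is then restarted if it was up,
    # each snapshot is dumped to the target node and removed again, the
    # export is finalized on the target node, and any older export of the
    # same instance is pruned from the remaining nodes.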
    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        if not rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(instance, self.cfg)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
        logger.Error("could not export block device %s from node %s to node %s"
                     % (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from node %s" %
                     (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      exportlist = rpc.call_export_list(nodelist)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.
    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    exportlist = rpc.call_export_list(self.cfg.GetNodeList())
    found = False
    for node in exportlist:
      if instance_name in exportlist[node]:
        found = True
        if not rpc.call_export_remove(node, instance_name):
          logger.Error("could not remove export for instance %s"
                       " on node %s" % (instance_name, node))

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

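  # Added note: the search runs over the cluster object, all instances and
  # all nodes, and returns (path, tag) pairs such as, illustratively,
  # ("/instances/inst1.example.com", "web") for every tag matching the
  # compiled pattern.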
  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg/sstore that are needed to query the cluster
    - input data (all members of the mode's _*_KEYS class attribute are
      required)
    - four buffer attributes (in_text, out_text, in_data, out_data), that
      represent the input (to the external script) in text and data
      structure format, and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

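  # Added illustrative sketch (not part of the original code): the class is
  # driven the same way LUTestAllocator.Exec below drives it, roughly
  #   ial = IAllocator(cfg, sstore, mode=constants.IALLOCATOR_MODE_ALLOC,
  #                    name="inst1.example.com", mem_size=512, disks=[...],
  #                    disk_template="drbd", os="debian-etch", tags=[],
  #                    nics=[...], vcpus=1)
  #   ial.Run("my-allocator")   # hypothetical allocator script name
  # after which ial.success, ial.info and ial.nodes hold the script's answer.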
  def __init__(self, cfg, sstore, mode, name, **kwargs):
    self.cfg = cfg
    self.sstore = sstore
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.cfg
    # cluster data
    data = {
      "version": 1,
      "cluster_name": self.sstore.GetClusterName(),
      "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      # we don't have job IDs
      }

    i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]

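    # Added note: for every node the RPC reply is sanity-checked and the
    # memory/disk/cpu figures are coerced to integers before being exported
    # to the allocator; the memory of instances that have this node as their
    # primary is summed up as well, so the script can reason about committed
    # versus free memory.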
    # node data
    node_results = {}
    node_list = cfg.GetNodeList()
    node_data = rpc.call_node_info(node_list, cfg.GetVGName())
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      if nname not in node_data or not isinstance(node_data[nname], dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname]
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free', 'cpu_total']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += iinfo.memory
          if iinfo.status == "up":
            i_p_up_mem += iinfo.memory

      # compute memory used by instances
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": iinfo.vcpus,
        "memory": iinfo.memory,
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template,
                                  self.disks[0]["size"], self.disks[1]["size"])

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
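    # Added note: the request below is merged into the cluster description
    # and later serialized by _BuildInputData; illustratively, an allocation
    # request ends up looking like
    #   {"type": "allocate", "name": "inst1.example.com", "memory": 512,
    #    "disks": [{"size": 1024, "mode": "w"}, ...], "required_nodes": 2, ...}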
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1

    disk_space = _ComputeDiskSize(instance.disk_template,
                                  instance.disks[0].size,
                                  instance.disks[1].size)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

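  # Added note: the master-side runner is expected to reply with a 4-tuple
  # (rcode, stdout, stderr, fail); rcode distinguishes "allocator script not
  # found" from "script ran but failed", and on success stdout carries the
  # script's serialized answer which _ValidateResult then checks.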
  def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
    """Run an instance allocator and return the results.

    """
    data = self.in_text

    result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)

    if not isinstance(result, tuple) or len(result) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" % (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

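  # Added note: a well-formed allocator reply is a serialized dict with at
  # least the keys "success", "info" and "nodes", where "nodes" must be a
  # list; illustratively something like
  #   {"success": true, "info": "allocation successful",
  #    "nodes": ["node1.example.com", "node2.example.com"]}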
  def _ValidateResult(self):
    """Process the allocator results.

    This will process and, if successful, save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and test
    mode.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       )
    else:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result