
1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33

    
34
from ganeti import rpc
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import locking
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import serializer
45

    
46

    
47
class LogicalUnit(object):
48
  """Logical Unit base class.
49

50
  Subclasses must follow these rules:
51
    - implement ExpandNames
52
    - implement CheckPrereq
53
    - implement Exec
54
    - implement BuildHooksEnv
55
    - redefine HPATH and HTYPE
56
    - optionally redefine their run requirements:
57
        REQ_MASTER: the LU needs to run on the master node
58
        REQ_WSSTORE: the LU needs a writable SimpleStore
59
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60

61
  Note that all commands require root permissions.
62

63
  """
64
  HPATH = None
65
  HTYPE = None
66
  _OP_REQP = []
67
  REQ_MASTER = True
68
  REQ_WSSTORE = False
69
  REQ_BGL = True
70

    
71
  def __init__(self, processor, op, context, sstore):
72
    """Constructor for LogicalUnit.
73

74
    This needs to be overridden in derived classes in order to check op
75
    validity.
76

77
    """
78
    self.proc = processor
79
    self.op = op
80
    self.cfg = context.cfg
81
    self.sstore = sstore
82
    self.context = context
83
    self.needed_locks = None
84
    self.acquired_locks = {}
85
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
86
    # Used to force good behavior when calling helper functions
87
    self.recalculate_locks = {}
88
    self.__ssh = None
89

    
90
    for attr_name in self._OP_REQP:
91
      attr_val = getattr(op, attr_name, None)
92
      if attr_val is None:
93
        raise errors.OpPrereqError("Required parameter '%s' missing" %
94
                                   attr_name)
95

    
96
    if not self.cfg.IsCluster():
97
      raise errors.OpPrereqError("Cluster not initialized yet,"
98
                                 " use 'gnt-cluster init' first.")
99
    if self.REQ_MASTER:
100
      master = sstore.GetMasterNode()
101
      if master != utils.HostInfo().name:
102
        raise errors.OpPrereqError("Commands must be run on the master"
103
                                   " node %s" % master)
104

    
105
  def __GetSSH(self):
106
    """Returns the SshRunner object
107

108
    """
109
    if not self.__ssh:
110
      self.__ssh = ssh.SshRunner(self.sstore)
111
    return self.__ssh
112

    
113
  ssh = property(fget=__GetSSH)
114

    
115
  def ExpandNames(self):
116
    """Expand names for this LU.
117

118
    This method is called before starting to execute the opcode, and it should
119
    update all the parameters of the opcode to their canonical form (e.g. a
120
    short node name must be fully expanded after this method has successfully
121
    completed). This way locking, hooks, logging, etc. can work correctly.
122

123
    LUs which implement this method must also populate the self.needed_locks
124
    member, as a dict with lock levels as keys, and a list of needed lock names
125
    as values. Rules:
126
      - Use an empty dict if you don't need any lock
127
      - If you don't need any lock at a particular level omit that level
128
      - Don't put anything for the BGL level
129
      - If you want all locks at a level use None as a value
130
        (this reflects what LockSet does, and will be replaced before
131
        CheckPrereq with the full list of nodes that have been locked)
132

133
    If you need to share locks (rather than acquire them exclusively) at one
134
    level you can modify self.share_locks, setting a true value (usually 1) for
135
    that level. By default locks are not shared.
136

137
    Examples:
138
    # Acquire all nodes and one instance
139
    self.needed_locks = {
140
      locking.LEVEL_NODE: None,
141
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
142
    }
143
    # Acquire just two nodes
144
    self.needed_locks = {
145
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
146
    }
147
    # Acquire no locks
148
    self.needed_locks = {} # No, you can't leave it to the default value None
149

150
    """
151
    # The implementation of this method is mandatory only if the new LU is
152
    # concurrent, so that old LUs don't need to be changed all at the same
153
    # time.
154
    if self.REQ_BGL:
155
      self.needed_locks = {} # Exclusive LUs don't need locks.
156
    else:
157
      raise NotImplementedError
158

    
159
  def DeclareLocks(self, level):
160
    """Declare LU locking needs for a level
161

162
    While most LUs can just declare their locking needs at ExpandNames time,
163
    sometimes there's the need to calculate some locks after having acquired
164
    the ones before. This function is called just before acquiring locks at a
165
    particular level, but after acquiring the ones at lower levels, and permits
166
    such calculations. It can be used to modify self.needed_locks, and by
167
    default it does nothing.
168

169
    This function is only called if you have something already set in
170
    self.needed_locks for the level.
171

172
    @param level: Locking level which is going to be locked
173
    @type level: member of ganeti.locking.LEVELS
174

175
    """
176

    
177
  def CheckPrereq(self):
178
    """Check prerequisites for this LU.
179

180
    This method should check that the prerequisites for the execution
181
    of this LU are fulfilled. It can do internode communication, but
182
    it should be idempotent - no cluster or system changes are
183
    allowed.
184

185
    The method should raise errors.OpPrereqError in case something is
186
    not fulfilled. Its return value is ignored.
187

188
    This method should also update all the parameters of the opcode to
189
    their canonical form if it hasn't been done by ExpandNames before.
190

191
    """
192
    raise NotImplementedError
193

    
194
  def Exec(self, feedback_fn):
195
    """Execute the LU.
196

197
    This method should implement the actual work. It should raise
198
    errors.OpExecError for failures that are somewhat dealt with in
199
    code, or expected.
200

201
    """
202
    raise NotImplementedError
203

    
204
  def BuildHooksEnv(self):
205
    """Build hooks environment for this LU.
206

207
    This method should return a three-node tuple consisting of: a dict
208
    containing the environment that will be used for running the
209
    specific hook for this LU, a list of node names on which the hook
210
    should run before the execution, and a list of node names on which
211
    the hook should run after the execution.
212

213
    The keys of the dict must not have 'GANETI_' prefixed as this will
214
    be handled in the hooks runner. Also note additional keys will be
215
    added by the hooks runner. If the LU doesn't define any
216
    environment, an empty dict (and not None) should be returned.
217

218
    "No nodes" should be returned as an empty list (and not None).
219

220
    Note that if the HPATH for a LU class is None, this function will
221
    not be called.
222

223
    """
224
    raise NotImplementedError
225

    
226
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
227
    """Notify the LU about the results of its hooks.
228

229
    This method is called every time a hooks phase is executed, and notifies
230
    the Logical Unit about the hooks' result. The LU can then use it to alter
231
    its result based on the hooks.  By default the method does nothing and the
232
    previous result is passed back unchanged but any LU can define it if it
233
    wants to use the local cluster hook-scripts somehow.
234

235
    Args:
236
      phase: the hooks phase that has just been run
237
      hook_results: the results of the multi-node hooks rpc call
238
      feedback_fn: function to send feedback back to the caller
239
      lu_result: the previous result this LU had, or None in the PRE phase.
240

241
    """
242
    return lu_result
243

    
244
  def _ExpandAndLockInstance(self):
245
    """Helper function to expand and lock an instance.
246

247
    Many LUs that work on an instance take its name in self.op.instance_name
248
    and need to expand it and then declare the expanded name for locking. This
249
    function does it, and then updates self.op.instance_name to the expanded
250
    name. It also initializes needed_locks as a dict, if this hasn't been done
251
    before.
252

253
    """
254
    if self.needed_locks is None:
255
      self.needed_locks = {}
256
    else:
257
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
258
        "_ExpandAndLockInstance called with instance-level locks set"
259
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
260
    if expanded_name is None:
261
      raise errors.OpPrereqError("Instance '%s' not known" %
262
                                  self.op.instance_name)
263
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
264
    self.op.instance_name = expanded_name
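    # Illustrative sketch (not part of the original module, kept as a comment):
    # a typical use of this helper from a per-instance LU's ExpandNames. It
    # assumes self.op.instance_name has been supplied by the opcode.
    #
    #   def ExpandNames(self):
    #     self._ExpandAndLockInstance()
    #     # needed_locks[locking.LEVEL_INSTANCE] now holds the expanded name
    #     # and self.op.instance_name has been canonicalized.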
265

    
266
  def _LockInstancesNodes(self):
267
    """Helper function to declare instances' nodes for locking.
268

269
    This function should be called after locking one or more instances to lock
270
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
271
    with all primary or secondary nodes for instances already locked and
272
    present in self.needed_locks[locking.LEVEL_INSTANCE].
273

274
    It should be called from DeclareLocks, and for safety only works if
275
    self.recalculate_locks[locking.LEVEL_NODE] is set.
276

277
    In the future it may grow parameters to just lock some instance's nodes, or
278
    to just lock primaries or secondary nodes, if needed.
279

280
    It should be called from DeclareLocks in a way similar to:
281

282
    if level == locking.LEVEL_NODE:
283
      self._LockInstancesNodes()
284

285
    """
286
    assert locking.LEVEL_NODE in self.recalculate_locks, \
287
      "_LockInstancesNodes helper function called with no nodes to recalculate"
288

    
289
    # TODO: check if we've really been called with the instance locks held
290

    
291
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
292
    # future we might want to have different behaviors depending on the value
293
    # of self.recalculate_locks[locking.LEVEL_NODE]
294
    wanted_nodes = []
295
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
296
      instance = self.context.cfg.GetInstanceInfo(instance_name)
297
      wanted_nodes.append(instance.primary_node)
298
      wanted_nodes.extend(instance.secondary_nodes)
299
    self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
300

    
301
    del self.recalculate_locks[locking.LEVEL_NODE]
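    # Illustrative sketch (not part of the original module, kept as a comment):
    # how ExpandNames and DeclareLocks might cooperate with this helper. Any
    # true value satisfies the assertion above; using 1 as the flag is an
    # assumption made for illustration.
    #
    #   def ExpandNames(self):
    #     self._ExpandAndLockInstance()
    #     self.needed_locks[locking.LEVEL_NODE] = []
    #     self.recalculate_locks[locking.LEVEL_NODE] = 1
    #
    #   def DeclareLocks(self, level):
    #     if level == locking.LEVEL_NODE:
    #       self._LockInstancesNodes()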
302

    
303

    
304
class NoHooksLU(LogicalUnit):
305
  """Simple LU which runs no hooks.
306

307
  This LU is intended as a parent for other LogicalUnits which will
308
  run no hooks, in order to reduce duplicate code.
309

310
  """
311
  HPATH = None
312
  HTYPE = None
313

    
314

    
315
def _GetWantedNodes(lu, nodes):
316
  """Returns list of checked and expanded node names.
317

318
  Args:
319
    nodes: List of node names (strings); an empty list means all nodes
320

321
  """
322
  if not isinstance(nodes, list):
323
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
324

    
325
  if nodes:
326
    wanted = []
327

    
328
    for name in nodes:
329
      node = lu.cfg.ExpandNodeName(name)
330
      if node is None:
331
        raise errors.OpPrereqError("No such node name '%s'" % name)
332
      wanted.append(node)
333

    
334
  else:
335
    wanted = lu.cfg.GetNodeList()
336
  return utils.NiceSort(wanted)
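# Illustrative examples (not part of the original module, kept as comments);
# the node names are hypothetical:
#
#   _GetWantedNodes(lu, ["node2", "node1"])  # -> expanded names, nicely sorted
#   _GetWantedNodes(lu, [])                  # -> all cluster nodes, sorted
#   _GetWantedNodes(lu, None)                # -> raises OpPrereqError (not a list)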
337

    
338

    
339
def _GetWantedInstances(lu, instances):
340
  """Returns list of checked and expanded instance names.
341

342
  Args:
343
    instances: List of instance names (strings); an empty list means all instances
344

345
  """
346
  if not isinstance(instances, list):
347
    raise errors.OpPrereqError("Invalid argument type 'instances'")
348

    
349
  if instances:
350
    wanted = []
351

    
352
    for name in instances:
353
      instance = lu.cfg.ExpandInstanceName(name)
354
      if instance is None:
355
        raise errors.OpPrereqError("No such instance name '%s'" % name)
356
      wanted.append(instance)
357

    
358
  else:
359
    wanted = lu.cfg.GetInstanceList()
360
  return utils.NiceSort(wanted)
361

    
362

    
363
def _CheckOutputFields(static, dynamic, selected):
364
  """Checks whether all selected fields are valid.
365

366
  Args:
367
    static: Static fields
368
    dynamic: Dynamic fields
369

370
  """
371
  static_fields = frozenset(static)
372
  dynamic_fields = frozenset(dynamic)
373

    
374
  all_fields = static_fields | dynamic_fields
375

    
376
  if not all_fields.issuperset(selected):
377
    raise errors.OpPrereqError("Unknown output fields selected: %s"
378
                               % ",".join(frozenset(selected).
379
                                          difference(all_fields)))
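# Illustrative examples (not part of the original module, kept as comments);
# the field names are hypothetical:
#
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=["name", "mfree"])   # passes silently
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=["bogus"])           # raises OpPrereqError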
380

    
381

    
382
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
383
                          memory, vcpus, nics):
384
  """Builds instance related env variables for hooks from single variables.
385

386
  Args:
387
    secondary_nodes: List of secondary nodes as strings
388
  """
389
  env = {
390
    "OP_TARGET": name,
391
    "INSTANCE_NAME": name,
392
    "INSTANCE_PRIMARY": primary_node,
393
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
394
    "INSTANCE_OS_TYPE": os_type,
395
    "INSTANCE_STATUS": status,
396
    "INSTANCE_MEMORY": memory,
397
    "INSTANCE_VCPUS": vcpus,
398
  }
399

    
400
  if nics:
401
    nic_count = len(nics)
402
    for idx, (ip, bridge, mac) in enumerate(nics):
403
      if ip is None:
404
        ip = ""
405
      env["INSTANCE_NIC%d_IP" % idx] = ip
406
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
407
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
408
  else:
409
    nic_count = 0
410

    
411
  env["INSTANCE_NIC_COUNT"] = nic_count
412

    
413
  return env
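# Illustrative example (not part of the original module, kept as a comment):
# the environment produced for a hypothetical two-NIC instance.
#
#   _BuildInstanceHookEnv("inst1.example.tld", "node1", ["node2"],
#                         "debian-etch", "up", 512, 1,
#                         [("192.0.2.10", "xen-br0", "aa:00:00:00:00:01"),
#                          (None, "xen-br0", "aa:00:00:00:00:02")])
#   # -> {"OP_TARGET": "inst1.example.tld",
#   #     "INSTANCE_NAME": "inst1.example.tld",
#   #     "INSTANCE_PRIMARY": "node1",
#   #     "INSTANCE_SECONDARIES": "node2",
#   #     "INSTANCE_OS_TYPE": "debian-etch",
#   #     "INSTANCE_STATUS": "up",
#   #     "INSTANCE_MEMORY": 512,
#   #     "INSTANCE_VCPUS": 1,
#   #     "INSTANCE_NIC0_IP": "192.0.2.10",
#   #     "INSTANCE_NIC0_BRIDGE": "xen-br0",
#   #     "INSTANCE_NIC0_HWADDR": "aa:00:00:00:00:01",
#   #     "INSTANCE_NIC1_IP": "",
#   #     "INSTANCE_NIC1_BRIDGE": "xen-br0",
#   #     "INSTANCE_NIC1_HWADDR": "aa:00:00:00:00:02",
#   #     "INSTANCE_NIC_COUNT": 2}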
414

    
415

    
416
def _BuildInstanceHookEnvByObject(instance, override=None):
417
  """Builds instance related env variables for hooks from an object.
418

419
  Args:
420
    instance: objects.Instance object of instance
421
    override: dict of values to override
422
  """
423
  args = {
424
    'name': instance.name,
425
    'primary_node': instance.primary_node,
426
    'secondary_nodes': instance.secondary_nodes,
427
    'os_type': instance.os,
428
    'status': instance.status,
429
    'memory': instance.memory,
430
    'vcpus': instance.vcpus,
431
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
432
  }
433
  if override:
434
    args.update(override)
435
  return _BuildInstanceHookEnv(**args)
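# Illustrative example (not part of the original module, kept as a comment):
# override keys must match the argument names of _BuildInstanceHookEnv above,
# e.g. forcing the reported status:
#
#   env = _BuildInstanceHookEnvByObject(instance, override={'status': 'down'})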
436

    
437

    
438
def _CheckInstanceBridgesExist(instance):
439
  """Check that the brigdes needed by an instance exist.
440

441
  """
442
  # check bridges existence
443
  brlist = [nic.bridge for nic in instance.nics]
444
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
445
    raise errors.OpPrereqError("one or more target bridges %s does not"
446
                               " exist on destination node '%s'" %
447
                               (brlist, instance.primary_node))
448

    
449

    
450
class LUDestroyCluster(NoHooksLU):
451
  """Logical unit for destroying the cluster.
452

453
  """
454
  _OP_REQP = []
455

    
456
  def CheckPrereq(self):
457
    """Check prerequisites.
458

459
    This checks whether the cluster is empty.
460

461
    Any errors are signalled by raising errors.OpPrereqError.
462

463
    """
464
    master = self.sstore.GetMasterNode()
465

    
466
    nodelist = self.cfg.GetNodeList()
467
    if len(nodelist) != 1 or nodelist[0] != master:
468
      raise errors.OpPrereqError("There are still %d node(s) in"
469
                                 " this cluster." % (len(nodelist) - 1))
470
    instancelist = self.cfg.GetInstanceList()
471
    if instancelist:
472
      raise errors.OpPrereqError("There are still %d instance(s) in"
473
                                 " this cluster." % len(instancelist))
474

    
475
  def Exec(self, feedback_fn):
476
    """Destroys the cluster.
477

478
    """
479
    master = self.sstore.GetMasterNode()
480
    if not rpc.call_node_stop_master(master, False):
481
      raise errors.OpExecError("Could not disable the master role")
482
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
483
    utils.CreateBackup(priv_key)
484
    utils.CreateBackup(pub_key)
485
    return master
486

    
487

    
488
class LUVerifyCluster(LogicalUnit):
489
  """Verifies the cluster status.
490

491
  """
492
  HPATH = "cluster-verify"
493
  HTYPE = constants.HTYPE_CLUSTER
494
  _OP_REQP = ["skip_checks"]
495

    
496
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
497
                  remote_version, feedback_fn):
498
    """Run multiple tests against a node.
499

500
    Test list:
501
      - compares ganeti version
502
      - checks vg existence and size > 20G
503
      - checks config file checksum
504
      - checks ssh to other nodes
505

506
    Args:
507
      node: name of the node to check
508
      file_list: required list of files
509
      local_cksum: dictionary of local files and their checksums
510

511
    """
512
    # compares ganeti version
513
    local_version = constants.PROTOCOL_VERSION
514
    if not remote_version:
515
      feedback_fn("  - ERROR: connection to %s failed" % (node))
516
      return True
517

    
518
    if local_version != remote_version:
519
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
520
                      (local_version, node, remote_version))
521
      return True
522

    
523
    # checks vg existence and size > 20G
524

    
525
    bad = False
526
    if not vglist:
527
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
528
                      (node,))
529
      bad = True
530
    else:
531
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
532
                                            constants.MIN_VG_SIZE)
533
      if vgstatus:
534
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
535
        bad = True
536

    
537
    # checks config file checksum
538
    # checks ssh to any
539

    
540
    if 'filelist' not in node_result:
541
      bad = True
542
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
543
    else:
544
      remote_cksum = node_result['filelist']
545
      for file_name in file_list:
546
        if file_name not in remote_cksum:
547
          bad = True
548
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
549
        elif remote_cksum[file_name] != local_cksum[file_name]:
550
          bad = True
551
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
552

    
553
    if 'nodelist' not in node_result:
554
      bad = True
555
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
556
    else:
557
      if node_result['nodelist']:
558
        bad = True
559
        for node in node_result['nodelist']:
560
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
561
                          (node, node_result['nodelist'][node]))
562
    if 'node-net-test' not in node_result:
563
      bad = True
564
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
565
    else:
566
      if node_result['node-net-test']:
567
        bad = True
568
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
569
        for node in nlist:
570
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
571
                          (node, node_result['node-net-test'][node]))
572

    
573
    hyp_result = node_result.get('hypervisor', None)
574
    if hyp_result is not None:
575
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
576
    return bad
577

    
578
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
579
                      node_instance, feedback_fn):
580
    """Verify an instance.
581

582
    This function checks to see if the required block devices are
583
    available on the instance's node.
584

585
    """
586
    bad = False
587

    
588
    node_current = instanceconfig.primary_node
589

    
590
    node_vol_should = {}
591
    instanceconfig.MapLVsByNode(node_vol_should)
592

    
593
    for node in node_vol_should:
594
      for volume in node_vol_should[node]:
595
        if node not in node_vol_is or volume not in node_vol_is[node]:
596
          feedback_fn("  - ERROR: volume %s missing on node %s" %
597
                          (volume, node))
598
          bad = True
599

    
600
    if instanceconfig.status != 'down':
601
      if (node_current not in node_instance or
602
          not instance in node_instance[node_current]):
603
        feedback_fn("  - ERROR: instance %s not running on node %s" %
604
                        (instance, node_current))
605
        bad = True
606

    
607
    for node in node_instance:
608
      if node != node_current:
609
        if instance in node_instance[node]:
610
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
611
                          (instance, node))
612
          bad = True
613

    
614
    return bad
615

    
616
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
617
    """Verify if there are any unknown volumes in the cluster.
618

619
    The .os, .swap and backup volumes are ignored. All other volumes are
620
    reported as unknown.
621

622
    """
623
    bad = False
624

    
625
    for node in node_vol_is:
626
      for volume in node_vol_is[node]:
627
        if node not in node_vol_should or volume not in node_vol_should[node]:
628
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
629
                      (volume, node))
630
          bad = True
631
    return bad
632

    
633
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
634
    """Verify the list of running instances.
635

636
    This checks what instances are running but unknown to the cluster.
637

638
    """
639
    bad = False
640
    for node in node_instance:
641
      for runninginstance in node_instance[node]:
642
        if runninginstance not in instancelist:
643
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
644
                          (runninginstance, node))
645
          bad = True
646
    return bad
647

    
648
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
649
    """Verify N+1 Memory Resilience.
650

651
    Check that if one single node dies we can still start all the instances it
652
    was primary for.
653

654
    """
655
    bad = False
656

    
657
    for node, nodeinfo in node_info.iteritems():
658
      # This code checks that every node which is now listed as secondary has
659
      # enough memory to host all instances it is supposed to, should a single
660
      # other node in the cluster fail.
661
      # FIXME: not ready for failover to an arbitrary node
662
      # FIXME: does not support file-backed instances
663
      # WARNING: we currently take into account down instances as well as up
664
      # ones, considering that even if they're down someone might want to start
665
      # them even in the event of a node failure.
666
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
667
        needed_mem = 0
668
        for instance in instances:
669
          needed_mem += instance_cfg[instance].memory
670
        if nodeinfo['mfree'] < needed_mem:
671
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
672
                      " failovers should node %s fail" % (node, prinode))
673
          bad = True
674
    return bad
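    # Illustrative example (not part of the original module, kept as a
    # comment), with hypothetical figures: a node reporting
    #   node_info['node2'] = {'mfree': 1024,
    #                         'sinst-by-pnode': {'node1': ['inst1', 'inst2']}}
    # where inst1 and inst2 need 512 and 768 MiB respectively would require
    # 1280 MiB to absorb a failure of node1, so the check above would flag
    # node2 as not N+1 compliant.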
675

    
676
  def CheckPrereq(self):
677
    """Check prerequisites.
678

679
    Transform the list of checks we're going to skip into a set and check that
680
    all its members are valid.
681

682
    """
683
    self.skip_set = frozenset(self.op.skip_checks)
684
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
685
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
686

    
687
  def BuildHooksEnv(self):
688
    """Build hooks env.
689

690
    Cluster-Verify hooks are run only in the post phase; if they fail, their
691
    output is logged in the verify output and the verification fails.
692

693
    """
694
    all_nodes = self.cfg.GetNodeList()
695
    # TODO: populate the environment with useful information for verify hooks
696
    env = {}
697
    return env, [], all_nodes
698

    
699
  def Exec(self, feedback_fn):
700
    """Verify integrity of cluster, performing various test on nodes.
701

702
    """
703
    bad = False
704
    feedback_fn("* Verifying global settings")
705
    for msg in self.cfg.VerifyConfig():
706
      feedback_fn("  - ERROR: %s" % msg)
707

    
708
    vg_name = self.cfg.GetVGName()
709
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
710
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
711
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
712
    i_non_redundant = [] # Non redundant instances
713
    node_volume = {}
714
    node_instance = {}
715
    node_info = {}
716
    instance_cfg = {}
717

    
718
    # FIXME: verify OS list
719
    # do local checksums
720
    file_names = list(self.sstore.GetFileList())
721
    file_names.append(constants.SSL_CERT_FILE)
722
    file_names.append(constants.CLUSTER_CONF_FILE)
723
    local_checksums = utils.FingerprintFiles(file_names)
724

    
725
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
726
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
727
    all_instanceinfo = rpc.call_instance_list(nodelist)
728
    all_vglist = rpc.call_vg_list(nodelist)
729
    node_verify_param = {
730
      'filelist': file_names,
731
      'nodelist': nodelist,
732
      'hypervisor': None,
733
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
734
                        for node in nodeinfo]
735
      }
736
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
737
    all_rversion = rpc.call_version(nodelist)
738
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
739

    
740
    for node in nodelist:
741
      feedback_fn("* Verifying node %s" % node)
742
      result = self._VerifyNode(node, file_names, local_checksums,
743
                                all_vglist[node], all_nvinfo[node],
744
                                all_rversion[node], feedback_fn)
745
      bad = bad or result
746

    
747
      # node_volume
748
      volumeinfo = all_volumeinfo[node]
749

    
750
      if isinstance(volumeinfo, basestring):
751
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
752
                    (node, volumeinfo[-400:].encode('string_escape')))
753
        bad = True
754
        node_volume[node] = {}
755
      elif not isinstance(volumeinfo, dict):
756
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
757
        bad = True
758
        continue
759
      else:
760
        node_volume[node] = volumeinfo
761

    
762
      # node_instance
763
      nodeinstance = all_instanceinfo[node]
764
      if type(nodeinstance) != list:
765
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
766
        bad = True
767
        continue
768

    
769
      node_instance[node] = nodeinstance
770

    
771
      # node_info
772
      nodeinfo = all_ninfo[node]
773
      if not isinstance(nodeinfo, dict):
774
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
775
        bad = True
776
        continue
777

    
778
      try:
779
        node_info[node] = {
780
          "mfree": int(nodeinfo['memory_free']),
781
          "dfree": int(nodeinfo['vg_free']),
782
          "pinst": [],
783
          "sinst": [],
784
          # dictionary holding all instances this node is secondary for,
785
          # grouped by their primary node. Each key is a cluster node, and each
786
          # value is a list of instances which have the key as primary and the
787
          # current node as secondary.  this is handy to calculate N+1 memory
788
          # availability if you can only failover from a primary to its
789
          # secondary.
790
          "sinst-by-pnode": {},
791
        }
792
      except ValueError:
793
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
794
        bad = True
795
        continue
796

    
797
    node_vol_should = {}
798

    
799
    for instance in instancelist:
800
      feedback_fn("* Verifying instance %s" % instance)
801
      inst_config = self.cfg.GetInstanceInfo(instance)
802
      result =  self._VerifyInstance(instance, inst_config, node_volume,
803
                                     node_instance, feedback_fn)
804
      bad = bad or result
805

    
806
      inst_config.MapLVsByNode(node_vol_should)
807

    
808
      instance_cfg[instance] = inst_config
809

    
810
      pnode = inst_config.primary_node
811
      if pnode in node_info:
812
        node_info[pnode]['pinst'].append(instance)
813
      else:
814
        feedback_fn("  - ERROR: instance %s, connection to primary node"
815
                    " %s failed" % (instance, pnode))
816
        bad = True
817

    
818
      # If the instance is non-redundant we cannot survive losing its primary
819
      # node, so we are not N+1 compliant. On the other hand we have no disk
820
      # templates with more than one secondary so that situation is not well
821
      # supported either.
822
      # FIXME: does not support file-backed instances
823
      if len(inst_config.secondary_nodes) == 0:
824
        i_non_redundant.append(instance)
825
      elif len(inst_config.secondary_nodes) > 1:
826
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
827
                    % instance)
828

    
829
      for snode in inst_config.secondary_nodes:
830
        if snode in node_info:
831
          node_info[snode]['sinst'].append(instance)
832
          if pnode not in node_info[snode]['sinst-by-pnode']:
833
            node_info[snode]['sinst-by-pnode'][pnode] = []
834
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
835
        else:
836
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
837
                      " %s failed" % (instance, snode))
838

    
839
    feedback_fn("* Verifying orphan volumes")
840
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
841
                                       feedback_fn)
842
    bad = bad or result
843

    
844
    feedback_fn("* Verifying remaining instances")
845
    result = self._VerifyOrphanInstances(instancelist, node_instance,
846
                                         feedback_fn)
847
    bad = bad or result
848

    
849
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
850
      feedback_fn("* Verifying N+1 Memory redundancy")
851
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
852
      bad = bad or result
853

    
854
    feedback_fn("* Other Notes")
855
    if i_non_redundant:
856
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
857
                  % len(i_non_redundant))
858

    
859
    return not bad
860

    
861
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
862
    """Analize the post-hooks' result, handle it, and send some
863
    nicely-formatted feedback back to the user.
864

865
    Args:
866
      phase: the hooks phase that has just been run
867
      hooks_results: the results of the multi-node hooks rpc call
868
      feedback_fn: function to send feedback back to the caller
869
      lu_result: previous Exec result
870

871
    """
872
    # We only really run POST phase hooks, and are only interested in
873
    # their results
874
    if phase == constants.HOOKS_PHASE_POST:
875
      # Used to change hooks' output to proper indentation
876
      indent_re = re.compile('^', re.M)
877
      feedback_fn("* Hooks Results")
878
      if not hooks_results:
879
        feedback_fn("  - ERROR: general communication failure")
880
        lu_result = 1
881
      else:
882
        for node_name in hooks_results:
883
          show_node_header = True
884
          res = hooks_results[node_name]
885
          if res is False or not isinstance(res, list):
886
            feedback_fn("    Communication failure")
887
            lu_result = 1
888
            continue
889
          for script, hkr, output in res:
890
            if hkr == constants.HKR_FAIL:
891
              # The node header is only shown once, if there are
892
              # failing hooks on that node
893
              if show_node_header:
894
                feedback_fn("  Node %s:" % node_name)
895
                show_node_header = False
896
              feedback_fn("    ERROR: Script %s failed, output:" % script)
897
              output = indent_re.sub('      ', output)
898
              feedback_fn("%s" % output)
899
              lu_result = 1
900

    
901
      return lu_result
902

    
903

    
904
class LUVerifyDisks(NoHooksLU):
905
  """Verifies the cluster disks status.
906

907
  """
908
  _OP_REQP = []
909

    
910
  def CheckPrereq(self):
911
    """Check prerequisites.
912

913
    This has no prerequisites.
914

915
    """
916
    pass
917

    
918
  def Exec(self, feedback_fn):
919
    """Verify integrity of cluster disks.
920

921
    """
922
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
923

    
924
    vg_name = self.cfg.GetVGName()
925
    nodes = utils.NiceSort(self.cfg.GetNodeList())
926
    instances = [self.cfg.GetInstanceInfo(name)
927
                 for name in self.cfg.GetInstanceList()]
928

    
929
    nv_dict = {}
930
    for inst in instances:
931
      inst_lvs = {}
932
      if (inst.status != "up" or
933
          inst.disk_template not in constants.DTS_NET_MIRROR):
934
        continue
935
      inst.MapLVsByNode(inst_lvs)
936
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
937
      for node, vol_list in inst_lvs.iteritems():
938
        for vol in vol_list:
939
          nv_dict[(node, vol)] = inst
940

    
941
    if not nv_dict:
942
      return result
943

    
944
    node_lvs = rpc.call_volume_list(nodes, vg_name)
945

    
946
    to_act = set()
947
    for node in nodes:
948
      # node_volume
949
      lvs = node_lvs[node]
950

    
951
      if isinstance(lvs, basestring):
952
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
953
        res_nlvm[node] = lvs
954
      elif not isinstance(lvs, dict):
955
        logger.Info("connection to node %s failed or invalid data returned" %
956
                    (node,))
957
        res_nodes.append(node)
958
        continue
959

    
960
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
961
        inst = nv_dict.pop((node, lv_name), None)
962
        if (not lv_online and inst is not None
963
            and inst.name not in res_instances):
964
          res_instances.append(inst.name)
965

    
966
    # any leftover items in nv_dict are missing LVs, let's arrange the
967
    # data better
968
    for key, inst in nv_dict.iteritems():
969
      if inst.name not in res_missing:
970
        res_missing[inst.name] = []
971
      res_missing[inst.name].append(key)
972

    
973
    return result
974

    
975

    
976
class LURenameCluster(LogicalUnit):
977
  """Rename the cluster.
978

979
  """
980
  HPATH = "cluster-rename"
981
  HTYPE = constants.HTYPE_CLUSTER
982
  _OP_REQP = ["name"]
983
  REQ_WSSTORE = True
984

    
985
  def BuildHooksEnv(self):
986
    """Build hooks env.
987

988
    """
989
    env = {
990
      "OP_TARGET": self.sstore.GetClusterName(),
991
      "NEW_NAME": self.op.name,
992
      }
993
    mn = self.sstore.GetMasterNode()
994
    return env, [mn], [mn]
995

    
996
  def CheckPrereq(self):
997
    """Verify that the passed name is a valid one.
998

999
    """
1000
    hostname = utils.HostInfo(self.op.name)
1001

    
1002
    new_name = hostname.name
1003
    self.ip = new_ip = hostname.ip
1004
    old_name = self.sstore.GetClusterName()
1005
    old_ip = self.sstore.GetMasterIP()
1006
    if new_name == old_name and new_ip == old_ip:
1007
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1008
                                 " cluster has changed")
1009
    if new_ip != old_ip:
1010
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1011
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1012
                                   " reachable on the network. Aborting." %
1013
                                   new_ip)
1014

    
1015
    self.op.name = new_name
1016

    
1017
  def Exec(self, feedback_fn):
1018
    """Rename the cluster.
1019

1020
    """
1021
    clustername = self.op.name
1022
    ip = self.ip
1023
    ss = self.sstore
1024

    
1025
    # shutdown the master IP
1026
    master = ss.GetMasterNode()
1027
    if not rpc.call_node_stop_master(master, False):
1028
      raise errors.OpExecError("Could not disable the master role")
1029

    
1030
    try:
1031
      # modify the sstore
1032
      ss.SetKey(ss.SS_MASTER_IP, ip)
1033
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1034

    
1035
      # Distribute updated ss config to all nodes
1036
      myself = self.cfg.GetNodeInfo(master)
1037
      dist_nodes = self.cfg.GetNodeList()
1038
      if myself.name in dist_nodes:
1039
        dist_nodes.remove(myself.name)
1040

    
1041
      logger.Debug("Copying updated ssconf data to all nodes")
1042
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1043
        fname = ss.KeyToFilename(keyname)
1044
        result = rpc.call_upload_file(dist_nodes, fname)
1045
        for to_node in dist_nodes:
1046
          if not result[to_node]:
1047
            logger.Error("copy of file %s to node %s failed" %
1048
                         (fname, to_node))
1049
    finally:
1050
      if not rpc.call_node_start_master(master, False):
1051
        logger.Error("Could not re-enable the master role on the master,"
1052
                     " please restart manually.")
1053

    
1054

    
1055
def _RecursiveCheckIfLVMBased(disk):
1056
  """Check if the given disk or its children are lvm-based.
1057

1058
  Args:
1059
    disk: ganeti.objects.Disk object
1060

1061
  Returns:
1062
    boolean indicating whether a LD_LV dev_type was found or not
1063

1064
  """
1065
  if disk.children:
1066
    for chdisk in disk.children:
1067
      if _RecursiveCheckIfLVMBased(chdisk):
1068
        return True
1069
  return disk.dev_type == constants.LD_LV
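# Illustrative example (not part of the original module, kept as a comment),
# assuming the constants.LD_DRBD8 constant and the objects.Disk keyword
# arguments shown here:
#
#   lv = objects.Disk(dev_type=constants.LD_LV, size=1024,
#                     logical_id=("xenvg", "example-lv"))
#   drbd = objects.Disk(dev_type=constants.LD_DRBD8, size=1024,
#                       children=[lv, lv])
#   _RecursiveCheckIfLVMBased(drbd)    # -> True (an LV child was found)
#   _RecursiveCheckIfLVMBased(lv)      # -> True (the disk itself is an LV)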
1070

    
1071

    
1072
class LUSetClusterParams(LogicalUnit):
1073
  """Change the parameters of the cluster.
1074

1075
  """
1076
  HPATH = "cluster-modify"
1077
  HTYPE = constants.HTYPE_CLUSTER
1078
  _OP_REQP = []
1079

    
1080
  def BuildHooksEnv(self):
1081
    """Build hooks env.
1082

1083
    """
1084
    env = {
1085
      "OP_TARGET": self.sstore.GetClusterName(),
1086
      "NEW_VG_NAME": self.op.vg_name,
1087
      }
1088
    mn = self.sstore.GetMasterNode()
1089
    return env, [mn], [mn]
1090

    
1091
  def CheckPrereq(self):
1092
    """Check prerequisites.
1093

1094
    This checks whether the given params don't conflict and
1095
    if the given volume group is valid.
1096

1097
    """
1098
    if not self.op.vg_name:
1099
      instances = [self.cfg.GetInstanceInfo(name)
1100
                   for name in self.cfg.GetInstanceList()]
1101
      for inst in instances:
1102
        for disk in inst.disks:
1103
          if _RecursiveCheckIfLVMBased(disk):
1104
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1105
                                       " lvm-based instances exist")
1106

    
1107
    # if vg_name not None, checks given volume group on all nodes
1108
    if self.op.vg_name:
1109
      node_list = self.cfg.GetNodeList()
1110
      vglist = rpc.call_vg_list(node_list)
1111
      for node in node_list:
1112
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1113
                                              constants.MIN_VG_SIZE)
1114
        if vgstatus:
1115
          raise errors.OpPrereqError("Error on node '%s': %s" %
1116
                                     (node, vgstatus))
1117

    
1118
  def Exec(self, feedback_fn):
1119
    """Change the parameters of the cluster.
1120

1121
    """
1122
    if self.op.vg_name != self.cfg.GetVGName():
1123
      self.cfg.SetVGName(self.op.vg_name)
1124
    else:
1125
      feedback_fn("Cluster LVM configuration already in desired"
1126
                  " state, not changing")
1127

    
1128

    
1129
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1130
  """Sleep and poll for an instance's disk to sync.
1131

1132
  """
1133
  if not instance.disks:
1134
    return True
1135

    
1136
  if not oneshot:
1137
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1138

    
1139
  node = instance.primary_node
1140

    
1141
  for dev in instance.disks:
1142
    cfgw.SetDiskID(dev, node)
1143

    
1144
  retries = 0
1145
  while True:
1146
    max_time = 0
1147
    done = True
1148
    cumul_degraded = False
1149
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1150
    if not rstats:
1151
      proc.LogWarning("Can't get any data from node %s" % node)
1152
      retries += 1
1153
      if retries >= 10:
1154
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1155
                                 " aborting." % node)
1156
      time.sleep(6)
1157
      continue
1158
    retries = 0
1159
    for i in range(len(rstats)):
1160
      mstat = rstats[i]
1161
      if mstat is None:
1162
        proc.LogWarning("Can't compute data for node %s/%s" %
1163
                        (node, instance.disks[i].iv_name))
1164
        continue
1165
      # we ignore the ldisk parameter
1166
      perc_done, est_time, is_degraded, _ = mstat
1167
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1168
      if perc_done is not None:
1169
        done = False
1170
        if est_time is not None:
1171
          rem_time = "%d estimated seconds remaining" % est_time
1172
          max_time = est_time
1173
        else:
1174
          rem_time = "no time estimate"
1175
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
1176
                     (instance.disks[i].iv_name, perc_done, rem_time))
1177
    if done or oneshot:
1178
      break
1179

    
1180
    time.sleep(min(60, max_time))
1181

    
1182
  if done:
1183
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1184
  return not cumul_degraded
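# Illustrative note (not part of the original module): each entry returned by
# call_blockdev_getmirrorstatus is unpacked above as
# (perc_done, est_time, is_degraded, ldisk); for example a hypothetical value
# of (87.5, 130, True, False) would be reported roughly as
# "- device sda: 87.50% done, 130 estimated seconds remaining".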
1185

    
1186

    
1187
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1188
  """Check that mirrors are not degraded.
1189

1190
  The ldisk parameter, if True, will change the test from the
1191
  is_degraded attribute (which represents overall non-ok status for
1192
  the device(s)) to the ldisk (representing the local storage status).
1193

1194
  """
1195
  cfgw.SetDiskID(dev, node)
1196
  if ldisk:
1197
    idx = 6
1198
  else:
1199
    idx = 5
1200

    
1201
  result = True
1202
  if on_primary or dev.AssembleOnSecondary():
1203
    rstats = rpc.call_blockdev_find(node, dev)
1204
    if not rstats:
1205
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1206
      result = False
1207
    else:
1208
      result = result and (not rstats[idx])
1209
  if dev.children:
1210
    for child in dev.children:
1211
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1212

    
1213
  return result
1214

    
1215

    
1216
class LUDiagnoseOS(NoHooksLU):
1217
  """Logical unit for OS diagnose/query.
1218

1219
  """
1220
  _OP_REQP = ["output_fields", "names"]
1221
  REQ_BGL = False
1222

    
1223
  def ExpandNames(self):
1224
    if self.op.names:
1225
      raise errors.OpPrereqError("Selective OS query not supported")
1226

    
1227
    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1228
    _CheckOutputFields(static=[],
1229
                       dynamic=self.dynamic_fields,
1230
                       selected=self.op.output_fields)
1231

    
1232
    # Lock all nodes, in shared mode
1233
    self.needed_locks = {}
1234
    self.share_locks[locking.LEVEL_NODE] = 1
1235
    self.needed_locks[locking.LEVEL_NODE] = None
1236

    
1237
  def CheckPrereq(self):
1238
    """Check prerequisites.
1239

1240
    """
1241

    
1242
  @staticmethod
1243
  def _DiagnoseByOS(node_list, rlist):
1244
    """Remaps a per-node return list into an a per-os per-node dictionary
1245

1246
      Args:
1247
        node_list: a list with the names of all nodes
1248
        rlist: a map with node names as keys and OS objects as values
1249

1250
      Returns:
1251
        map: a map with osnames as keys and as value another map, with
1252
             nodes as
1253
             keys and list of OS objects as values
1254
             e.g. {"debian-etch": {"node1": [<object>,...],
1255
                                   "node2": [<object>,]}
1256
                  }
1257

1258
    """
1259
    all_os = {}
1260
    for node_name, nr in rlist.iteritems():
1261
      if not nr:
1262
        continue
1263
      for os_obj in nr:
1264
        if os_obj.name not in all_os:
1265
          # build a list of nodes for this os containing empty lists
1266
          # for each node in node_list
1267
          all_os[os_obj.name] = {}
1268
          for nname in node_list:
1269
            all_os[os_obj.name][nname] = []
1270
        all_os[os_obj.name][node_name].append(os_obj)
1271
    return all_os
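    # Illustrative example (not part of the original module, kept as a
    # comment), with hypothetical nodes and OS objects where os_a.name is
    # "debian-etch" and os_b.name is "fedora-8":
    #
    #   _DiagnoseByOS(["node1", "node2"],
    #                 {"node1": [os_a, os_b], "node2": [os_a]})
    #   # -> {"debian-etch": {"node1": [os_a], "node2": [os_a]},
    #   #     "fedora-8":    {"node1": [os_b], "node2": []}}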
1272

    
1273
  def Exec(self, feedback_fn):
1274
    """Compute the list of OSes.
1275

1276
    """
1277
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1278
    node_data = rpc.call_os_diagnose(node_list)
1279
    if node_data == False:
1280
      raise errors.OpExecError("Can't gather the list of OSes")
1281
    pol = self._DiagnoseByOS(node_list, node_data)
1282
    output = []
1283
    for os_name, os_data in pol.iteritems():
1284
      row = []
1285
      for field in self.op.output_fields:
1286
        if field == "name":
1287
          val = os_name
1288
        elif field == "valid":
1289
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1290
        elif field == "node_status":
1291
          val = {}
1292
          for node_name, nos_list in os_data.iteritems():
1293
            val[node_name] = [(v.status, v.path) for v in nos_list]
1294
        else:
1295
          raise errors.ParameterError(field)
1296
        row.append(val)
1297
      output.append(row)
1298

    
1299
    return output
1300

    
1301

    
1302
class LURemoveNode(LogicalUnit):
1303
  """Logical unit for removing a node.
1304

1305
  """
1306
  HPATH = "node-remove"
1307
  HTYPE = constants.HTYPE_NODE
1308
  _OP_REQP = ["node_name"]
1309

    
1310
  def BuildHooksEnv(self):
1311
    """Build hooks env.
1312

1313
    This doesn't run on the target node in the pre phase as a failed
1314
    node would then be impossible to remove.
1315

1316
    """
1317
    env = {
1318
      "OP_TARGET": self.op.node_name,
1319
      "NODE_NAME": self.op.node_name,
1320
      }
1321
    all_nodes = self.cfg.GetNodeList()
1322
    all_nodes.remove(self.op.node_name)
1323
    return env, all_nodes, all_nodes
1324

    
1325
  def CheckPrereq(self):
1326
    """Check prerequisites.
1327

1328
    This checks:
1329
     - the node exists in the configuration
1330
     - it does not have primary or secondary instances
1331
     - it's not the master
1332

1333
    Any errors are signalled by raising errors.OpPrereqError.
1334

1335
    """
1336
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1337
    if node is None:
1338
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1339

    
1340
    instance_list = self.cfg.GetInstanceList()
1341

    
1342
    masternode = self.sstore.GetMasterNode()
1343
    if node.name == masternode:
1344
      raise errors.OpPrereqError("Node is the master node,"
1345
                                 " you need to failover first.")
1346

    
1347
    for instance_name in instance_list:
1348
      instance = self.cfg.GetInstanceInfo(instance_name)
1349
      if node.name == instance.primary_node:
1350
        raise errors.OpPrereqError("Instance %s still running on the node,"
1351
                                   " please remove first." % instance_name)
1352
      if node.name in instance.secondary_nodes:
1353
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1354
                                   " please remove first." % instance_name)
1355
    self.op.node_name = node.name
1356
    self.node = node
1357

    
1358
  def Exec(self, feedback_fn):
1359
    """Removes the node from the cluster.
1360

1361
    """
1362
    node = self.node
1363
    logger.Info("stopping the node daemon and removing configs from node %s" %
1364
                node.name)
1365

    
1366
    self.context.RemoveNode(node.name)
1367

    
1368
    rpc.call_node_leave_cluster(node.name)
1369

    
1370

    
1371
class LUQueryNodes(NoHooksLU):
1372
  """Logical unit for querying nodes.
1373

1374
  """
1375
  _OP_REQP = ["output_fields", "names"]
1376
  REQ_BGL = False
1377

    
1378
  def ExpandNames(self):
1379
    self.dynamic_fields = frozenset([
1380
      "dtotal", "dfree",
1381
      "mtotal", "mnode", "mfree",
1382
      "bootid",
1383
      "ctotal",
1384
      ])
1385

    
1386
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1387
                               "pinst_list", "sinst_list",
1388
                               "pip", "sip", "tags"],
1389
                       dynamic=self.dynamic_fields,
1390
                       selected=self.op.output_fields)
1391

    
1392
    self.needed_locks = {}
1393
    self.share_locks[locking.LEVEL_NODE] = 1
1394
    # TODO: we could lock nodes only if the user asked for dynamic fields. For
1395
    # that we need atomic ways to get info for a group of nodes from the
1396
    # config, though.
1397
    if not self.op.names:
1398
      self.needed_locks[locking.LEVEL_NODE] = None
1399
    else:
1400
      self.needed_locks[locking.LEVEL_NODE] = \
1401
        _GetWantedNodes(self, self.op.names)
1402

    
1403
  def CheckPrereq(self):
1404
    """Check prerequisites.
1405

1406
    """
1407
    # This of course is valid only if we locked the nodes
1408
    self.wanted = self.acquired_locks[locking.LEVEL_NODE]
1409

    
1410
  def Exec(self, feedback_fn):
1411
    """Computes the list of nodes and their attributes.
1412

1413
    """
1414
    nodenames = self.wanted
1415
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1416

    
1417
    # begin data gathering
1418

    
1419
    if self.dynamic_fields.intersection(self.op.output_fields):
1420
      live_data = {}
1421
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1422
      for name in nodenames:
1423
        nodeinfo = node_data.get(name, None)
1424
        if nodeinfo:
1425
          live_data[name] = {
1426
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1427
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1428
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1429
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1430
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1431
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1432
            "bootid": nodeinfo['bootid'],
1433
            }
1434
        else:
1435
          live_data[name] = {}
1436
    else:
1437
      live_data = dict.fromkeys(nodenames, {})
1438

    
1439
    node_to_primary = dict([(name, set()) for name in nodenames])
1440
    node_to_secondary = dict([(name, set()) for name in nodenames])
1441

    
1442
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1443
                             "sinst_cnt", "sinst_list"))
1444
    if inst_fields & frozenset(self.op.output_fields):
1445
      instancelist = self.cfg.GetInstanceList()
1446

    
1447
      for instance_name in instancelist:
1448
        inst = self.cfg.GetInstanceInfo(instance_name)
1449
        if inst.primary_node in node_to_primary:
1450
          node_to_primary[inst.primary_node].add(inst.name)
1451
        for secnode in inst.secondary_nodes:
1452
          if secnode in node_to_secondary:
1453
            node_to_secondary[secnode].add(inst.name)
1454

    
1455
    # end data gathering
1456

    
1457
    output = []
1458
    for node in nodelist:
1459
      node_output = []
1460
      for field in self.op.output_fields:
1461
        if field == "name":
1462
          val = node.name
1463
        elif field == "pinst_list":
1464
          val = list(node_to_primary[node.name])
1465
        elif field == "sinst_list":
1466
          val = list(node_to_secondary[node.name])
1467
        elif field == "pinst_cnt":
1468
          val = len(node_to_primary[node.name])
1469
        elif field == "sinst_cnt":
1470
          val = len(node_to_secondary[node.name])
1471
        elif field == "pip":
1472
          val = node.primary_ip
1473
        elif field == "sip":
1474
          val = node.secondary_ip
1475
        elif field == "tags":
1476
          val = list(node.GetTags())
1477
        elif field in self.dynamic_fields:
1478
          val = live_data[node.name].get(field, None)
1479
        else:
1480
          raise errors.ParameterError(field)
1481
        node_output.append(val)
1482
      output.append(node_output)
1483

    
1484
    return output
1485

    
1486

    
1487
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  REQ_BGL = False

  def ExpandNames(self):
    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = None
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of volumes on the nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


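# Note (illustrative sketch, not part of the original module): each row
# produced by LUQueryNodeVolumes.Exec above is a list of strings, ordered
# like self.op.output_fields.  With all fields selected, a row might look
# like this (every value below is made up; the exact form of 'phys'
# depends on what the node's volume listing reports):
#
#   ["node1.example.com", "/dev/sdb1", "xenvg",
#    "51de45bc-0e3c-41bd-ba33-2e9b5c6e1bca.sda", "10240",
#    "instance1.example.com"]
#
# The "instance" column falls back to '-' when no instance owns the LV.
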
class LUAddNode(LogicalUnit):
1567
  """Logical unit for adding node to the cluster.
1568

1569
  """
1570
  HPATH = "node-add"
1571
  HTYPE = constants.HTYPE_NODE
1572
  _OP_REQP = ["node_name"]
1573

    
1574
  def BuildHooksEnv(self):
1575
    """Build hooks env.
1576

1577
    This will run on all nodes before, and on all nodes + the new node after.
1578

1579
    """
1580
    env = {
1581
      "OP_TARGET": self.op.node_name,
1582
      "NODE_NAME": self.op.node_name,
1583
      "NODE_PIP": self.op.primary_ip,
1584
      "NODE_SIP": self.op.secondary_ip,
1585
      }
1586
    nodes_0 = self.cfg.GetNodeList()
1587
    nodes_1 = nodes_0 + [self.op.node_name, ]
1588
    return env, nodes_0, nodes_1
1589

    
1590
  def CheckPrereq(self):
1591
    """Check prerequisites.
1592

1593
    This checks:
1594
     - the new node is not already in the config
1595
     - it is resolvable
1596
     - its parameters (single/dual homed) matches the cluster
1597

1598
    Any errors are signalled by raising errors.OpPrereqError.
1599

1600
    """
1601
    node_name = self.op.node_name
1602
    cfg = self.cfg
1603

    
1604
    dns_data = utils.HostInfo(node_name)
1605

    
1606
    node = dns_data.name
1607
    primary_ip = self.op.primary_ip = dns_data.ip
1608
    secondary_ip = getattr(self.op, "secondary_ip", None)
1609
    if secondary_ip is None:
1610
      secondary_ip = primary_ip
1611
    if not utils.IsValidIP(secondary_ip):
1612
      raise errors.OpPrereqError("Invalid secondary IP given")
1613
    self.op.secondary_ip = secondary_ip
1614

    
1615
    node_list = cfg.GetNodeList()
1616
    if not self.op.readd and node in node_list:
1617
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1618
                                 node)
1619
    elif self.op.readd and node not in node_list:
1620
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1621

    
1622
    for existing_node_name in node_list:
1623
      existing_node = cfg.GetNodeInfo(existing_node_name)
1624

    
1625
      if self.op.readd and node == existing_node_name:
1626
        if (existing_node.primary_ip != primary_ip or
1627
            existing_node.secondary_ip != secondary_ip):
1628
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1629
                                     " address configuration as before")
1630
        continue
1631

    
1632
      if (existing_node.primary_ip == primary_ip or
1633
          existing_node.secondary_ip == primary_ip or
1634
          existing_node.primary_ip == secondary_ip or
1635
          existing_node.secondary_ip == secondary_ip):
1636
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1637
                                   " existing node %s" % existing_node.name)
1638

    
1639
    # check that the type of the node (single versus dual homed) is the
1640
    # same as for the master
1641
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1642
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1643
    newbie_singlehomed = secondary_ip == primary_ip
1644
    if master_singlehomed != newbie_singlehomed:
1645
      if master_singlehomed:
1646
        raise errors.OpPrereqError("The master has no private ip but the"
1647
                                   " new node has one")
1648
      else:
1649
        raise errors.OpPrereqError("The master has a private ip but the"
1650
                                   " new node doesn't have one")
1651

    
1652
    # checks reachability
1653
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1654
      raise errors.OpPrereqError("Node not reachable by ping")
1655

    
1656
    if not newbie_singlehomed:
1657
      # check reachability from my secondary ip to newbie's secondary ip
1658
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1659
                           source=myself.secondary_ip):
1660
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1661
                                   " based ping to noded port")
1662

    
1663
    self.new_node = objects.Node(name=node,
1664
                                 primary_ip=primary_ip,
1665
                                 secondary_ip=secondary_ip)
1666

    
1667
  def Exec(self, feedback_fn):
1668
    """Adds the new node to the cluster.
1669

1670
    """
1671
    new_node = self.new_node
1672
    node = new_node.name
1673

    
1674
    # check connectivity
1675
    result = rpc.call_version([node])[node]
1676
    if result:
1677
      if constants.PROTOCOL_VERSION == result:
1678
        logger.Info("communication to node %s fine, sw version %s match" %
1679
                    (node, result))
1680
      else:
1681
        raise errors.OpExecError("Version mismatch master version %s,"
1682
                                 " node version %s" %
1683
                                 (constants.PROTOCOL_VERSION, result))
1684
    else:
1685
      raise errors.OpExecError("Cannot get version from the new node")
1686

    
1687
    # setup ssh on node
1688
    logger.Info("copy ssh key to node %s" % node)
1689
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1690
    keyarray = []
1691
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1692
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1693
                priv_key, pub_key]
1694

    
1695
    for i in keyfiles:
1696
      f = open(i, 'r')
1697
      try:
1698
        keyarray.append(f.read())
1699
      finally:
1700
        f.close()
1701

    
1702
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1703
                               keyarray[3], keyarray[4], keyarray[5])
1704

    
1705
    if not result:
1706
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1707

    
1708
    # Add node to our /etc/hosts, and add key to known_hosts
1709
    utils.AddHostToEtcHosts(new_node.name)
1710

    
1711
    if new_node.secondary_ip != new_node.primary_ip:
1712
      if not rpc.call_node_tcp_ping(new_node.name,
1713
                                    constants.LOCALHOST_IP_ADDRESS,
1714
                                    new_node.secondary_ip,
1715
                                    constants.DEFAULT_NODED_PORT,
1716
                                    10, False):
1717
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1718
                                 " you gave (%s). Please fix and re-run this"
1719
                                 " command." % new_node.secondary_ip)
1720

    
1721
    node_verify_list = [self.sstore.GetMasterNode()]
1722
    node_verify_param = {
1723
      'nodelist': [node],
1724
      # TODO: do a node-net-test as well?
1725
    }
1726

    
1727
    result = rpc.call_node_verify(node_verify_list, node_verify_param)
1728
    for verifier in node_verify_list:
1729
      if not result[verifier]:
1730
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
1731
                                 " for remote verification" % verifier)
1732
      if result[verifier]['nodelist']:
1733
        for failed in result[verifier]['nodelist']:
1734
          feedback_fn("ssh/hostname verification failed %s -> %s" %
1735
                      (verifier, result[verifier]['nodelist'][failed]))
1736
        raise errors.OpExecError("ssh/hostname verification failed.")
1737

    
1738
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1739
    # including the node just added
1740
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1741
    dist_nodes = self.cfg.GetNodeList()
1742
    if not self.op.readd:
1743
      dist_nodes.append(node)
1744
    if myself.name in dist_nodes:
1745
      dist_nodes.remove(myself.name)
1746

    
1747
    logger.Debug("Copying hosts and known_hosts to all nodes")
1748
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1749
      result = rpc.call_upload_file(dist_nodes, fname)
1750
      for to_node in dist_nodes:
1751
        if not result[to_node]:
1752
          logger.Error("copy of file %s to node %s failed" %
1753
                       (fname, to_node))
1754

    
1755
    to_copy = self.sstore.GetFileList()
1756
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1757
      to_copy.append(constants.VNC_PASSWORD_FILE)
1758
    for fname in to_copy:
1759
      result = rpc.call_upload_file([node], fname)
1760
      if not result[node]:
1761
        logger.Error("could not copy file %s to node %s" % (fname, node))
1762

    
1763
    if self.op.readd:
1764
      self.context.ReaddNode(new_node)
1765
    else:
1766
      self.context.AddNode(new_node)
1767

    
1768

    
1769
class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      }

    return result


class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()


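# Note (illustrative sketch, not part of the original module): the
# dictionary returned by LUQueryClusterInfo.Exec above contains only the
# keys built in that method.  A made-up example with some keys omitted:
#
#   {"name": "cluster.example.com",
#    "master": "node1.example.com",
#    "hypervisor_type": "xen-3.0",
#    "architecture": ("64bit", "x86_64")}
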
class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance


  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
         succeeded with the mapping from node devices to instance devices
  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info


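# Note (illustrative sketch, not part of the original module): for a DRBD8
# instance with disks "sda" and "sdb", _AssembleInstanceDisks above first
# brings every disk up in secondary mode on all of its nodes and only then
# re-assembles it on the primary node, so the DRBD handshake has a chance
# to happen before the device is promoted.  On success it returns a tuple
# like this (node name and device paths are made up):
#
#   (True, [("node1.example.com", "sda", "/dev/drbd0"),
#           ("node1.example.com", "sdb", "/dev/drbd1")])
#
# where the last element of each tuple is simply whatever the primary-mode
# rpc.call_blockdev_assemble call returned.
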
def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    if not type(ins_l) is list:
      raise errors.OpExecError("Can't contact node '%s'" %
                               instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError("Instance is running, can't shutdown"
                               " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)


def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are ignored;
  errors on the other nodes always cause a failure result.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result


def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                             " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                             " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                             " needed %s MiB, available %s MiB" %
                             (node, reason, requested, free_mem))


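# Note (illustrative sketch, not part of the original module): a typical
# call to _CheckNodeFreeMemory, as made by LUStartupInstance below, is:
#
#   _CheckNodeFreeMemory(self.cfg, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        instance.memory)
#
# If, say, the node reports 512 MiB free and the instance needs 1024 MiB,
# this raises an OpPrereqError of the form "Not enough memory on node ...:
# needed 1024 MiB, available 512 MiB" and the operation stops in
# CheckPrereq.
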
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
                         "starting instance %s" % instance.name,
                         instance.memory)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")


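# Note (illustrative sketch, not part of the original module): the startup
# flow in LUStartupInstance.Exec above is: mark the instance "up" in the
# configuration, assemble its disks through _StartInstanceDisks, then ask
# the primary node to start it; if that last RPC fails, the disks are shut
# down again before OpExecError is raised.  Assuming the matching opcode
# class from ganeti.opcodes, a caller would typically submit:
#
#   op = opcodes.OpStartupInstance(instance_name="instance1.example.com",
#                                  force=False)
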
class LURebootInstance(LogicalUnit):
2092
  """Reboot an instance.
2093

2094
  """
2095
  HPATH = "instance-reboot"
2096
  HTYPE = constants.HTYPE_INSTANCE
2097
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2098
  REQ_BGL = False
2099

    
2100
  def ExpandNames(self):
2101
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2102
                                   constants.INSTANCE_REBOOT_HARD,
2103
                                   constants.INSTANCE_REBOOT_FULL]:
2104
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2105
                                  (constants.INSTANCE_REBOOT_SOFT,
2106
                                   constants.INSTANCE_REBOOT_HARD,
2107
                                   constants.INSTANCE_REBOOT_FULL))
2108
    self._ExpandAndLockInstance()
2109
    self.needed_locks[locking.LEVEL_NODE] = []
2110
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2111

    
2112
  def DeclareLocks(self, level):
2113
    if level == locking.LEVEL_NODE:
2114
      # FIXME: lock only primary on (not constants.INSTANCE_REBOOT_FULL)
2115
      self._LockInstancesNodes()
2116

    
2117
  def BuildHooksEnv(self):
2118
    """Build hooks env.
2119

2120
    This runs on master, primary and secondary nodes of the instance.
2121

2122
    """
2123
    env = {
2124
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2125
      }
2126
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2127
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2128
          list(self.instance.secondary_nodes))
2129
    return env, nl, nl
2130

    
2131
  def CheckPrereq(self):
2132
    """Check prerequisites.
2133

2134
    This checks that the instance is in the cluster.
2135

2136
    """
2137
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2138
    assert self.instance is not None, \
2139
      "Cannot retrieve locked instance %s" % self.op.instance_name
2140

    
2141
    # check bridges existence
2142
    _CheckInstanceBridgesExist(instance)
2143

    
2144
  def Exec(self, feedback_fn):
2145
    """Reboot the instance.
2146

2147
    """
2148
    instance = self.instance
2149
    ignore_secondaries = self.op.ignore_secondaries
2150
    reboot_type = self.op.reboot_type
2151
    extra_args = getattr(self.op, "extra_args", "")
2152

    
2153
    node_current = instance.primary_node
2154

    
2155
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2156
                       constants.INSTANCE_REBOOT_HARD]:
2157
      if not rpc.call_instance_reboot(node_current, instance,
2158
                                      reboot_type, extra_args):
2159
        raise errors.OpExecError("Could not reboot instance")
2160
    else:
2161
      if not rpc.call_instance_shutdown(node_current, instance):
2162
        raise errors.OpExecError("could not shutdown instance for full reboot")
2163
      _ShutdownInstanceDisks(instance, self.cfg)
2164
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2165
      if not rpc.call_instance_start(node_current, instance, extra_args):
2166
        _ShutdownInstanceDisks(instance, self.cfg)
2167
        raise errors.OpExecError("Could not start instance for full reboot")
2168

    
2169
    self.cfg.MarkInstanceUp(instance.name)
2170

    
2171

    
2172
class LUShutdownInstance(LogicalUnit):
2173
  """Shutdown an instance.
2174

2175
  """
2176
  HPATH = "instance-stop"
2177
  HTYPE = constants.HTYPE_INSTANCE
2178
  _OP_REQP = ["instance_name"]
2179
  REQ_BGL = False
2180

    
2181
  def ExpandNames(self):
2182
    self._ExpandAndLockInstance()
2183
    self.needed_locks[locking.LEVEL_NODE] = []
2184
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2185

    
2186
  def DeclareLocks(self, level):
2187
    if level == locking.LEVEL_NODE:
2188
      self._LockInstancesNodes()
2189

    
2190
  def BuildHooksEnv(self):
2191
    """Build hooks env.
2192

2193
    This runs on master, primary and secondary nodes of the instance.
2194

2195
    """
2196
    env = _BuildInstanceHookEnvByObject(self.instance)
2197
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2198
          list(self.instance.secondary_nodes))
2199
    return env, nl, nl
2200

    
2201
  def CheckPrereq(self):
2202
    """Check prerequisites.
2203

2204
    This checks that the instance is in the cluster.
2205

2206
    """
2207
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2208
    assert self.instance is not None, \
2209
      "Cannot retrieve locked instance %s" % self.op.instance_name
2210

    
2211
  def Exec(self, feedback_fn):
2212
    """Shutdown the instance.
2213

2214
    """
2215
    instance = self.instance
2216
    node_current = instance.primary_node
2217
    self.cfg.MarkInstanceDown(instance.name)
2218
    if not rpc.call_instance_shutdown(node_current, instance):
2219
      logger.Error("could not shutdown instance")
2220

    
2221
    _ShutdownInstanceDisks(instance, self.cfg)
2222

    
2223

    
2224
class LUReinstallInstance(LogicalUnit):
2225
  """Reinstall an instance.
2226

2227
  """
2228
  HPATH = "instance-reinstall"
2229
  HTYPE = constants.HTYPE_INSTANCE
2230
  _OP_REQP = ["instance_name"]
2231
  REQ_BGL = False
2232

    
2233
  def ExpandNames(self):
2234
    self._ExpandAndLockInstance()
2235
    self.needed_locks[locking.LEVEL_NODE] = []
2236
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2237

    
2238
  def DeclareLocks(self, level):
2239
    if level == locking.LEVEL_NODE:
2240
      self._LockInstancesNodes()
2241

    
2242
  def BuildHooksEnv(self):
2243
    """Build hooks env.
2244

2245
    This runs on master, primary and secondary nodes of the instance.
2246

2247
    """
2248
    env = _BuildInstanceHookEnvByObject(self.instance)
2249
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2250
          list(self.instance.secondary_nodes))
2251
    return env, nl, nl
2252

    
2253
  def CheckPrereq(self):
2254
    """Check prerequisites.
2255

2256
    This checks that the instance is in the cluster and is not running.
2257

2258
    """
2259
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2260
    assert instance is not None, \
2261
      "Cannot retrieve locked instance %s" % self.op.instance_name
2262

    
2263
    if instance.disk_template == constants.DT_DISKLESS:
2264
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2265
                                 self.op.instance_name)
2266
    if instance.status != "down":
2267
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2268
                                 self.op.instance_name)
2269
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2270
    if remote_info:
2271
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2272
                                 (self.op.instance_name,
2273
                                  instance.primary_node))
2274

    
2275
    self.op.os_type = getattr(self.op, "os_type", None)
2276
    if self.op.os_type is not None:
2277
      # OS verification
2278
      pnode = self.cfg.GetNodeInfo(
2279
        self.cfg.ExpandNodeName(instance.primary_node))
2280
      if pnode is None:
2281
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2282
                                   self.op.pnode)
2283
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2284
      if not os_obj:
2285
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2286
                                   " primary node"  % self.op.os_type)
2287

    
2288
    self.instance = instance
2289

    
2290
  def Exec(self, feedback_fn):
2291
    """Reinstall the instance.
2292

2293
    """
2294
    inst = self.instance
2295

    
2296
    if self.op.os_type is not None:
2297
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2298
      inst.os = self.op.os_type
2299
      self.cfg.AddInstance(inst)
2300

    
2301
    _StartInstanceDisks(self.cfg, inst, None)
2302
    try:
2303
      feedback_fn("Running the instance OS create scripts...")
2304
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2305
        raise errors.OpExecError("Could not install OS for instance %s"
2306
                                 " on node %s" %
2307
                                 (inst.name, inst.primary_node))
2308
    finally:
2309
      _ShutdownInstanceDisks(inst, self.cfg)
2310

    
2311

    
2312
class LURenameInstance(LogicalUnit):
2313
  """Rename an instance.
2314

2315
  """
2316
  HPATH = "instance-rename"
2317
  HTYPE = constants.HTYPE_INSTANCE
2318
  _OP_REQP = ["instance_name", "new_name"]
2319

    
2320
  def BuildHooksEnv(self):
2321
    """Build hooks env.
2322

2323
    This runs on master, primary and secondary nodes of the instance.
2324

2325
    """
2326
    env = _BuildInstanceHookEnvByObject(self.instance)
2327
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2328
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2329
          list(self.instance.secondary_nodes))
2330
    return env, nl, nl
2331

    
2332
  def CheckPrereq(self):
2333
    """Check prerequisites.
2334

2335
    This checks that the instance is in the cluster and is not running.
2336

2337
    """
2338
    instance = self.cfg.GetInstanceInfo(
2339
      self.cfg.ExpandInstanceName(self.op.instance_name))
2340
    if instance is None:
2341
      raise errors.OpPrereqError("Instance '%s' not known" %
2342
                                 self.op.instance_name)
2343
    if instance.status != "down":
2344
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2345
                                 self.op.instance_name)
2346
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2347
    if remote_info:
2348
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2349
                                 (self.op.instance_name,
2350
                                  instance.primary_node))
2351
    self.instance = instance
2352

    
2353
    # new name verification
2354
    name_info = utils.HostInfo(self.op.new_name)
2355

    
2356
    self.op.new_name = new_name = name_info.name
2357
    instance_list = self.cfg.GetInstanceList()
2358
    if new_name in instance_list:
2359
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2360
                                 new_name)
2361

    
2362
    if not getattr(self.op, "ignore_ip", False):
2363
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2364
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2365
                                   (name_info.ip, new_name))
2366

    
2367

    
2368
  def Exec(self, feedback_fn):
2369
    """Reinstall the instance.
2370

2371
    """
2372
    inst = self.instance
2373
    old_name = inst.name
2374

    
2375
    if inst.disk_template == constants.DT_FILE:
2376
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2377

    
2378
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2379
    # Change the instance lock. This is definitely safe while we hold the BGL
2380
    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2381
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2382

    
2383
    # re-read the instance from the configuration after rename
2384
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2385

    
2386
    if inst.disk_template == constants.DT_FILE:
2387
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2388
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
2389
                                                old_file_storage_dir,
2390
                                                new_file_storage_dir)
2391

    
2392
      if not result:
2393
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2394
                                 " directory '%s' to '%s' (but the instance"
2395
                                 " has been renamed in Ganeti)" % (
2396
                                 inst.primary_node, old_file_storage_dir,
2397
                                 new_file_storage_dir))
2398

    
2399
      if not result[0]:
2400
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2401
                                 " (but the instance has been renamed in"
2402
                                 " Ganeti)" % (old_file_storage_dir,
2403
                                               new_file_storage_dir))
2404

    
2405
    _StartInstanceDisks(self.cfg, inst, None)
2406
    try:
2407
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2408
                                          "sda", "sdb"):
2409
        msg = ("Could not run OS rename script for instance %s on node %s"
2410
               " (but the instance has been renamed in Ganeti)" %
2411
               (inst.name, inst.primary_node))
2412
        logger.Error(msg)
2413
    finally:
2414
      _ShutdownInstanceDisks(inst, self.cfg)
2415

    
2416

    
2417
class LURemoveInstance(LogicalUnit):
2418
  """Remove an instance.
2419

2420
  """
2421
  HPATH = "instance-remove"
2422
  HTYPE = constants.HTYPE_INSTANCE
2423
  _OP_REQP = ["instance_name", "ignore_failures"]
2424

    
2425
  def BuildHooksEnv(self):
2426
    """Build hooks env.
2427

2428
    This runs on master, primary and secondary nodes of the instance.
2429

2430
    """
2431
    env = _BuildInstanceHookEnvByObject(self.instance)
2432
    nl = [self.sstore.GetMasterNode()]
2433
    return env, nl, nl
2434

    
2435
  def CheckPrereq(self):
2436
    """Check prerequisites.
2437

2438
    This checks that the instance is in the cluster.
2439

2440
    """
2441
    instance = self.cfg.GetInstanceInfo(
2442
      self.cfg.ExpandInstanceName(self.op.instance_name))
2443
    if instance is None:
2444
      raise errors.OpPrereqError("Instance '%s' not known" %
2445
                                 self.op.instance_name)
2446
    self.instance = instance
2447

    
2448
  def Exec(self, feedback_fn):
2449
    """Remove the instance.
2450

2451
    """
2452
    instance = self.instance
2453
    logger.Info("shutting down instance %s on node %s" %
2454
                (instance.name, instance.primary_node))
2455

    
2456
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2457
      if self.op.ignore_failures:
2458
        feedback_fn("Warning: can't shutdown instance")
2459
      else:
2460
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2461
                                 (instance.name, instance.primary_node))
2462

    
2463
    logger.Info("removing block devices for instance %s" % instance.name)
2464

    
2465
    if not _RemoveDisks(instance, self.cfg):
2466
      if self.op.ignore_failures:
2467
        feedback_fn("Warning: can't remove instance's disks")
2468
      else:
2469
        raise errors.OpExecError("Can't remove instance's disks")
2470

    
2471
    logger.Info("removing instance %s out of cluster config" % instance.name)
2472

    
2473
    self.cfg.RemoveInstance(instance.name)
2474
    # Remove the instance from the Ganeti Lock Manager
2475
    self.context.glm.remove(locking.LEVEL_INSTANCE, instance.name)
2476

    
2477

    
2478
class LUQueryInstances(NoHooksLU):
2479
  """Logical unit for querying instances.
2480

2481
  """
2482
  _OP_REQP = ["output_fields", "names"]
2483
  REQ_BGL = False
2484

    
2485
  def ExpandNames(self):
2486
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2487
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2488
                               "admin_state", "admin_ram",
2489
                               "disk_template", "ip", "mac", "bridge",
2490
                               "sda_size", "sdb_size", "vcpus", "tags",
2491
                               "auto_balance",
2492
                               "network_port", "kernel_path", "initrd_path",
2493
                               "hvm_boot_order", "hvm_acpi", "hvm_pae",
2494
                               "hvm_cdrom_image_path", "hvm_nic_type",
2495
                               "hvm_disk_type", "vnc_bind_address"],
2496
                       dynamic=self.dynamic_fields,
2497
                       selected=self.op.output_fields)
2498

    
2499
    self.needed_locks = {}
2500
    self.share_locks[locking.LEVEL_INSTANCE] = 1
2501
    self.share_locks[locking.LEVEL_NODE] = 1
2502

    
2503
    # TODO: we could lock instances (and nodes) only if the user asked for
2504
    # dynamic fields. For that we need atomic ways to get info for a group of
2505
    # instances from the config, though.
2506
    if not self.op.names:
2507
      self.needed_locks[locking.LEVEL_INSTANCE] = None # Acquire all
2508
    else:
2509
      self.needed_locks[locking.LEVEL_INSTANCE] = \
2510
        _GetWantedInstances(self, self.op.names)
2511

    
2512
    self.needed_locks[locking.LEVEL_NODE] = []
2513
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2514

    
2515
  def DeclareLocks(self, level):
2516
    # TODO: locking of nodes could be avoided when not querying them
2517
    if level == locking.LEVEL_NODE:
2518
      self._LockInstancesNodes()
2519

    
2520
  def CheckPrereq(self):
2521
    """Check prerequisites.
2522

2523
    """
2524
    # This of course is valid only if we locked the instances
2525
    self.wanted = self.acquired_locks[locking.LEVEL_INSTANCE]
2526

    
2527
  def Exec(self, feedback_fn):
2528
    """Computes the list of nodes and their attributes.
2529

2530
    """
2531
    instance_names = self.wanted
2532
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2533
                     in instance_names]
2534

    
2535
    # begin data gathering
2536

    
2537
    nodes = frozenset([inst.primary_node for inst in instance_list])
2538

    
2539
    bad_nodes = []
2540
    if self.dynamic_fields.intersection(self.op.output_fields):
2541
      live_data = {}
2542
      node_data = rpc.call_all_instances_info(nodes)
2543
      for name in nodes:
2544
        result = node_data[name]
2545
        if result:
2546
          live_data.update(result)
2547
        elif result == False:
2548
          bad_nodes.append(name)
2549
        # else no instance is alive
2550
    else:
2551
      live_data = dict([(name, {}) for name in instance_names])
2552

    
2553
    # end data gathering
2554

    
2555
    output = []
2556
    for instance in instance_list:
2557
      iout = []
2558
      for field in self.op.output_fields:
2559
        if field == "name":
2560
          val = instance.name
2561
        elif field == "os":
2562
          val = instance.os
2563
        elif field == "pnode":
2564
          val = instance.primary_node
2565
        elif field == "snodes":
2566
          val = list(instance.secondary_nodes)
2567
        elif field == "admin_state":
2568
          val = (instance.status != "down")
2569
        elif field == "oper_state":
2570
          if instance.primary_node in bad_nodes:
2571
            val = None
2572
          else:
2573
            val = bool(live_data.get(instance.name))
2574
        elif field == "status":
2575
          if instance.primary_node in bad_nodes:
2576
            val = "ERROR_nodedown"
2577
          else:
2578
            running = bool(live_data.get(instance.name))
2579
            if running:
2580
              if instance.status != "down":
2581
                val = "running"
2582
              else:
2583
                val = "ERROR_up"
2584
            else:
2585
              if instance.status != "down":
2586
                val = "ERROR_down"
2587
              else:
2588
                val = "ADMIN_down"
2589
        elif field == "admin_ram":
2590
          val = instance.memory
2591
        elif field == "oper_ram":
2592
          if instance.primary_node in bad_nodes:
2593
            val = None
2594
          elif instance.name in live_data:
2595
            val = live_data[instance.name].get("memory", "?")
2596
          else:
2597
            val = "-"
2598
        elif field == "disk_template":
2599
          val = instance.disk_template
2600
        elif field == "ip":
2601
          val = instance.nics[0].ip
2602
        elif field == "bridge":
2603
          val = instance.nics[0].bridge
2604
        elif field == "mac":
2605
          val = instance.nics[0].mac
2606
        elif field == "sda_size" or field == "sdb_size":
2607
          disk = instance.FindDisk(field[:3])
2608
          if disk is None:
2609
            val = None
2610
          else:
2611
            val = disk.size
2612
        elif field == "vcpus":
2613
          val = instance.vcpus
2614
        elif field == "tags":
2615
          val = list(instance.GetTags())
2616
        elif field in ("network_port", "kernel_path", "initrd_path",
2617
                       "hvm_boot_order", "hvm_acpi", "hvm_pae",
2618
                       "hvm_cdrom_image_path", "hvm_nic_type",
2619
                       "hvm_disk_type", "vnc_bind_address"):
2620
          val = getattr(instance, field, None)
2621
          if val is not None:
2622
            pass
2623
          elif field in ("hvm_nic_type", "hvm_disk_type",
2624
                         "kernel_path", "initrd_path"):
2625
            val = "default"
2626
          else:
2627
            val = "-"
2628
        else:
2629
          raise errors.ParameterError(field)
2630
        iout.append(val)
2631
      output.append(iout)
2632

    
2633
    return output
2634

    
2635

    
2636
class LUFailoverInstance(LogicalUnit):
2637
  """Failover an instance.
2638

2639
  """
2640
  HPATH = "instance-failover"
2641
  HTYPE = constants.HTYPE_INSTANCE
2642
  _OP_REQP = ["instance_name", "ignore_consistency"]
2643
  REQ_BGL = False
2644

    
2645
  def ExpandNames(self):
2646
    self._ExpandAndLockInstance()
2647
    self.needed_locks[locking.LEVEL_NODE] = []
2648
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2649

    
2650
  def DeclareLocks(self, level):
2651
    if level == locking.LEVEL_NODE:
2652
      self._LockInstancesNodes()
2653

    
2654
  def BuildHooksEnv(self):
2655
    """Build hooks env.
2656

2657
    This runs on master, primary and secondary nodes of the instance.
2658

2659
    """
2660
    env = {
2661
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2662
      }
2663
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2664
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2665
    return env, nl, nl
2666

    
2667
  def CheckPrereq(self):
2668
    """Check prerequisites.
2669

2670
    This checks that the instance is in the cluster.
2671

2672
    """
2673
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2674
    assert self.instance is not None, \
2675
      "Cannot retrieve locked instance %s" % self.op.instance_name
2676

    
2677
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2678
      raise errors.OpPrereqError("Instance's disk layout is not"
2679
                                 " network mirrored, cannot failover.")
2680

    
2681
    secondary_nodes = instance.secondary_nodes
2682
    if not secondary_nodes:
2683
      raise errors.ProgrammerError("no secondary node but using "
2684
                                   "a mirrored disk template")
2685

    
2686
    target_node = secondary_nodes[0]
2687
    # check memory requirements on the secondary node
2688
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2689
                         instance.name, instance.memory)
2690

    
2691
    # check bridge existence
2692
    brlist = [nic.bridge for nic in instance.nics]
2693
    if not rpc.call_bridges_exist(target_node, brlist):
2694
      raise errors.OpPrereqError("One or more target bridges %s does not"
2695
                                 " exist on destination node '%s'" %
2696
                                 (brlist, target_node))
2697

    
2698
  def Exec(self, feedback_fn):
2699
    """Failover an instance.
2700

2701
    The failover is done by shutting it down on its present node and
2702
    starting it on the secondary.
2703

2704
    """
2705
    instance = self.instance
2706

    
2707
    source_node = instance.primary_node
2708
    target_node = instance.secondary_nodes[0]
2709

    
2710
    feedback_fn("* checking disk consistency between source and target")
2711
    for dev in instance.disks:
2712
      # for drbd, these are drbd over lvm
2713
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2714
        if instance.status == "up" and not self.op.ignore_consistency:
2715
          raise errors.OpExecError("Disk %s is degraded on target node,"
2716
                                   " aborting failover." % dev.iv_name)
2717

    
2718
    feedback_fn("* shutting down instance on source node")
2719
    logger.Info("Shutting down instance %s on node %s" %
2720
                (instance.name, source_node))
2721

    
2722
    if not rpc.call_instance_shutdown(source_node, instance):
2723
      if self.op.ignore_consistency:
2724
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2725
                     " anyway. Please make sure node %s is down"  %
2726
                     (instance.name, source_node, source_node))
2727
      else:
2728
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2729
                                 (instance.name, source_node))
2730

    
2731
    feedback_fn("* deactivating the instance's disks on source node")
2732
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2733
      raise errors.OpExecError("Can't shut down the instance's disks.")
2734

    
2735
    instance.primary_node = target_node
2736
    # distribute new instance config to the other nodes
2737
    self.cfg.Update(instance)
2738

    
2739
    # Only start the instance if it's marked as up
2740
    if instance.status == "up":
2741
      feedback_fn("* activating the instance's disks on target node")
2742
      logger.Info("Starting instance %s on node %s" %
2743
                  (instance.name, target_node))
2744

    
2745
      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2746
                                               ignore_secondaries=True)
2747
      if not disks_ok:
2748
        _ShutdownInstanceDisks(instance, self.cfg)
2749
        raise errors.OpExecError("Can't activate the instance's disks")
2750

    
2751
      feedback_fn("* starting the instance on the target node")
2752
      if not rpc.call_instance_start(target_node, instance, None):
2753
        _ShutdownInstanceDisks(instance, self.cfg)
2754
        raise errors.OpExecError("Could not start instance %s on node %s." %
2755
                                 (instance.name, target_node))
2756

    
2757

    
2758
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
        return False

  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, True, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, False, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _GenerateUniqueNames(cfg, exts):
  """Generate suitable LV names.

  This will generate one unique logical volume name for each of the
  given extensions.

  """
  results = []
  for val in exts:
    new_id = cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results


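# Note (illustrative sketch, not part of the original module): the helper
# above only glues a fresh unique ID in front of each requested extension,
# so a call such as
#
#   _GenerateUniqueNames(cfg, [".sda", ".sdb"])
#
# returns something like (the IDs shown are made up)
#
#   ["3581bbca-9ec1-4a3f-b3c5-0d6e4f7a1b2c.sda",
#    "9d41c6be-7702-45e8-8c7b-f1a2d3e4b5c6.sdb"]
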
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
  """Generate a drbd8 device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id = (primary, secondary, port),
                          children = [dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev


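# Note (illustrative sketch, not part of the original module): the branch
# built above is a small Disk tree: an LD_DRBD8 device whose logical_id
# carries (primary, secondary, port) and whose two children are the data
# LV and a fixed 128 MiB metadata LV.  Schematically (sizes and the port
# are made-up examples; the real port comes from cfg.AllocatePort()):
#
#   drbd8  iv_name="sda", logical_id=(pnode, snode, 11000)
#     +-- lv  size=10240, logical_id=(vg, names[0])   data
#     +-- lv  size=128,   logical_id=(vg, names[1])   metadata
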
def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz,
                          file_storage_dir, file_driver):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = cfg.GetVGName()
  if template_name == constants.DT_DISKLESS:
    disks = []
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name = "sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name = "sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                         disk_sz, names[0:2], "sda")
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                         swap_sz, names[2:4], "sdb")
    disks = [drbd_sda_dev, drbd_sdb_dev]
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
                                iv_name="sda", logical_id=(file_driver,
                                "%s/sda" % file_storage_dir))
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
                                iv_name="sdb", logical_id=(file_driver,
                                "%s/sdb" % file_storage_dir))
    disks = [file_sda_dev, file_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks


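# Note (illustrative sketch, not part of the original module): for the
# plain LVM template a call like the one below (all arguments except the
# template constant are made-up examples) returns two LV disks named
# after fresh unique IDs:
#
#   disks = _GenerateDiskTemplate(cfg, constants.DT_PLAIN,
#                                 "instance1.example.com", "node1", [],
#                                 10240, 4096, None, None)
#
# i.e. an "sda" LV of 10240 MiB and an "sdb" LV of 4096 MiB, while
# DT_DRBD8 requires exactly one secondary node and returns two
# _GenerateDRBD8Branch trees instead.
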
def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = rpc.call_file_storage_dir_create(instance.primary_node,
                                              file_storage_dir)

    if not result:
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
      return False

    if not result[0]:
      logger.Error("failed to create directory '%s'" % file_storage_dir)
      return False

  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False

  return True


def _RemoveDisks(instance, cfg):
2944
  """Remove all disks for an instance.
2945

2946
  This abstracts away some work from `AddInstance()` and
2947
  `RemoveInstance()`. Note that in case some of the devices couldn't
2948
  be removed, the removal will continue with the other ones (compare
2949
  with `_CreateDisks()`).
2950

2951
  Args:
2952
    instance: the instance object
2953

2954
  Returns:
2955
    True or False showing the success of the removal proces
2956

2957
  """
2958
  logger.Info("removing block devices for instance %s" % instance.name)
2959

    
2960
  result = True
2961
  for device in instance.disks:
2962
    for node, disk in device.ComputeNodeTree(instance.primary_node):
2963
      cfg.SetDiskID(disk, node)
2964
      if not rpc.call_blockdev_remove(node, disk):
2965
        logger.Error("could not remove block device %s on node %s,"
2966
                     " continuing anyway" %
2967
                     (device.iv_name, node))
2968
        result = False
2969

    
2970
  if instance.disk_template == constants.DT_FILE:
2971
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2972
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
2973
                                            file_storage_dir):
2974
      logger.Error("could not remove directory '%s'" % file_storage_dir)
2975
      result = False
2976

    
2977
  return result
2978

    
2979

    
2980
def _ComputeDiskSize(disk_template, disk_size, swap_size):
2981
  """Compute disk size requirements in the volume group
2982

2983
  This is currently hard-coded for the two-drive layout.
2984

2985
  """
2986
  # Required free disk space as a function of disk and swap space
2987
  req_size_dict = {
2988
    constants.DT_DISKLESS: None,
2989
    constants.DT_PLAIN: disk_size + swap_size,
2990
    # 256 MB are added for drbd metadata, 128MB for each drbd device
2991
    constants.DT_DRBD8: disk_size + swap_size + 256,
2992
    constants.DT_FILE: None,
2993
  }
2994

    
2995
  if disk_template not in req_size_dict:
2996
    raise errors.ProgrammerError("Disk template '%s' size requirement"
2997
                                 " is unknown" %  disk_template)
2998

    
2999
  return req_size_dict[disk_template]
3000

    
3001

    
3002
class LUCreateInstance(LogicalUnit):
3003
  """Create an instance.
3004

3005
  """
3006
  HPATH = "instance-add"
3007
  HTYPE = constants.HTYPE_INSTANCE
3008
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
3009
              "disk_template", "swap_size", "mode", "start", "vcpus",
3010
              "wait_for_sync", "ip_check", "mac"]
3011

    
3012
  def _RunAllocator(self):
3013
    """Run the allocator based on input opcode.
3014

3015
    """
3016
    disks = [{"size": self.op.disk_size, "mode": "w"},
3017
             {"size": self.op.swap_size, "mode": "w"}]
3018
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3019
             "bridge": self.op.bridge}]
3020
    ial = IAllocator(self.cfg, self.sstore,
3021
                     mode=constants.IALLOCATOR_MODE_ALLOC,
3022
                     name=self.op.instance_name,
3023
                     disk_template=self.op.disk_template,
3024
                     tags=[],
3025
                     os=self.op.os_type,
3026
                     vcpus=self.op.vcpus,
3027
                     mem_size=self.op.mem_size,
3028
                     disks=disks,
3029
                     nics=nics,
3030
                     )
3031

    
3032
    ial.Run(self.op.iallocator)
3033

    
3034
    if not ial.success:
3035
      raise errors.OpPrereqError("Can't compute nodes using"
3036
                                 " iallocator '%s': %s" % (self.op.iallocator,
3037
                                                           ial.info))
3038
    if len(ial.nodes) != ial.required_nodes:
3039
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3040
                                 " of nodes (%s), required %s" %
3041
                                 (len(ial.nodes), ial.required_nodes))
3042
    self.op.pnode = ial.nodes[0]
3043
    logger.ToStdout("Selected nodes for the instance: %s" %
3044
                    (", ".join(ial.nodes),))
3045
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3046
                (self.op.instance_name, self.op.iallocator, ial.nodes))
3047
    if ial.required_nodes == 2:
3048
      self.op.snode = ial.nodes[1]
3049

    
3050
  def BuildHooksEnv(self):
3051
    """Build hooks env.
3052

3053
    This runs on master, primary and secondary nodes of the instance.
3054

3055
    """
3056
    env = {
3057
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3058
      "INSTANCE_DISK_SIZE": self.op.disk_size,
3059
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
3060
      "INSTANCE_ADD_MODE": self.op.mode,
3061
      }
3062
    if self.op.mode == constants.INSTANCE_IMPORT:
3063
      env["INSTANCE_SRC_NODE"] = self.op.src_node
3064
      env["INSTANCE_SRC_PATH"] = self.op.src_path
3065
      env["INSTANCE_SRC_IMAGE"] = self.src_image
3066

    
3067
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3068
      primary_node=self.op.pnode,
3069
      secondary_nodes=self.secondaries,
3070
      status=self.instance_status,
3071
      os_type=self.op.os_type,
3072
      memory=self.op.mem_size,
3073
      vcpus=self.op.vcpus,
3074
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3075
    ))
3076

    
3077
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3078
          self.secondaries)
3079
    return env, nl, nl
3080

    
3081

    
3082
  def CheckPrereq(self):
3083
    """Check prerequisites.
3084

3085
    """
3086
    # set optional parameters to none if they don't exist
3087
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3088
                 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
3089
                 "hvm_nic_type", "hvm_disk_type", "vnc_bind_address"]:
3090
      if not hasattr(self.op, attr):
3091
        setattr(self.op, attr, None)
3092

    
3093
    if self.op.mode not in (constants.INSTANCE_CREATE,
3094
                            constants.INSTANCE_IMPORT):
3095
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3096
                                 self.op.mode)
3097

    
3098
    if (not self.cfg.GetVGName() and
3099
        self.op.disk_template not in constants.DTS_NOT_LVM):
3100
      raise errors.OpPrereqError("Cluster does not support lvm-based"
3101
                                 " instances")
3102

    
3103
    if self.op.mode == constants.INSTANCE_IMPORT:
3104
      src_node = getattr(self.op, "src_node", None)
3105
      src_path = getattr(self.op, "src_path", None)
3106
      if src_node is None or src_path is None:
3107
        raise errors.OpPrereqError("Importing an instance requires source"
3108
                                   " node and path options")
3109
      src_node_full = self.cfg.ExpandNodeName(src_node)
3110
      if src_node_full is None:
3111
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3112
      self.op.src_node = src_node = src_node_full
3113

    
3114
      if not os.path.isabs(src_path):
3115
        raise errors.OpPrereqError("The source path must be absolute")
3116

    
3117
      export_info = rpc.call_export_info(src_node, src_path)
3118

    
3119
      if not export_info:
3120
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
3121

    
3122
      if not export_info.has_section(constants.INISECT_EXP):
3123
        raise errors.ProgrammerError("Corrupted export config")
3124

    
3125
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
3126
      if (int(ei_version) != constants.EXPORT_VERSION):
3127
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3128
                                   (ei_version, constants.EXPORT_VERSION))
3129

    
3130
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3131
        raise errors.OpPrereqError("Can't import instance with more than"
3132
                                   " one data disk")
3133

    
3134
      # FIXME: are the old os-es, disk sizes, etc. useful?
3135
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3136
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3137
                                                         'disk0_dump'))
3138
      self.src_image = diskimage
3139
    else: # INSTANCE_CREATE
3140
      if getattr(self.op, "os_type", None) is None:
3141
        raise errors.OpPrereqError("No guest OS specified")
3142

    
3143
    #### instance parameters check
3144

    
3145
    # disk template and mirror node verification
3146
    if self.op.disk_template not in constants.DISK_TEMPLATES:
3147
      raise errors.OpPrereqError("Invalid disk template name")
3148

    
3149
    # instance name verification
3150
    hostname1 = utils.HostInfo(self.op.instance_name)
3151

    
3152
    self.op.instance_name = instance_name = hostname1.name
3153
    instance_list = self.cfg.GetInstanceList()
3154
    if instance_name in instance_list:
3155
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3156
                                 instance_name)
3157

    
3158
    # ip validity checks
3159
    ip = getattr(self.op, "ip", None)
3160
    if ip is None or ip.lower() == "none":
3161
      inst_ip = None
3162
    elif ip.lower() == "auto":
3163
      inst_ip = hostname1.ip
3164
    else:
3165
      if not utils.IsValidIP(ip):
3166
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
3167
                                   " like a valid IP" % ip)
3168
      inst_ip = ip
3169
    self.inst_ip = self.op.ip = inst_ip
3170

    
3171
    if self.op.start and not self.op.ip_check:
3172
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3173
                                 " adding an instance in start mode")
3174

    
3175
    if self.op.ip_check:
3176
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3177
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3178
                                   (hostname1.ip, instance_name))
3179

    
3180
    # MAC address verification
3181
    if self.op.mac != "auto":
3182
      if not utils.IsValidMac(self.op.mac.lower()):
3183
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
3184
                                   self.op.mac)
3185

    
3186
    # bridge verification
3187
    bridge = getattr(self.op, "bridge", None)
3188
    if bridge is None:
3189
      self.op.bridge = self.cfg.GetDefBridge()
3190
    else:
3191
      self.op.bridge = bridge
3192

    
3193
    # boot order verification
3194
    if self.op.hvm_boot_order is not None:
3195
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3196
        raise errors.OpPrereqError("invalid boot order specified,"
3197
                                   " must be one or more of [acdn]")
3198
    # file storage checks
3199
    if (self.op.file_driver and
3200
        not self.op.file_driver in constants.FILE_DRIVER):
3201
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
3202
                                 self.op.file_driver)
3203

    
3204
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3205
      raise errors.OpPrereqError("File storage directory not a relative"
3206
                                 " path")
3207
    #### allocator run
3208

    
3209
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
3210
      raise errors.OpPrereqError("One and only one of iallocator and primary"
3211
                                 " node must be given")
3212

    
3213
    if self.op.iallocator is not None:
3214
      self._RunAllocator()
3215

    
3216
    #### node related checks
3217

    
3218
    # check primary node
3219
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3220
    if pnode is None:
3221
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
3222
                                 self.op.pnode)
3223
    self.op.pnode = pnode.name
3224
    self.pnode = pnode
3225
    self.secondaries = []
3226

    
3227
    # mirror node verification
3228
    if self.op.disk_template in constants.DTS_NET_MIRROR:
3229
      if getattr(self.op, "snode", None) is None:
3230
        raise errors.OpPrereqError("The networked disk templates need"
3231
                                   " a mirror node")
3232

    
3233
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
3234
      if snode_name is None:
3235
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
3236
                                   self.op.snode)
3237
      elif snode_name == pnode.name:
3238
        raise errors.OpPrereqError("The secondary node cannot be"
3239
                                   " the primary node.")
3240
      self.secondaries.append(snode_name)
3241

    
3242
    req_size = _ComputeDiskSize(self.op.disk_template,
3243
                                self.op.disk_size, self.op.swap_size)
3244

    
3245
    # Check lv size requirements
3246
    if req_size is not None:
3247
      nodenames = [pnode.name] + self.secondaries
3248
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3249
      for node in nodenames:
3250
        info = nodeinfo.get(node, None)
3251
        if not info:
3252
          raise errors.OpPrereqError("Cannot get current information"
3253
                                     " from node '%s'" % node)
3254
        vg_free = info.get('vg_free', None)
3255
        if not isinstance(vg_free, int):
3256
          raise errors.OpPrereqError("Can't compute free disk space on"
3257
                                     " node %s" % node)
3258
        if req_size > info['vg_free']:
3259
          raise errors.OpPrereqError("Not enough disk space on target node %s."
3260
                                     " %d MB available, %d MB required" %
3261
                                     (node, info['vg_free'], req_size))
3262

    
3263
    # os verification
3264
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3265
    if not os_obj:
3266
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
3267
                                 " primary node"  % self.op.os_type)
3268

    
3269
    if self.op.kernel_path == constants.VALUE_NONE:
3270
      raise errors.OpPrereqError("Can't set instance kernel to none")
3271

    
3272

    
3273
    # bridge check on primary node
3274
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3275
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
3276
                                 " destination node '%s'" %
3277
                                 (self.op.bridge, pnode.name))
3278

    
3279
    # memory check on primary node
3280
    if self.op.start:
3281
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3282
                           "creating instance %s" % self.op.instance_name,
3283
                           self.op.mem_size)
3284

    
3285
    # hvm_cdrom_image_path verification
3286
    if self.op.hvm_cdrom_image_path is not None:
3287
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
3288
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
3289
                                   " be an absolute path or None, not %s" %
3290
                                   self.op.hvm_cdrom_image_path)
3291
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
3292
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
3293
                                   " regular file or a symlink pointing to"
3294
                                   " an existing regular file, not %s" %
3295
                                   self.op.hvm_cdrom_image_path)
3296

    
3297
    # vnc_bind_address verification
3298
    if self.op.vnc_bind_address is not None:
3299
      if not utils.IsValidIP(self.op.vnc_bind_address):
3300
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3301
                                   " like a valid IP address" %
3302
                                   self.op.vnc_bind_address)
3303

    
3304
    # Xen HVM device type checks
3305
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
3306
      if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
3307
        raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
3308
                                   " hypervisor" % self.op.hvm_nic_type)
3309
      if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
3310
        raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
3311
                                   " hypervisor" % self.op.hvm_disk_type)
3312

    
3313
    if self.op.start:
3314
      self.instance_status = 'up'
3315
    else:
3316
      self.instance_status = 'down'
3317

    
3318
  def Exec(self, feedback_fn):
3319
    """Create and add the instance to the cluster.
3320

3321
    """
3322
    instance = self.op.instance_name
3323
    pnode_name = self.pnode.name
3324

    
3325
    if self.op.mac == "auto":
3326
      mac_address = self.cfg.GenerateMAC()
3327
    else:
3328
      mac_address = self.op.mac
3329

    
3330
    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3331
    if self.inst_ip is not None:
3332
      nic.ip = self.inst_ip
3333

    
3334
    ht_kind = self.sstore.GetHypervisorType()
3335
    if ht_kind in constants.HTS_REQ_PORT:
3336
      network_port = self.cfg.AllocatePort()
3337
    else:
3338
      network_port = None
3339

    
3340
    if self.op.vnc_bind_address is None:
3341
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3342

    
3343
    # this is needed because os.path.join does not accept None arguments
3344
    if self.op.file_storage_dir is None:
3345
      string_file_storage_dir = ""
3346
    else:
3347
      string_file_storage_dir = self.op.file_storage_dir
3348

    
3349
    # build the full file storage dir path
3350
    file_storage_dir = os.path.normpath(os.path.join(
3351
                                        self.sstore.GetFileStorageDir(),
3352
                                        string_file_storage_dir, instance))
3353

    
3354

    
3355
    disks = _GenerateDiskTemplate(self.cfg,
3356
                                  self.op.disk_template,
3357
                                  instance, pnode_name,
3358
                                  self.secondaries, self.op.disk_size,
3359
                                  self.op.swap_size,
3360
                                  file_storage_dir,
3361
                                  self.op.file_driver)
3362

    
3363
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3364
                            primary_node=pnode_name,
3365
                            memory=self.op.mem_size,
3366
                            vcpus=self.op.vcpus,
3367
                            nics=[nic], disks=disks,
3368
                            disk_template=self.op.disk_template,
3369
                            status=self.instance_status,
3370
                            network_port=network_port,
3371
                            kernel_path=self.op.kernel_path,
3372
                            initrd_path=self.op.initrd_path,
3373
                            hvm_boot_order=self.op.hvm_boot_order,
3374
                            hvm_acpi=self.op.hvm_acpi,
3375
                            hvm_pae=self.op.hvm_pae,
3376
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3377
                            vnc_bind_address=self.op.vnc_bind_address,
3378
                            hvm_nic_type=self.op.hvm_nic_type,
3379
                            hvm_disk_type=self.op.hvm_disk_type,
3380
                            )
3381

    
3382
    feedback_fn("* creating instance disks...")
3383
    if not _CreateDisks(self.cfg, iobj):
3384
      _RemoveDisks(iobj, self.cfg)
3385
      raise errors.OpExecError("Device creation failed, reverting...")
3386

    
3387
    feedback_fn("adding instance %s to cluster config" % instance)
3388

    
3389
    self.cfg.AddInstance(iobj)
3390
    # Add the new instance to the Ganeti Lock Manager
3391
    self.context.glm.add(locking.LEVEL_INSTANCE, instance)
3392

    
3393
    if self.op.wait_for_sync:
3394
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3395
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3396
      # make sure the disks are not degraded (still sync-ing is ok)
3397
      time.sleep(15)
3398
      feedback_fn("* checking mirrors status")
3399
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3400
    else:
3401
      disk_abort = False
3402

    
3403
    if disk_abort:
3404
      _RemoveDisks(iobj, self.cfg)
3405
      self.cfg.RemoveInstance(iobj.name)
3406
      # Remove the new instance from the Ganeti Lock Manager
3407
      self.context.glm.remove(locking.LEVEL_INSTANCE, iobj.name)
3408
      raise errors.OpExecError("There are some degraded disks for"
3409
                               " this instance")
3410

    
3411
    feedback_fn("creating os for instance %s on node %s" %
3412
                (instance, pnode_name))
3413

    
3414
    if iobj.disk_template != constants.DT_DISKLESS:
3415
      if self.op.mode == constants.INSTANCE_CREATE:
3416
        feedback_fn("* running the instance OS create scripts...")
3417
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3418
          raise errors.OpExecError("could not add os for instance %s"
3419
                                   " on node %s" %
3420
                                   (instance, pnode_name))
3421

    
3422
      elif self.op.mode == constants.INSTANCE_IMPORT:
3423
        feedback_fn("* running the instance OS import scripts...")
3424
        src_node = self.op.src_node
3425
        src_image = self.src_image
3426
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3427
                                                src_node, src_image):
3428
          raise errors.OpExecError("Could not import os for instance"
3429
                                   " %s on node %s" %
3430
                                   (instance, pnode_name))
3431
      else:
3432
        # also checked in the prereq part
3433
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3434
                                     % self.op.mode)
3435

    
3436
    if self.op.start:
3437
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3438
      feedback_fn("* starting instance...")
3439
      if not rpc.call_instance_start(pnode_name, iobj, None):
3440
        raise errors.OpExecError("Could not start instance")
3441

    
3442

    
3443
class LUConnectConsole(NoHooksLU):
3444
  """Connect to an instance's console.
3445

3446
  This is somewhat special in that it returns the command line that
3447
  you need to run on the master node in order to connect to the
3448
  console.
3449

3450
  """
3451
  _OP_REQP = ["instance_name"]
3452
  REQ_BGL = False
3453

    
3454
  def ExpandNames(self):
3455
    self._ExpandAndLockInstance()
3456

    
3457
  def CheckPrereq(self):
3458
    """Check prerequisites.
3459

3460
    This checks that the instance is in the cluster.
3461

3462
    """
3463
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3464
    assert self.instance is not None, \
3465
      "Cannot retrieve locked instance %s" % self.op.instance_name
3466

    
3467
  def Exec(self, feedback_fn):
3468
    """Connect to the console of an instance
3469

3470
    """
3471
    instance = self.instance
3472
    node = instance.primary_node
3473

    
3474
    node_insts = rpc.call_instance_list([node])[node]
3475
    if node_insts is False:
3476
      raise errors.OpExecError("Can't connect to node %s." % node)
3477

    
3478
    if instance.name not in node_insts:
3479
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3480

    
3481
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3482

    
3483
    hyper = hypervisor.GetHypervisor()
3484
    console_cmd = hyper.GetShellCommandForConsole(instance)
3485

    
3486
    # build ssh cmdline
3487
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3488

    
3489

    
3490
class LUReplaceDisks(LogicalUnit):
3491
  """Replace the disks of an instance.
3492

3493
  """
3494
  HPATH = "mirrors-replace"
3495
  HTYPE = constants.HTYPE_INSTANCE
3496
  _OP_REQP = ["instance_name", "mode", "disks"]
3497

    
3498
  def _RunAllocator(self):
3499
    """Compute a new secondary node using an IAllocator.
3500

3501
    """
3502
    ial = IAllocator(self.cfg, self.sstore,
3503
                     mode=constants.IALLOCATOR_MODE_RELOC,
3504
                     name=self.op.instance_name,
3505
                     relocate_from=[self.sec_node])
3506

    
3507
    ial.Run(self.op.iallocator)
3508

    
3509
    if not ial.success:
3510
      raise errors.OpPrereqError("Can't compute nodes using"
3511
                                 " iallocator '%s': %s" % (self.op.iallocator,
3512
                                                           ial.info))
3513
    if len(ial.nodes) != ial.required_nodes:
3514
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3515
                                 " of nodes (%s), required %s" %
3516
                                 (len(ial.nodes), ial.required_nodes))
3517
    self.op.remote_node = ial.nodes[0]
3518
    logger.ToStdout("Selected new secondary for the instance: %s" %
3519
                    self.op.remote_node)
3520

    
3521
  def BuildHooksEnv(self):
3522
    """Build hooks env.
3523

3524
    This runs on the master, the primary and all the secondaries.
3525

3526
    """
3527
    env = {
3528
      "MODE": self.op.mode,
3529
      "NEW_SECONDARY": self.op.remote_node,
3530
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3531
      }
3532
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3533
    nl = [
3534
      self.sstore.GetMasterNode(),
3535
      self.instance.primary_node,
3536
      ]
3537
    if self.op.remote_node is not None:
3538
      nl.append(self.op.remote_node)
3539
    return env, nl, nl
3540

    
3541
  def CheckPrereq(self):
3542
    """Check prerequisites.
3543

3544
    This checks that the instance is in the cluster.
3545

3546
    """
3547
    if not hasattr(self.op, "remote_node"):
3548
      self.op.remote_node = None
3549

    
3550
    instance = self.cfg.GetInstanceInfo(
3551
      self.cfg.ExpandInstanceName(self.op.instance_name))
3552
    if instance is None:
3553
      raise errors.OpPrereqError("Instance '%s' not known" %
3554
                                 self.op.instance_name)
3555
    self.instance = instance
3556
    self.op.instance_name = instance.name
3557

    
3558
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3559
      raise errors.OpPrereqError("Instance's disk layout is not"
3560
                                 " network mirrored.")
3561

    
3562
    if len(instance.secondary_nodes) != 1:
3563
      raise errors.OpPrereqError("The instance has a strange layout,"
3564
                                 " expected one secondary but found %d" %
3565
                                 len(instance.secondary_nodes))
3566

    
3567
    self.sec_node = instance.secondary_nodes[0]
3568

    
3569
    ia_name = getattr(self.op, "iallocator", None)
3570
    if ia_name is not None:
3571
      if self.op.remote_node is not None:
3572
        raise errors.OpPrereqError("Give either the iallocator or the new"
3573
                                   " secondary, not both")
3574
      self.op.remote_node = self._RunAllocator()
3575

    
3576
    remote_node = self.op.remote_node
3577
    if remote_node is not None:
3578
      remote_node = self.cfg.ExpandNodeName(remote_node)
3579
      if remote_node is None:
3580
        raise errors.OpPrereqError("Node '%s' not known" %
3581
                                   self.op.remote_node)
3582
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3583
    else:
3584
      self.remote_node_info = None
3585
    if remote_node == instance.primary_node:
3586
      raise errors.OpPrereqError("The specified node is the primary node of"
3587
                                 " the instance.")
3588
    elif remote_node == self.sec_node:
3589
      if self.op.mode == constants.REPLACE_DISK_SEC:
3590
        # this is for DRBD8, where we can't execute the same mode of
3591
        # replacement as for drbd7 (no different port allocated)
3592
        raise errors.OpPrereqError("Same secondary given, cannot execute"
3593
                                   " replacement")
3594
    if instance.disk_template == constants.DT_DRBD8:
3595
      if (self.op.mode == constants.REPLACE_DISK_ALL and
3596
          remote_node is not None):
3597
        # switch to replace secondary mode
3598
        self.op.mode = constants.REPLACE_DISK_SEC
3599

    
3600
      if self.op.mode == constants.REPLACE_DISK_ALL:
3601
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3602
                                   " secondary disk replacement, not"
3603
                                   " both at once")
3604
      elif self.op.mode == constants.REPLACE_DISK_PRI:
3605
        if remote_node is not None:
3606
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3607
                                     " the secondary while doing a primary"
3608
                                     " node disk replacement")
3609
        self.tgt_node = instance.primary_node
3610
        self.oth_node = instance.secondary_nodes[0]
3611
      elif self.op.mode == constants.REPLACE_DISK_SEC:
3612
        self.new_node = remote_node # this can be None, in which case
3613
                                    # we don't change the secondary
3614
        self.tgt_node = instance.secondary_nodes[0]
3615
        self.oth_node = instance.primary_node
3616
      else:
3617
        raise errors.ProgrammerError("Unhandled disk replace mode")
3618

    
3619
    for name in self.op.disks:
3620
      if instance.FindDisk(name) is None:
3621
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3622
                                   (name, instance.name))
3623
    self.op.remote_node = remote_node
3624

    
3625
  def _ExecD8DiskOnly(self, feedback_fn):
3626
    """Replace a disk on the primary or secondary for dbrd8.
3627

3628
    The algorithm for replace is quite complicated:
3629
      - for each disk to be replaced:
3630
        - create new LVs on the target node with unique names
3631
        - detach old LVs from the drbd device
3632
        - rename old LVs to name_replaced.<time_t>
3633
        - rename new LVs to old LVs
3634
        - attach the new LVs (with the old names now) to the drbd device
3635
      - wait for sync across all devices
3636
      - for each modified disk:
3637
        - remove old LVs (which have the name name_replaces.<time_t>)
3638

3639
    Failures are not very well handled.
3640

3641
    """
3642
    steps_total = 6
3643
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3644
    instance = self.instance
3645
    iv_names = {}
3646
    vgname = self.cfg.GetVGName()
3647
    # start of work
3648
    cfg = self.cfg
3649
    tgt_node = self.tgt_node
3650
    oth_node = self.oth_node
3651

    
3652
    # Step: check device activation
3653
    self.proc.LogStep(1, steps_total, "check device existence")
3654
    info("checking volume groups")
3655
    my_vg = cfg.GetVGName()
3656
    results = rpc.call_vg_list([oth_node, tgt_node])
3657
    if not results:
3658
      raise errors.OpExecError("Can't list volume groups on the nodes")
3659
    for node in oth_node, tgt_node:
3660
      res = results.get(node, False)
3661
      if not res or my_vg not in res:
3662
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3663
                                 (my_vg, node))
3664
    for dev in instance.disks:
3665
      if not dev.iv_name in self.op.disks:
3666
        continue
3667
      for node in tgt_node, oth_node:
3668
        info("checking %s on %s" % (dev.iv_name, node))
3669
        cfg.SetDiskID(dev, node)
3670
        if not rpc.call_blockdev_find(node, dev):
3671
          raise errors.OpExecError("Can't find device %s on node %s" %
3672
                                   (dev.iv_name, node))
3673

    
3674
    # Step: check other node consistency
3675
    self.proc.LogStep(2, steps_total, "check peer consistency")
3676
    for dev in instance.disks:
3677
      if not dev.iv_name in self.op.disks:
3678
        continue
3679
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3680
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3681
                                   oth_node==instance.primary_node):
3682
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3683
                                 " to replace disks on this node (%s)" %
3684
                                 (oth_node, tgt_node))
3685

    
3686
    # Step: create new storage
3687
    self.proc.LogStep(3, steps_total, "allocate new storage")
3688
    for dev in instance.disks:
3689
      if not dev.iv_name in self.op.disks:
3690
        continue
3691
      size = dev.size
3692
      cfg.SetDiskID(dev, tgt_node)
3693
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3694
      names = _GenerateUniqueNames(cfg, lv_names)
3695
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3696
                             logical_id=(vgname, names[0]))
3697
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3698
                             logical_id=(vgname, names[1]))
3699
      new_lvs = [lv_data, lv_meta]
3700
      old_lvs = dev.children
3701
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3702
      info("creating new local storage on %s for %s" %
3703
           (tgt_node, dev.iv_name))
3704
      # since we *always* want to create this LV, we use the
3705
      # _Create...OnPrimary (which forces the creation), even if we
3706
      # are talking about the secondary node
3707
      for new_lv in new_lvs:
3708
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3709
                                        _GetInstanceInfoText(instance)):
3710
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3711
                                   " node '%s'" %
3712
                                   (new_lv.logical_id[1], tgt_node))
3713

    
3714
    # Step: for each lv, detach+rename*2+attach
3715
    self.proc.LogStep(4, steps_total, "change drbd configuration")
3716
    for dev, old_lvs, new_lvs in iv_names.itervalues():
3717
      info("detaching %s drbd from local storage" % dev.iv_name)
3718
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3719
        raise errors.OpExecError("Can't detach drbd from local storage on node"
3720
                                 " %s for device %s" % (tgt_node, dev.iv_name))
3721
      #dev.children = []
3722
      #cfg.Update(instance)
3723

    
3724
      # ok, we created the new LVs, so now we know we have the needed
3725
      # storage; as such, we proceed on the target node to rename
3726
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3727
      # using the assumption that logical_id == physical_id (which in
3728
      # turn is the unique_id on that node)
3729

    
3730
      # FIXME(iustin): use a better name for the replaced LVs
3731
      temp_suffix = int(time.time())
3732
      ren_fn = lambda d, suff: (d.physical_id[0],
3733
                                d.physical_id[1] + "_replaced-%s" % suff)
3734
      # build the rename list based on what LVs exist on the node
3735
      rlist = []
3736
      for to_ren in old_lvs:
3737
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3738
        if find_res is not None: # device exists
3739
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3740

    
3741
      info("renaming the old LVs on the target node")
3742
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3743
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3744
      # now we rename the new LVs to the old LVs
3745
      info("renaming the new LVs on the target node")
3746
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3747
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3748
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3749

    
3750
      for old, new in zip(old_lvs, new_lvs):
3751
        new.logical_id = old.logical_id
3752
        cfg.SetDiskID(new, tgt_node)
3753

    
3754
      for disk in old_lvs:
3755
        disk.logical_id = ren_fn(disk, temp_suffix)
3756
        cfg.SetDiskID(disk, tgt_node)
3757

    
3758
      # now that the new lvs have the old name, we can add them to the device
3759
      info("adding new mirror component on %s" % tgt_node)
3760
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3761
        for new_lv in new_lvs:
3762
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
3763
            warning("Can't rollback device %s", hint="manually cleanup unused"
3764
                    " logical volumes")
3765
        raise errors.OpExecError("Can't add local storage to drbd")
3766

    
3767
      dev.children = new_lvs
3768
      cfg.Update(instance)
3769

    
3770
    # Step: wait for sync
3771

    
3772
    # this can fail as the old devices are degraded and _WaitForSync
3773
    # does a combined result over all disks, so we don't check its
3774
    # return value
3775
    self.proc.LogStep(5, steps_total, "sync devices")
3776
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3777

    
3778
    # so check manually all the devices
3779
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3780
      cfg.SetDiskID(dev, instance.primary_node)
3781
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3782
      if is_degr:
3783
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3784

    
3785
    # Step: remove old storage
3786
    self.proc.LogStep(6, steps_total, "removing old storage")
3787
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3788
      info("remove logical volumes for %s" % name)
3789
      for lv in old_lvs:
3790
        cfg.SetDiskID(lv, tgt_node)
3791
        if not rpc.call_blockdev_remove(tgt_node, lv):
3792
          warning("Can't remove old LV", hint="manually remove unused LVs")
3793
          continue
3794

    
3795
  def _ExecD8Secondary(self, feedback_fn):
3796
    """Replace the secondary node for drbd8.
3797

3798
    The algorithm for replace is quite complicated:
3799
      - for all disks of the instance:
3800
        - create new LVs on the new node with same names
3801
        - shutdown the drbd device on the old secondary
3802
        - disconnect the drbd network on the primary
3803
        - create the drbd device on the new secondary
3804
        - network attach the drbd on the primary, using an artifice:
3805
          the drbd code for Attach() will connect to the network if it
3806
          finds a device which is connected to the good local disks but
3807
          not network enabled
3808
      - wait for sync across all devices
3809
      - remove all disks from the old secondary
3810

3811
    Failures are not very well handled.
3812

3813
    """
3814
    steps_total = 6
3815
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3816
    instance = self.instance
3817
    iv_names = {}
3818
    vgname = self.cfg.GetVGName()
3819
    # start of work
3820
    cfg = self.cfg
3821
    old_node = self.tgt_node
3822
    new_node = self.new_node
3823
    pri_node = instance.primary_node
3824

    
3825
    # Step: check device activation
3826
    self.proc.LogStep(1, steps_total, "check device existence")
3827
    info("checking volume groups")
3828
    my_vg = cfg.GetVGName()
3829
    results = rpc.call_vg_list([pri_node, new_node])
3830
    if not results:
3831
      raise errors.OpExecError("Can't list volume groups on the nodes")
3832
    for node in pri_node, new_node:
3833
      res = results.get(node, False)
3834
      if not res or my_vg not in res:
3835
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3836
                                 (my_vg, node))
3837
    for dev in instance.disks:
3838
      if not dev.iv_name in self.op.disks:
3839
        continue
3840
      info("checking %s on %s" % (dev.iv_name, pri_node))
3841
      cfg.SetDiskID(dev, pri_node)
3842
      if not rpc.call_blockdev_find(pri_node, dev):
3843
        raise errors.OpExecError("Can't find device %s on node %s" %
3844
                                 (dev.iv_name, pri_node))
3845

    
3846
    # Step: check other node consistency
3847
    self.proc.LogStep(2, steps_total, "check peer consistency")
3848
    for dev in instance.disks:
3849
      if not dev.iv_name in self.op.disks:
3850
        continue
3851
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3852
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3853
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
3854
                                 " unsafe to replace the secondary" %
3855
                                 pri_node)
3856

    
3857
    # Step: create new storage
3858
    self.proc.LogStep(3, steps_total, "allocate new storage")
3859
    for dev in instance.disks:
3860
      size = dev.size
3861
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3862
      # since we *always* want to create this LV, we use the
3863
      # _Create...OnPrimary (which forces the creation), even if we
3864
      # are talking about the secondary node
3865
      for new_lv in dev.children:
3866
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3867
                                        _GetInstanceInfoText(instance)):
3868
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3869
                                   " node '%s'" %
3870
                                   (new_lv.logical_id[1], new_node))
3871

    
3872
      iv_names[dev.iv_name] = (dev, dev.children)
3873

    
3874
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
3875
    for dev in instance.disks:
3876
      size = dev.size
3877
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3878
      # create new devices on new_node
3879
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3880
                              logical_id=(pri_node, new_node,
3881
                                          dev.logical_id[2]),
3882
                              children=dev.children)
3883
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3884
                                        new_drbd, False,
3885
                                      _GetInstanceInfoText(instance)):
3886
        raise errors.OpExecError("Failed to create new DRBD on"
3887
                                 " node '%s'" % new_node)
3888

    
3889
    for dev in instance.disks:
3890
      # we have new devices, shutdown the drbd on the old secondary
3891
      info("shutting down drbd for %s on old node" % dev.iv_name)
3892
      cfg.SetDiskID(dev, old_node)
3893
      if not rpc.call_blockdev_shutdown(old_node, dev):
3894
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3895
                hint="Please cleanup this device manually as soon as possible")
3896

    
3897
    info("detaching primary drbds from the network (=> standalone)")
3898
    done = 0
3899
    for dev in instance.disks:
3900
      cfg.SetDiskID(dev, pri_node)
3901
      # set the physical (unique in bdev terms) id to None, meaning
3902
      # detach from network
3903
      dev.physical_id = (None,) * len(dev.physical_id)
3904
      # and 'find' the device, which will 'fix' it to match the
3905
      # standalone state
3906
      if rpc.call_blockdev_find(pri_node, dev):
3907
        done += 1
3908
      else:
3909
        warning("Failed to detach drbd %s from network, unusual case" %
3910
                dev.iv_name)
3911

    
3912
    if not done:
3913
      # no detaches succeeded (very unlikely)
3914
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
3915

    
3916
    # if we managed to detach at least one, we update all the disks of
3917
    # the instance to point to the new secondary
3918
    info("updating instance configuration")
3919
    for dev in instance.disks:
3920
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3921
      cfg.SetDiskID(dev, pri_node)
3922
    cfg.Update(instance)
3923

    
3924
    # and now perform the drbd attach
3925
    info("attaching primary drbds to new secondary (standalone => connected)")
3926
    failures = []
3927
    for dev in instance.disks:
3928
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3929
      # since the attach is smart, it's enough to 'find' the device,
3930
      # it will automatically activate the network, if the physical_id
3931
      # is correct
3932
      cfg.SetDiskID(dev, pri_node)
3933
      if not rpc.call_blockdev_find(pri_node, dev):
3934
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3935
                "please do a gnt-instance info to see the status of disks")
3936

    
3937
    # this can fail as the old devices are degraded and _WaitForSync
3938
    # does a combined result over all disks, so we don't check its
3939
    # return value
3940
    self.proc.LogStep(5, steps_total, "sync devices")
3941
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3942

    
3943
    # so check manually all the devices
3944
    for name, (dev, old_lvs) in iv_names.iteritems():
3945
      cfg.SetDiskID(dev, pri_node)
3946
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3947
      if is_degr:
3948
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3949

    
3950
    self.proc.LogStep(6, steps_total, "removing old storage")
3951
    for name, (dev, old_lvs) in iv_names.iteritems():
3952
      info("remove logical volumes for %s" % name)
3953
      for lv in old_lvs:
3954
        cfg.SetDiskID(lv, old_node)
3955
        if not rpc.call_blockdev_remove(old_node, lv):
3956
          warning("Can't remove LV on old secondary",
3957
                  hint="Cleanup stale volumes by hand")
3958

    
3959
  def Exec(self, feedback_fn):
3960
    """Execute disk replacement.
3961

3962
    This dispatches the disk replacement to the appropriate handler.
3963

3964
    """
3965
    instance = self.instance
3966

    
3967
    # Activate the instance disks if we're replacing them on a down instance
3968
    if instance.status == "down":
3969
      op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
3970
      self.proc.ChainOpCode(op)
3971

    
3972
    if instance.disk_template == constants.DT_DRBD8:
3973
      if self.op.remote_node is None:
3974
        fn = self._ExecD8DiskOnly
3975
      else:
3976
        fn = self._ExecD8Secondary
3977
    else:
3978
      raise errors.ProgrammerError("Unhandled disk replacement case")
3979

    
3980
    ret = fn(feedback_fn)
3981

    
3982
    # Deactivate the instance disks if we're replacing them on a down instance
3983
    if instance.status == "down":
3984
      op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
3985
      self.proc.ChainOpCode(op)
3986

    
3987
    return ret
3988

    
3989

    
3990
class LUGrowDisk(LogicalUnit):
3991
  """Grow a disk of an instance.
3992

3993
  """
3994
  HPATH = "disk-grow"
3995
  HTYPE = constants.HTYPE_INSTANCE
3996
  _OP_REQP = ["instance_name", "disk", "amount"]
3997

    
3998
  def BuildHooksEnv(self):
3999
    """Build hooks env.
4000

4001
    This runs on the master, the primary and all the secondaries.
4002

4003
    """
4004
    env = {
4005
      "DISK": self.op.disk,
4006
      "AMOUNT": self.op.amount,
4007
      }
4008
    env.update(_BuildInstanceHookEnvByObject(self.instance))
4009
    nl = [
4010
      self.sstore.GetMasterNode(),
4011
      self.instance.primary_node,
4012
      ]
4013
    return env, nl, nl
4014

    
4015
  def CheckPrereq(self):
4016
    """Check prerequisites.
4017

4018
    This checks that the instance is in the cluster.
4019

4020
    """
4021
    instance = self.cfg.GetInstanceInfo(
4022
      self.cfg.ExpandInstanceName(self.op.instance_name))
4023
    if instance is None:
4024
      raise errors.OpPrereqError("Instance '%s' not known" %
4025
                                 self.op.instance_name)
4026
    self.instance = instance
4027
    self.op.instance_name = instance.name
4028

    
4029
    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4030
      raise errors.OpPrereqError("Instance's disk layout does not support"
4031
                                 " growing.")
4032

    
4033
    if instance.FindDisk(self.op.disk) is None:
4034
      raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4035
                                 (self.op.disk, instance.name))
4036

    
4037
    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4038
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
4039
    for node in nodenames:
4040
      info = nodeinfo.get(node, None)
4041
      if not info:
4042
        raise errors.OpPrereqError("Cannot get current information"
4043
                                   " from node '%s'" % node)
4044
      vg_free = info.get('vg_free', None)
4045
      if not isinstance(vg_free, int):
4046
        raise errors.OpPrereqError("Can't compute free disk space on"
4047
                                   " node %s" % node)
4048
      if self.op.amount > info['vg_free']:
4049
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
4050
                                   " %d MiB available, %d MiB required" %
4051
                                   (node, info['vg_free'], self.op.amount))
4052

    
4053
  def Exec(self, feedback_fn):
4054
    """Execute disk grow.
4055

4056
    """
4057
    instance = self.instance
4058
    disk = instance.FindDisk(self.op.disk)
4059
    for node in (instance.secondary_nodes + (instance.primary_node,)):
4060
      self.cfg.SetDiskID(disk, node)
4061
      result = rpc.call_blockdev_grow(node, disk, self.op.amount)
4062
      if not result or not isinstance(result, tuple) or len(result) != 2:
4063
        raise errors.OpExecError("grow request failed to node %s" % node)
4064
      elif not result[0]:
4065
        raise errors.OpExecError("grow request failed to node %s: %s" %
4066
                                 (node, result[1]))
4067
    disk.RecordGrow(self.op.amount)
4068
    self.cfg.Update(instance)
4069
    return
4070

    
4071

    
4072
class LUQueryInstanceData(NoHooksLU):
4073
  """Query runtime instance data.
4074

4075
  """
4076
  _OP_REQP = ["instances"]
4077

    
4078
  def CheckPrereq(self):
4079
    """Check prerequisites.
4080

4081
    This only checks the optional instance list against the existing names.
4082

4083
    """
4084
    if not isinstance(self.op.instances, list):
4085
      raise errors.OpPrereqError("Invalid argument type 'instances'")
4086
    if self.op.instances:
4087
      self.wanted_instances = []
4088
      names = self.op.instances
4089
      for name in names:
4090
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4091
        if instance is None:
4092
          raise errors.OpPrereqError("No such instance name '%s'" % name)
4093
        self.wanted_instances.append(instance)
4094
    else:
4095
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4096
                               in self.cfg.GetInstanceList()]
4097
    return
4098

    
4099

    
4100
  def _ComputeDiskStatus(self, instance, snode, dev):
4101
    """Compute block device status.
4102

4103
    """
4104
    self.cfg.SetDiskID(dev, instance.primary_node)
4105
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4106
    if dev.dev_type in constants.LDS_DRBD:
4107
      # we change the snode then (otherwise we use the one passed in)
4108
      if dev.logical_id[0] == instance.primary_node:
4109
        snode = dev.logical_id[1]
4110
      else:
4111
        snode = dev.logical_id[0]
4112

    
4113
    if snode:
4114
      self.cfg.SetDiskID(dev, snode)
4115
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
4116
    else:
4117
      dev_sstatus = None
4118

    
4119
    if dev.children:
4120
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
4121
                      for child in dev.children]
4122
    else:
4123
      dev_children = []
4124

    
4125
    data = {
4126
      "iv_name": dev.iv_name,
4127
      "dev_type": dev.dev_type,
4128
      "logical_id": dev.logical_id,
4129
      "physical_id": dev.physical_id,
4130
      "pstatus": dev_pstatus,
4131
      "sstatus": dev_sstatus,
4132
      "children": dev_children,
4133
      }
4134

    
4135
    return data
4136

    
4137
  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                           instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "vcpus": instance.vcpus,
        }

      htkind = self.sstore.GetHypervisorType()
      if htkind == constants.HT_XEN_PVM30:
        idict["kernel_path"] = instance.kernel_path
        idict["initrd_path"] = instance.initrd_path

      if htkind == constants.HT_XEN_HVM31:
        idict["hvm_boot_order"] = instance.hvm_boot_order
        idict["hvm_acpi"] = instance.hvm_acpi
        idict["hvm_pae"] = instance.hvm_pae
        idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
        idict["hvm_nic_type"] = instance.hvm_nic_type
        idict["hvm_disk_type"] = instance.hvm_disk_type

      if htkind in constants.HTS_REQ_PORT:
        if instance.vnc_bind_address is None:
          vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
        else:
          vnc_bind_address = instance.vnc_bind_address
        if instance.network_port is None:
          vnc_console_port = None
        elif vnc_bind_address == constants.BIND_ADDRESS_GLOBAL:
          vnc_console_port = "%s:%s" % (instance.primary_node,
                                        instance.network_port)
        elif vnc_bind_address == constants.LOCALHOST_IP_ADDRESS:
          vnc_console_port = "%s:%s on node %s" % (vnc_bind_address,
                                                   instance.network_port,
                                                   instance.primary_node)
        else:
          vnc_console_port = "%s:%s" % (instance.vnc_bind_address,
                                        instance.network_port)
        idict["vnc_console_port"] = vnc_console_port
        idict["vnc_bind_address"] = vnc_bind_address
        idict["network_port"] = instance.network_port

      result[instance.name] = idict

    return result


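# Illustrative note (editorial addition): Exec above returns a mapping of
# instance name to its "idict"; for hypervisors needing a network port the
# computed "vnc_console_port" takes one of three forms, e.g. (hypothetical
# values) "node1.example.com:11000" when bound to all interfaces,
# "127.0.0.1:11000 on node node1.example.com" when bound to localhost, or
# "192.0.2.10:11000" for an explicit bind address.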
class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge or self.mac:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      if self.mac:
        mac = self.mac
      else:
        mac = self.instance.nics[0].mac
      args['nics'] = [(ip, bridge, mac)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

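  # Illustrative sketch (editorial addition, hypothetical values): for an
  # opcode changing memory, vcpus and the first NIC, BuildHooksEnv above
  # would pass something like
  #   args = {"memory": 512, "vcpus": 2,
  #           "nics": [("192.0.2.10", "xen-br0", "aa:00:00:11:22:33")]}
  # to _BuildInstanceHookEnvByObject as the override dict.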
  def CheckPrereq(self):
    """Check prerequisites.

    This checks the submitted parameters against the instance's current
    configuration.

    """
    # FIXME: all the parameters could be checked before, in ExpandNames, or in
    # a separate CheckArguments function, if we implement one, so the operation
    # can be aborted without waiting for any lock, should it have an error...
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    self.kernel_path = getattr(self.op, "kernel_path", None)
    self.initrd_path = getattr(self.op, "initrd_path", None)
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
    self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
    self.hvm_pae = getattr(self.op, "hvm_pae", None)
    self.hvm_nic_type = getattr(self.op, "hvm_nic_type", None)
    self.hvm_disk_type = getattr(self.op, "hvm_disk_type", None)
    self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
    self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
    self.force = getattr(self.op, "force", None)
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
                 self.kernel_path, self.initrd_path, self.hvm_boot_order,
                 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
                 self.vnc_bind_address, self.hvm_nic_type, self.hvm_disk_type]
    if all_parms.count(None) == len(all_parms):
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)

    if self.kernel_path is not None:
      self.do_kernel_path = True
      if self.kernel_path == constants.VALUE_NONE:
        raise errors.OpPrereqError("Can't set instance to no kernel")

      if self.kernel_path != constants.VALUE_DEFAULT:
        if not os.path.isabs(self.kernel_path):
          raise errors.OpPrereqError("The kernel path must be an absolute"
                                     " filename")
    else:
      self.do_kernel_path = False

    if self.initrd_path is not None:
      self.do_initrd_path = True
      if self.initrd_path not in (constants.VALUE_NONE,
                                  constants.VALUE_DEFAULT):
        if not os.path.isabs(self.initrd_path):
          raise errors.OpPrereqError("The initrd path must be an absolute"
                                     " filename")
    else:
      self.do_initrd_path = False

    # boot order verification
    if self.hvm_boot_order is not None:
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
        if len(self.hvm_boot_order.strip("acdn")) != 0:
          raise errors.OpPrereqError("invalid boot order specified,"
                                     " must be one or more of [acdn]"
                                     " or 'default'")

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      if not (os.path.isabs(self.op.hvm_cdrom_image_path) or
              self.op.hvm_cdrom_image_path.lower() == "none"):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not (os.path.isfile(self.op.hvm_cdrom_image_path) or
              self.op.hvm_cdrom_image_path.lower() == "none"):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.warn = []
    if self.mem is not None and not self.force:
      pnode = self.instance.primary_node
      nodelist = [pnode]
      nodelist.extend(instance.secondary_nodes)
      instance_info = rpc.call_instance_info(pnode, instance.name)
      nodeinfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())

      if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
        # Assume the primary node is unreachable and go ahead
        self.warn.append("Can't get info from primary node %s" % pnode)
      else:
        if instance_info:
          current_mem = instance_info['memory']
        else:
          # Assume instance not running
          # (there is a slight race condition here, but it's not very probable,
          # and we have no other way to check)
          current_mem = 0
        miss_mem = self.mem - current_mem - nodeinfo[pnode]['memory_free']
        if miss_mem > 0:
          raise errors.OpPrereqError("This change will prevent the instance"
                                     " from starting, due to %d MB of memory"
                                     " missing on its primary node" % miss_mem)

      for node in instance.secondary_nodes:
        if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
          self.warn.append("Can't get info from secondary node %s" % node)
        elif self.mem > nodeinfo[node]['memory_free']:
          self.warn.append("Not enough memory to failover instance to secondary"
                           " node %s" % node)

    # Xen HVM device type checks
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      if self.op.hvm_nic_type is not None:
        if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
          raise errors.OpPrereqError("Invalid NIC type %s specified for Xen"
                                     " HVM hypervisor" % self.op.hvm_nic_type)
      if self.op.hvm_disk_type is not None:
        if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
          raise errors.OpPrereqError("Invalid disk type %s specified for Xen"
                                     " HVM hypervisor" % self.op.hvm_disk_type)

    return

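  # Worked example (editorial addition, hypothetical numbers) for the memory
  # check in CheckPrereq above: growing an instance to mem=2048 while it
  # currently uses current_mem=1024 and the primary node reports
  # memory_free=512 gives miss_mem = 2048 - 1024 - 512 = 512 > 0, so the
  # change is refused unless the opcode's force flag is set.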
  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    # Process here the warnings from CheckPrereq, as we don't have a
    # feedback_fn there.
    for warn in self.warn:
      feedback_fn("WARNING: %s" % warn)

    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.do_kernel_path:
      instance.kernel_path = self.kernel_path
      result.append(("kernel_path", self.kernel_path))
    if self.do_initrd_path:
      instance.initrd_path = self.initrd_path
      result.append(("initrd_path", self.initrd_path))
    if self.hvm_boot_order:
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
        instance.hvm_boot_order = None
      else:
        instance.hvm_boot_order = self.hvm_boot_order
      result.append(("hvm_boot_order", self.hvm_boot_order))
    if self.hvm_acpi is not None:
      instance.hvm_acpi = self.hvm_acpi
      result.append(("hvm_acpi", self.hvm_acpi))
    if self.hvm_pae is not None:
      instance.hvm_pae = self.hvm_pae
      result.append(("hvm_pae", self.hvm_pae))
    if self.hvm_nic_type is not None:
      instance.hvm_nic_type = self.hvm_nic_type
      result.append(("hvm_nic_type", self.hvm_nic_type))
    if self.hvm_disk_type is not None:
      instance.hvm_disk_type = self.hvm_disk_type
      result.append(("hvm_disk_type", self.hvm_disk_type))
    if self.hvm_cdrom_image_path:
      if self.hvm_cdrom_image_path == constants.VALUE_NONE:
        instance.hvm_cdrom_image_path = None
      else:
        instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
      result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
    if self.vnc_bind_address:
      instance.vnc_bind_address = self.vnc_bind_address
      result.append(("vnc_bind_address", self.vnc_bind_address))

    self.cfg.Update(instance)

    return result


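# Illustrative note (editorial addition): Exec above returns the applied
# changes as a list of (parameter, new value) pairs, e.g. (hypothetical)
# [("mem", 512), ("bridge", "xen-br1")], which the calling layer can print
# back to the user.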
class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = ['nodes']
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = None
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)


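# Illustrative note (editorial addition): the export list returned above maps
# node names to the instance names whose exports they hold, e.g.
# (hypothetical) {"node1.example.com": ["web1.example.com"],
#                 "node2.example.com": []}.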
class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

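  # Illustrative sketch (editorial addition, hypothetical values): on top of
  # the variables added by _BuildInstanceHookEnvByObject, the hooks
  # environment built above contains e.g.
  #   {"EXPORT_NODE": "node2.example.com", "EXPORT_DO_SHUTDOWN": True}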
  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        if not rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(instance, self.cfg)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
        logger.Error("could not export block device %s from node %s to node %s"
                     % (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from node %s" %
                     (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal; if we
    # proceeded, the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      exportlist = rpc.call_export_list(nodelist)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


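# Descriptive note (editorial addition): the export flow in Exec above is to
# snapshot the instance's "sda" disk on the primary node, copy each snapshot
# to the target node, drop the snapshot, finalize the export on the target,
# and finally remove any stale export of the same instance from the other
# nodes.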
class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.
    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    exportlist = rpc.call_export_list(self.cfg.GetNodeList())
    found = False
    for node in exportlist:
      if instance_name in exportlist[node]:
        found = True
        if not rpc.call_export_remove(node, instance_name):
          logger.Error("could not remove export for instance %s"
                       " on node %s" % (instance_name, node))

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the list of (path, tag) matches.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


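# Illustrative note (editorial addition): LUSearchTags.Exec above returns
# (path, tag) pairs such as (hypothetical)
# [("/cluster", "staging"), ("/instances/web1.example.com", "env:prod")].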
class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))


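# Descriptive note (editorial addition): LUTestDelay.Exec above treats the
# rpc.call_test_delay result as a per-node mapping and fails on any falsy
# value, e.g. (hypothetical) {"node1.example.com": True,
# "node2.example.com": False} would raise an OpExecError for node2.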
class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg/sstore that are needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in_text, out_text, in_data, out_data), that
      represent the input (to the external script) in text and data
      structure format, and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, cfg, sstore, mode, name, **kwargs):
    self.cfg = cfg
    self.sstore = sstore
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

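  # Illustrative sketch (editorial addition, hypothetical values): an
  # allocation request would be built roughly as
  #   ial = IAllocator(cfg, sstore, mode=constants.IALLOCATOR_MODE_ALLOC,
  #                    name="web1.example.com", mem_size=512, disks=[...],
  #                    disk_template="drbd", os="debian-etch", tags=[],
  #                    nics=[...], vcpus=1)
  # with every key in _ALLO_KEYS required, as enforced above.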
  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.cfg
    # cluster data
    data = {
      "version": 1,
      "cluster_name": self.sstore.GetClusterName(),
      "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      # we don't have job IDs
      }

    i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()
    node_data = rpc.call_node_info(node_list, cfg.GetVGName())
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      if nname not in node_data or not isinstance(node_data[nname], dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname]
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free', 'cpu_total']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += iinfo.memory
          if iinfo.status == "up":
            i_p_up_mem += iinfo.memory

      # assemble the per-node result for the allocator input
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": iinfo.vcpus,
        "memory": iinfo.memory,
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

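  # Illustrative sketch (editorial addition): the structure assembled above
  # looks roughly like
  #   {"version": 1, "cluster_name": ..., "cluster_tags": [...],
  #    "hypervisor_type": ..., "nodes": {<name>: {"total_memory": ..., ...}},
  #    "instances": {<name>: {"memory": ..., "nics": [...], ...}}}
  # and is later extended with a "request" key and serialized to text.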
  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template,
                                  self.disks[0]["size"], self.disks[1]["size"])

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

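  # Illustrative sketch (editorial addition, hypothetical values): for a
  # network-mirrored disk template the request built above would resemble
  #   {"type": "allocate", "name": "web1.example.com",
  #    "disk_template": "drbd", "tags": [], "os": "debian-etch",
  #    "vcpus": 1, "memory": 512,
  #    "disks": [{"size": 10240, "mode": "w"}, {"size": 4096, "mode": "w"}],
  #    "disk_space_total": <computed>, "nics": [...], "required_nodes": 2}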
  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1

    disk_space = _ComputeDiskSize(instance.disk_template,
                                  instance.disks[0].size,
                                  instance.disks[1].size)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
    """Run an instance allocator and return the results.

    """
    data = self.in_text

    result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)

    if not isinstance(result, tuple) or len(result) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" % (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict


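# Illustrative note (editorial addition): a well-formed allocator reply, as
# checked by _ValidateResult above, is a dict with at least the keys
# "success", "info" and "nodes", e.g. (hypothetical) the serialized text
#   {"success": true, "info": "allocation successful",
#    "nodes": ["node1.example.com", "node2.example.com"]}
# printed by the external script on its standard output.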
class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    of the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       )
    else:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result