#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import serializer


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_MASTER: the LU needs to run on the master node
        REQ_WSSTORE: the LU needs a writable SimpleStore
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_MASTER = True
  REQ_WSSTORE = False
  REQ_BGL = True

  def __init__(self, processor, op, context, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.sstore = sstore
    self.context = context
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    if not self.cfg.IsCluster():
      raise errors.OpPrereqError("Cluster not initialized yet,"
                                 " use 'gnt-cluster init' first.")
    if self.REQ_MASTER:
      master = sstore.GetMasterNode()
      if master != utils.HostInfo().name:
        raise errors.OpPrereqError("Commands must be run on the master"
                                   " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.sstore)
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:
      - Use an empty dict if you don't need any lock
      - If you don't need any lock at a particular level omit that level
      - Don't put anything for the BGL level
      - If you want all locks at a level use None as a value
        (this reflects what LockSet does, and will be replaced before
        CheckPrereq with the full list of nodes that have been locked)

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples:
    # Acquire all nodes and one instance
    self.needed_locks = {
      locking.LEVEL_NODE: None,
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
    }
    # Acquire just two nodes
    self.needed_locks = {
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
    }
    # Acquire no locks
    self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not be prefixed with 'GANETI_', as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged, but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: the previous result this LU had, or None in the PRE phase.

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to:

    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      wanted_nodes.extend(instance.secondary_nodes)
    self.needed_locks[locking.LEVEL_NODE] = wanted_nodes

    del self.recalculate_locks[locking.LEVEL_NODE]


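# Illustrative sketch (hypothetical, for illustration only; not part of the
# original module): a minimal LU following the contract described above could
# look like this, keeping the default HPATH/HTYPE of None so that
# BuildHooksEnv would never actually be called:
#
#   class LUHypotheticalNoop(LogicalUnit):
#     _OP_REQP = []              # no required opcode parameters
#
#     def CheckPrereq(self):
#       pass                     # nothing to verify for a no-op
#
#     def Exec(self, feedback_fn):
#       feedback_fn("doing nothing")
#
#     def BuildHooksEnv(self):
#       return {}, [], []        # empty env, no pre/post hook nodes
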
class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if nodes:
    wanted = []

    for name in nodes:
      node = lu.cfg.ExpandNodeName(name)
      if node is None:
        raise errors.OpPrereqError("No such node name '%s'" % name)
      wanted.append(node)

  else:
    wanted = lu.cfg.GetNodeList()
  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings) or None for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


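# Usage note (illustrative only, not part of the original code): LUs typically
# call these helpers from CheckPrereq to canonicalize user-supplied names,
# e.g. "self.nodes = _GetWantedNodes(self, self.op.nodes)"; an empty list
# selects all node (or instance) names, returned nicely sorted.
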
def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))


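# Example (illustrative only): with static=["name"] and dynamic=["mfree"],
# selected=["name", "mfree"] passes silently, while selected=["name", "bogus"]
# raises OpPrereqError naming the unknown field "bogus".
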
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings
  """
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env


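# Example (hypothetical values, for illustration only): for an instance named
# "inst1.example.com" with one NIC and no secondaries, the resulting dict
# would look roughly like:
#   {"OP_TARGET": "inst1.example.com", "INSTANCE_NAME": "inst1.example.com",
#    "INSTANCE_PRIMARY": "node1.example.com", "INSTANCE_SECONDARIES": "",
#    "INSTANCE_OS_TYPE": "debian-etch", "INSTANCE_STATUS": "up",
#    "INSTANCE_MEMORY": 128, "INSTANCE_VCPUS": 1,
#    "INSTANCE_NIC0_IP": "", "INSTANCE_NIC0_BRIDGE": "xen-br0",
#    "INSTANCE_NIC0_HWADDR": "aa:00:00:11:22:33", "INSTANCE_NIC_COUNT": 1}
# Per the BuildHooksEnv contract above, the "GANETI_" prefix is added later by
# the hooks runner.
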
def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override
  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("one or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.sstore.GetMasterNode()
    if not rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    if 'node-net-test' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result['node-net-test']:
        bad = True
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result['node-net-test'][node]))

    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if not instanceconfig.status == 'down':
      if (node_current not in node_instance or
          not instance in node_instance[node_current]):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          needed_mem += instance_cfg[instance].memory
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

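  # Worked example (hypothetical numbers, for illustration only): if node B is
  # secondary for two instances whose primary is node A, needing 512 and 1024
  # MB of memory respectively, then node B must have at least 1536 MB free or
  # the check above reports that B cannot accommodate failovers from A.
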
  def CheckPrereq(self):
677
    """Check prerequisites.
678

679
    Transform the list of checks we're going to skip into a set and check that
680
    all its members are valid.
681

682
    """
683
    self.skip_set = frozenset(self.op.skip_checks)
684
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
685
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
686

    
687
  def BuildHooksEnv(self):
688
    """Build hooks env.
689

690
    Cluster-Verify hooks just rone in the post phase and their failure makes
691
    the output be logged in the verify output and the verification to fail.
692

693
    """
694
    all_nodes = self.cfg.GetNodeList()
695
    # TODO: populate the environment with useful information for verify hooks
696
    env = {}
697
    return env, [], all_nodes
698

    
699
  def Exec(self, feedback_fn):
700
    """Verify integrity of cluster, performing various test on nodes.
701

702
    """
703
    bad = False
704
    feedback_fn("* Verifying global settings")
705
    for msg in self.cfg.VerifyConfig():
706
      feedback_fn("  - ERROR: %s" % msg)
707

    
708
    vg_name = self.cfg.GetVGName()
709
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
710
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
711
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
712
    i_non_redundant = [] # Non redundant instances
713
    node_volume = {}
714
    node_instance = {}
715
    node_info = {}
716
    instance_cfg = {}
717

    
718
    # FIXME: verify OS list
719
    # do local checksums
720
    file_names = list(self.sstore.GetFileList())
721
    file_names.append(constants.SSL_CERT_FILE)
722
    file_names.append(constants.CLUSTER_CONF_FILE)
723
    local_checksums = utils.FingerprintFiles(file_names)
724

    
725
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
726
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
727
    all_instanceinfo = rpc.call_instance_list(nodelist)
728
    all_vglist = rpc.call_vg_list(nodelist)
729
    node_verify_param = {
730
      'filelist': file_names,
731
      'nodelist': nodelist,
732
      'hypervisor': None,
733
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
734
                        for node in nodeinfo]
735
      }
736
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
737
    all_rversion = rpc.call_version(nodelist)
738
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
739

    
740
    for node in nodelist:
741
      feedback_fn("* Verifying node %s" % node)
742
      result = self._VerifyNode(node, file_names, local_checksums,
743
                                all_vglist[node], all_nvinfo[node],
744
                                all_rversion[node], feedback_fn)
745
      bad = bad or result
746

    
747
      # node_volume
748
      volumeinfo = all_volumeinfo[node]
749

    
750
      if isinstance(volumeinfo, basestring):
751
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
752
                    (node, volumeinfo[-400:].encode('string_escape')))
753
        bad = True
754
        node_volume[node] = {}
755
      elif not isinstance(volumeinfo, dict):
756
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
757
        bad = True
758
        continue
759
      else:
760
        node_volume[node] = volumeinfo
761

    
762
      # node_instance
763
      nodeinstance = all_instanceinfo[node]
764
      if type(nodeinstance) != list:
765
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
766
        bad = True
767
        continue
768

    
769
      node_instance[node] = nodeinstance
770

    
771
      # node_info
772
      nodeinfo = all_ninfo[node]
773
      if not isinstance(nodeinfo, dict):
774
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
775
        bad = True
776
        continue
777

    
778
      try:
779
        node_info[node] = {
780
          "mfree": int(nodeinfo['memory_free']),
781
          "dfree": int(nodeinfo['vg_free']),
782
          "pinst": [],
783
          "sinst": [],
784
          # dictionary holding all instances this node is secondary for,
785
          # grouped by their primary node. Each key is a cluster node, and each
786
          # value is a list of instances which have the key as primary and the
787
          # current node as secondary.  this is handy to calculate N+1 memory
788
          # availability if you can only failover from a primary to its
789
          # secondary.
790
          "sinst-by-pnode": {},
791
        }
792
      except ValueError:
793
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
794
        bad = True
795
        continue
796

    
797
    node_vol_should = {}
798

    
799
    for instance in instancelist:
800
      feedback_fn("* Verifying instance %s" % instance)
801
      inst_config = self.cfg.GetInstanceInfo(instance)
802
      result =  self._VerifyInstance(instance, inst_config, node_volume,
803
                                     node_instance, feedback_fn)
804
      bad = bad or result
805

    
806
      inst_config.MapLVsByNode(node_vol_should)
807

    
808
      instance_cfg[instance] = inst_config
809

    
810
      pnode = inst_config.primary_node
811
      if pnode in node_info:
812
        node_info[pnode]['pinst'].append(instance)
813
      else:
814
        feedback_fn("  - ERROR: instance %s, connection to primary node"
815
                    " %s failed" % (instance, pnode))
816
        bad = True
817

    
818
      # If the instance is non-redundant we cannot survive losing its primary
819
      # node, so we are not N+1 compliant. On the other hand we have no disk
820
      # templates with more than one secondary so that situation is not well
821
      # supported either.
822
      # FIXME: does not support file-backed instances
823
      if len(inst_config.secondary_nodes) == 0:
824
        i_non_redundant.append(instance)
825
      elif len(inst_config.secondary_nodes) > 1:
826
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
827
                    % instance)
828

    
829
      for snode in inst_config.secondary_nodes:
830
        if snode in node_info:
831
          node_info[snode]['sinst'].append(instance)
832
          if pnode not in node_info[snode]['sinst-by-pnode']:
833
            node_info[snode]['sinst-by-pnode'][pnode] = []
834
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
835
        else:
836
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
837
                      " %s failed" % (instance, snode))
838

    
839
    feedback_fn("* Verifying orphan volumes")
840
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
841
                                       feedback_fn)
842
    bad = bad or result
843

    
844
    feedback_fn("* Verifying remaining instances")
845
    result = self._VerifyOrphanInstances(instancelist, node_instance,
846
                                         feedback_fn)
847
    bad = bad or result
848

    
849
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
850
      feedback_fn("* Verifying N+1 Memory redundancy")
851
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
852
      bad = bad or result
853

    
854
    feedback_fn("* Other Notes")
855
    if i_non_redundant:
856
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
857
                  % len(i_non_redundant))
858

    
859
    return not bad
860

    
861
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
862
    """Analize the post-hooks' result, handle it, and send some
863
    nicely-formatted feedback back to the user.
864

865
    Args:
866
      phase: the hooks phase that has just been run
867
      hooks_results: the results of the multi-node hooks rpc call
868
      feedback_fn: function to send feedback back to the caller
869
      lu_result: previous Exec result
870

871
    """
872
    # We only really run POST phase hooks, and are only interested in
873
    # their results
874
    if phase == constants.HOOKS_PHASE_POST:
875
      # Used to change hooks' output to proper indentation
876
      indent_re = re.compile('^', re.M)
877
      feedback_fn("* Hooks Results")
878
      if not hooks_results:
879
        feedback_fn("  - ERROR: general communication failure")
880
        lu_result = 1
881
      else:
882
        for node_name in hooks_results:
883
          show_node_header = True
884
          res = hooks_results[node_name]
885
          if res is False or not isinstance(res, list):
886
            feedback_fn("    Communication failure")
887
            lu_result = 1
888
            continue
889
          for script, hkr, output in res:
890
            if hkr == constants.HKR_FAIL:
891
              # The node header is only shown once, if there are
892
              # failing hooks on that node
893
              if show_node_header:
894
                feedback_fn("  Node %s:" % node_name)
895
                show_node_header = False
896
              feedback_fn("    ERROR: Script %s failed, output:" % script)
897
              output = indent_re.sub('      ', output)
898
              feedback_fn("%s" % output)
899
              lu_result = 1
900

    
901
      return lu_result
902

    
903

    
class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    to_act = set()
    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]
  REQ_WSSTORE = True

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      if not rpc.call_node_start_master(master, False):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV


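# Example (illustrative only): for a mirrored disk whose children are two
# logical volumes (dev_type == constants.LD_LV) the recursion above returns
# True; for a disk tree containing no LD_LV device anywhere it returns False.
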
class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if not self.op.vg_name:
      instances = [self.cfg.GetInstanceInfo(name)
                   for name in self.cfg.GetInstanceList()]
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      node_list = self.cfg.GetNodeList()
      vglist = rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name != self.cfg.GetVGName():
      self.cfg.SetVGName(self.op.vg_name)
    else:
      feedback_fn("Cluster LVM configuration already in desired"
                  " state, not changing")


def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  cfgw.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result


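# Note (assumption, not stated in this file): the numeric indices used in
# _CheckDiskConsistency (5 when ldisk=False, 6 when ldisk=True) are taken to
# select the is_degraded and ldisk fields, respectively, of the status tuple
# returned by rpc.call_blockdev_find, matching the behaviour described in the
# docstring above.
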
class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
    _CheckOutputFields(static=[],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

      Args:
        node_list: a list with the names of all nodes
        rlist: a map with node names as keys and OS objects as values

      Returns:
        map: a map with osnames as keys and as value another map, with
             nodes as
             keys and list of OS objects as values
             e.g. {"debian-etch": {"node1": [<object>,...],
                                   "node2": [<object>,]}
                  }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if not nr:
        continue
      for os_obj in nr:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in node_list:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    self.context.RemoveNode(node.name)

    rpc.call_node_leave_cluster(node.name)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    self.dynamic_fields = frozenset([
      "dtotal", "dfree",
      "mtotal", "mnode", "mfree",
      "bootid",
      "ctotal",
      ])

    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                               "pip", "sip", "tags"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    # TODO: we could lock nodes only if the user asked for dynamic fields. For
    # that we need atomic ways to get info for a group of nodes from the
    # config, though.
    if not self.op.names:
      self.needed_locks[locking.LEVEL_NODE] = None
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.names)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # This of course is valid only if we locked the nodes
    self.wanted = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)


  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


1551
class LUAddNode(LogicalUnit):
1552
  """Logical unit for adding node to the cluster.
1553

1554
  """
1555
  HPATH = "node-add"
1556
  HTYPE = constants.HTYPE_NODE
1557
  _OP_REQP = ["node_name"]
1558

    
1559
  def BuildHooksEnv(self):
1560
    """Build hooks env.
1561

1562
    This will run on all nodes before, and on all nodes + the new node after.
1563

1564
    """
1565
    env = {
1566
      "OP_TARGET": self.op.node_name,
1567
      "NODE_NAME": self.op.node_name,
1568
      "NODE_PIP": self.op.primary_ip,
1569
      "NODE_SIP": self.op.secondary_ip,
1570
      }
1571
    nodes_0 = self.cfg.GetNodeList()
1572
    nodes_1 = nodes_0 + [self.op.node_name, ]
1573
    return env, nodes_0, nodes_1
1574

    
1575
  def CheckPrereq(self):
1576
    """Check prerequisites.
1577

1578
    This checks:
1579
     - the new node is not already in the config
1580
     - it is resolvable
1581
     - its parameters (single/dual homed) match the cluster
1582

1583
    Any errors are signalled by raising errors.OpPrereqError.
1584

1585
    """
1586
    node_name = self.op.node_name
1587
    cfg = self.cfg
1588

    
1589
    dns_data = utils.HostInfo(node_name)
1590

    
1591
    node = dns_data.name
1592
    primary_ip = self.op.primary_ip = dns_data.ip
1593
    secondary_ip = getattr(self.op, "secondary_ip", None)
1594
    if secondary_ip is None:
1595
      secondary_ip = primary_ip
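      # no secondary IP given: treat the new node as single-homed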
1596
    if not utils.IsValidIP(secondary_ip):
1597
      raise errors.OpPrereqError("Invalid secondary IP given")
1598
    self.op.secondary_ip = secondary_ip
1599

    
1600
    node_list = cfg.GetNodeList()
1601
    if not self.op.readd and node in node_list:
1602
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1603
                                 node)
1604
    elif self.op.readd and node not in node_list:
1605
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1606

    
1607
    for existing_node_name in node_list:
1608
      existing_node = cfg.GetNodeInfo(existing_node_name)
1609

    
1610
      if self.op.readd and node == existing_node_name:
1611
        if (existing_node.primary_ip != primary_ip or
1612
            existing_node.secondary_ip != secondary_ip):
1613
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1614
                                     " address configuration as before")
1615
        continue
1616

    
1617
      if (existing_node.primary_ip == primary_ip or
1618
          existing_node.secondary_ip == primary_ip or
1619
          existing_node.primary_ip == secondary_ip or
1620
          existing_node.secondary_ip == secondary_ip):
1621
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1622
                                   " existing node %s" % existing_node.name)
1623

    
1624
    # check that the type of the node (single versus dual homed) is the
1625
    # same as for the master
1626
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1627
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1628
    newbie_singlehomed = secondary_ip == primary_ip
1629
    if master_singlehomed != newbie_singlehomed:
1630
      if master_singlehomed:
1631
        raise errors.OpPrereqError("The master has no private ip but the"
1632
                                   " new node has one")
1633
      else:
1634
        raise errors.OpPrereqError("The master has a private ip but the"
1635
                                   " new node doesn't have one")
1636

    
1637
    # check reachability
1638
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1639
      raise errors.OpPrereqError("Node not reachable by ping")
1640

    
1641
    if not newbie_singlehomed:
1642
      # check reachability from my secondary ip to newbie's secondary ip
1643
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1644
                           source=myself.secondary_ip):
1645
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1646
                                   " based ping to noded port")
1647

    
1648
    self.new_node = objects.Node(name=node,
1649
                                 primary_ip=primary_ip,
1650
                                 secondary_ip=secondary_ip)
1651

    
1652
  def Exec(self, feedback_fn):
1653
    """Adds the new node to the cluster.
1654

1655
    """
1656
    new_node = self.new_node
1657
    node = new_node.name
1658

    
1659
    # check connectivity
1660
    result = rpc.call_version([node])[node]
1661
    if result:
1662
      if constants.PROTOCOL_VERSION == result:
1663
        logger.Info("communication to node %s fine, sw version %s match" %
1664
                    (node, result))
1665
      else:
1666
        raise errors.OpExecError("Version mismatch master version %s,"
1667
                                 " node version %s" %
1668
                                 (constants.PROTOCOL_VERSION, result))
1669
    else:
1670
      raise errors.OpExecError("Cannot get version from the new node")
1671

    
1672
    # setup ssh on node
1673
    logger.Info("copy ssh key to node %s" % node)
1674
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1675
    keyarray = []
1676
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1677
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1678
                priv_key, pub_key]
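    # collect the cluster SSH host keys and the root user key pair; these
    # are pushed to the new node via the node_add RPC below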
1679

    
1680
    for i in keyfiles:
1681
      f = open(i, 'r')
1682
      try:
1683
        keyarray.append(f.read())
1684
      finally:
1685
        f.close()
1686

    
1687
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1688
                               keyarray[3], keyarray[4], keyarray[5])
1689

    
1690
    if not result:
1691
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1692

    
1693
    # Add node to our /etc/hosts, and add key to known_hosts
1694
    utils.AddHostToEtcHosts(new_node.name)
1695

    
1696
    if new_node.secondary_ip != new_node.primary_ip:
1697
      if not rpc.call_node_tcp_ping(new_node.name,
1698
                                    constants.LOCALHOST_IP_ADDRESS,
1699
                                    new_node.secondary_ip,
1700
                                    constants.DEFAULT_NODED_PORT,
1701
                                    10, False):
1702
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1703
                                 " you gave (%s). Please fix and re-run this"
1704
                                 " command." % new_node.secondary_ip)
1705

    
1706
    node_verify_list = [self.sstore.GetMasterNode()]
1707
    node_verify_param = {
1708
      'nodelist': [node],
1709
      # TODO: do a node-net-test as well?
1710
    }
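    # have the master re-check ssh/hostname resolution towards the new node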
1711

    
1712
    result = rpc.call_node_verify(node_verify_list, node_verify_param)
1713
    for verifier in node_verify_list:
1714
      if not result[verifier]:
1715
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
1716
                                 " for remote verification" % verifier)
1717
      if result[verifier]['nodelist']:
1718
        for failed in result[verifier]['nodelist']:
1719
          feedback_fn("ssh/hostname verification failed %s -> %s" %
1720
                      (verifier, result[verifier]['nodelist'][failed]))
1721
        raise errors.OpExecError("ssh/hostname verification failed.")
1722

    
1723
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1724
    # including the node just added
1725
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1726
    dist_nodes = self.cfg.GetNodeList()
1727
    if not self.op.readd:
1728
      dist_nodes.append(node)
1729
    if myself.name in dist_nodes:
1730
      dist_nodes.remove(myself.name)
1731

    
1732
    logger.Debug("Copying hosts and known_hosts to all nodes")
1733
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1734
      result = rpc.call_upload_file(dist_nodes, fname)
1735
      for to_node in dist_nodes:
1736
        if not result[to_node]:
1737
          logger.Error("copy of file %s to node %s failed" %
1738
                       (fname, to_node))
1739

    
1740
    to_copy = self.sstore.GetFileList()
1741
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1742
      to_copy.append(constants.VNC_PASSWORD_FILE)
1743
    for fname in to_copy:
1744
      result = rpc.call_upload_file([node], fname)
1745
      if not result[node]:
1746
        logger.Error("could not copy file %s to node %s" % (fname, node))
1747

    
1748
    if self.op.readd:
1749
      self.context.ReaddNode(new_node)
1750
    else:
1751
      self.context.AddNode(new_node)
1752

    
1753

    
1754
class LUQueryClusterInfo(NoHooksLU):
1755
  """Query cluster configuration.
1756

1757
  """
1758
  _OP_REQP = []
1759
  REQ_MASTER = False
1760
  REQ_BGL = False
1761

    
1762
  def ExpandNames(self):
1763
    self.needed_locks = {}
1764

    
1765
  def CheckPrereq(self):
1766
    """No prerequsites needed for this LU.
1767

1768
    """
1769
    pass
1770

    
1771
  def Exec(self, feedback_fn):
1772
    """Return cluster config.
1773

1774
    """
1775
    result = {
1776
      "name": self.sstore.GetClusterName(),
1777
      "software_version": constants.RELEASE_VERSION,
1778
      "protocol_version": constants.PROTOCOL_VERSION,
1779
      "config_version": constants.CONFIG_VERSION,
1780
      "os_api_version": constants.OS_API_VERSION,
1781
      "export_version": constants.EXPORT_VERSION,
1782
      "master": self.sstore.GetMasterNode(),
1783
      "architecture": (platform.architecture()[0], platform.machine()),
1784
      "hypervisor_type": self.sstore.GetHypervisorType(),
1785
      }
1786

    
1787
    return result
1788

    
1789

    
1790
class LUDumpClusterConfig(NoHooksLU):
1791
  """Return a text-representation of the cluster-config.
1792

1793
  """
1794
  _OP_REQP = []
1795
  REQ_BGL = False
1796

    
1797
  def ExpandNames(self):
1798
    self.needed_locks = {}
1799

    
1800
  def CheckPrereq(self):
1801
    """No prerequisites.
1802

1803
    """
1804
    pass
1805

    
1806
  def Exec(self, feedback_fn):
1807
    """Dump a representation of the cluster config to the standard output.
1808

1809
    """
1810
    return self.cfg.DumpConfig()
1811

    
1812

    
1813
class LUActivateInstanceDisks(NoHooksLU):
1814
  """Bring up an instance's disks.
1815

1816
  """
1817
  _OP_REQP = ["instance_name"]
1818

    
1819
  def CheckPrereq(self):
1820
    """Check prerequisites.
1821

1822
    This checks that the instance is in the cluster.
1823

1824
    """
1825
    instance = self.cfg.GetInstanceInfo(
1826
      self.cfg.ExpandInstanceName(self.op.instance_name))
1827
    if instance is None:
1828
      raise errors.OpPrereqError("Instance '%s' not known" %
1829
                                 self.op.instance_name)
1830
    self.instance = instance
1831

    
1832

    
1833
  def Exec(self, feedback_fn):
1834
    """Activate the disks.
1835

1836
    """
1837
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1838
    if not disks_ok:
1839
      raise errors.OpExecError("Cannot activate block devices")
1840

    
1841
    return disks_info
1842

    
1843

    
1844
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1845
  """Prepare the block devices for an instance.
1846

1847
  This sets up the block devices on all nodes.
1848

1849
  Args:
1850
    instance: a ganeti.objects.Instance object
1851
    ignore_secondaries: if true, errors on secondary nodes won't result
1852
                        in an error return from the function
1853

1854
  Returns:
1855
    false if the operation failed
1856
    list of (host, instance_visible_name, node_visible_name) if the operation
1857
         succeeded with the mapping from node devices to instance devices
1858
  """
1859
  device_info = []
1860
  disks_ok = True
1861
  iname = instance.name
1862
  # With the two-pass mechanism we try to reduce the window of
1863
  # opportunity for the race condition of switching DRBD to primary
1864
  # before handshaking has occurred, but we do not eliminate it
1865

    
1866
  # The proper fix would be to wait (with some limits) until the
1867
  # connection has been made and drbd transitions from WFConnection
1868
  # into any other network-connected state (Connected, SyncTarget,
1869
  # SyncSource, etc.)
1870

    
1871
  # 1st pass, assemble on all nodes in secondary mode
1872
  for inst_disk in instance.disks:
1873
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1874
      cfg.SetDiskID(node_disk, node)
1875
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1876
      if not result:
1877
        logger.Error("could not prepare block device %s on node %s"
1878
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1879
        if not ignore_secondaries:
1880
          disks_ok = False
1881

    
1882
  # FIXME: race condition on drbd migration to primary
1883

    
1884
  # 2nd pass, do only the primary node
1885
  for inst_disk in instance.disks:
1886
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1887
      if node != instance.primary_node:
1888
        continue
1889
      cfg.SetDiskID(node_disk, node)
1890
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1891
      if not result:
1892
        logger.Error("could not prepare block device %s on node %s"
1893
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1894
        disks_ok = False
1895
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
1896

    
1897
  # leave the disks configured for the primary node
1898
  # this is a workaround that would be fixed better by
1899
  # improving the logical/physical id handling
1900
  for disk in instance.disks:
1901
    cfg.SetDiskID(disk, instance.primary_node)
1902

    
1903
  return disks_ok, device_info
1904

    
1905

    
1906
def _StartInstanceDisks(cfg, instance, force):
1907
  """Start the disks of an instance.
1908

1909
  """
1910
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1911
                                           ignore_secondaries=force)
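  # 'force' doubles as ignore_secondaries: when forcing, errors on the
  # secondary nodes do not abort the disk assembly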
1912
  if not disks_ok:
1913
    _ShutdownInstanceDisks(instance, cfg)
1914
    if force is not None and not force:
1915
      logger.Error("If the message above refers to a secondary node,"
1916
                   " you can retry the operation using '--force'.")
1917
    raise errors.OpExecError("Disk consistency error")
1918

    
1919

    
1920
class LUDeactivateInstanceDisks(NoHooksLU):
1921
  """Shutdown an instance's disks.
1922

1923
  """
1924
  _OP_REQP = ["instance_name"]
1925

    
1926
  def CheckPrereq(self):
1927
    """Check prerequisites.
1928

1929
    This checks that the instance is in the cluster.
1930

1931
    """
1932
    instance = self.cfg.GetInstanceInfo(
1933
      self.cfg.ExpandInstanceName(self.op.instance_name))
1934
    if instance is None:
1935
      raise errors.OpPrereqError("Instance '%s' not known" %
1936
                                 self.op.instance_name)
1937
    self.instance = instance
1938

    
1939
  def Exec(self, feedback_fn):
1940
    """Deactivate the disks
1941

1942
    """
1943
    instance = self.instance
1944
    ins_l = rpc.call_instance_list([instance.primary_node])
1945
    ins_l = ins_l[instance.primary_node]
1946
    if not type(ins_l) is list:
1947
      raise errors.OpExecError("Can't contact node '%s'" %
1948
                               instance.primary_node)
1949

    
1950
    if self.instance.name in ins_l:
1951
      raise errors.OpExecError("Instance is running, can't shutdown"
1952
                               " block devices.")
1953

    
1954
    _ShutdownInstanceDisks(instance, self.cfg)
1955

    
1956

    
1957
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1958
  """Shutdown block devices of an instance.
1959

1960
  This does the shutdown on all nodes of the instance.
1961

1962
  If ignore_primary is true, errors on the primary node are
1963
  ignored.
1964

1965
  """
1966
  result = True
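  # try to shut down every disk on every node; record failures but keep going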
1967
  for disk in instance.disks:
1968
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1969
      cfg.SetDiskID(top_disk, node)
1970
      if not rpc.call_blockdev_shutdown(node, top_disk):
1971
        logger.Error("could not shutdown block device %s on node %s" %
1972
                     (disk.iv_name, node))
1973
        if not ignore_primary or node != instance.primary_node:
1974
          result = False
1975
  return result
1976

    
1977

    
1978
def _CheckNodeFreeMemory(cfg, node, reason, requested):
1979
  """Checks if a node has enough free memory.
1980

1981
  This function checks if a given node has the needed amount of free
1982
  memory. In case the node has less memory or we cannot get the
1983
  information from the node, this function raises an OpPrereqError
1984
  exception.
1985

1986
  Args:
1987
    - cfg: a ConfigWriter instance
1988
    - node: the node name
1989
    - reason: string to use in the error message
1990
    - requested: the amount of memory in MiB
1991

1992
  """
1993
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
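  # call_node_info returns a dict indexed by node name; anything else
  # means the node could not be contacted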
1994
  if not nodeinfo or not isinstance(nodeinfo, dict):
1995
    raise errors.OpPrereqError("Could not contact node %s for resource"
1996
                             " information" % (node,))
1997

    
1998
  free_mem = nodeinfo[node].get('memory_free')
1999
  if not isinstance(free_mem, int):
2000
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2001
                             " was '%s'" % (node, free_mem))
2002
  if requested > free_mem:
2003
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2004
                             " needed %s MiB, available %s MiB" %
2005
                             (node, reason, requested, free_mem))
2006

    
2007

    
2008
class LUStartupInstance(LogicalUnit):
2009
  """Starts an instance.
2010

2011
  """
2012
  HPATH = "instance-start"
2013
  HTYPE = constants.HTYPE_INSTANCE
2014
  _OP_REQP = ["instance_name", "force"]
2015
  REQ_BGL = False
2016

    
2017
  def ExpandNames(self):
2018
    self._ExpandAndLockInstance()
2019
    self.needed_locks[locking.LEVEL_NODE] = []
2020
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2021

    
2022
  def DeclareLocks(self, level):
2023
    if level == locking.LEVEL_NODE:
2024
      self._LockInstancesNodes()
2025

    
2026
  def BuildHooksEnv(self):
2027
    """Build hooks env.
2028

2029
    This runs on master, primary and secondary nodes of the instance.
2030

2031
    """
2032
    env = {
2033
      "FORCE": self.op.force,
2034
      }
2035
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2036
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2037
          list(self.instance.secondary_nodes))
2038
    return env, nl, nl
2039

    
2040
  def CheckPrereq(self):
2041
    """Check prerequisites.
2042

2043
    This checks that the instance is in the cluster.
2044

2045
    """
2046
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2047
    assert self.instance is not None, \
2048
      "Cannot retrieve locked instance %s" % self.op.instance_name
2049

    
2050
    # check bridge existence
2051
    _CheckInstanceBridgesExist(instance)
2052

    
2053
    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2054
                         "starting instance %s" % instance.name,
2055
                         instance.memory)
2056

    
2057
  def Exec(self, feedback_fn):
2058
    """Start the instance.
2059

2060
    """
2061
    instance = self.instance
2062
    force = self.op.force
2063
    extra_args = getattr(self.op, "extra_args", "")
2064

    
2065
    self.cfg.MarkInstanceUp(instance.name)
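    # record the intended 'up' state in the config before issuing the start
    # RPC, so the cluster state reflects the admin's intention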
2066

    
2067
    node_current = instance.primary_node
2068

    
2069
    _StartInstanceDisks(self.cfg, instance, force)
2070

    
2071
    if not rpc.call_instance_start(node_current, instance, extra_args):
2072
      _ShutdownInstanceDisks(instance, self.cfg)
2073
      raise errors.OpExecError("Could not start instance")
2074

    
2075

    
2076
class LURebootInstance(LogicalUnit):
2077
  """Reboot an instance.
2078

2079
  """
2080
  HPATH = "instance-reboot"
2081
  HTYPE = constants.HTYPE_INSTANCE
2082
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2083
  REQ_BGL = False
2084

    
2085
  def ExpandNames(self):
2086
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2087
                                   constants.INSTANCE_REBOOT_HARD,
2088
                                   constants.INSTANCE_REBOOT_FULL]:
2089
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2090
                                  (constants.INSTANCE_REBOOT_SOFT,
2091
                                   constants.INSTANCE_REBOOT_HARD,
2092
                                   constants.INSTANCE_REBOOT_FULL))
2093
    self._ExpandAndLockInstance()
2094
    self.needed_locks[locking.LEVEL_NODE] = []
2095
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2096

    
2097
  def DeclareLocks(self, level):
2098
    if level == locking.LEVEL_NODE:
2099
      # FIXME: lock only primary on (not constants.INSTANCE_REBOOT_FULL)
2100
      self._LockInstancesNodes()
2101

    
2102
  def BuildHooksEnv(self):
2103
    """Build hooks env.
2104

2105
    This runs on master, primary and secondary nodes of the instance.
2106

2107
    """
2108
    env = {
2109
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2110
      }
2111
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2112
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2113
          list(self.instance.secondary_nodes))
2114
    return env, nl, nl
2115

    
2116
  def CheckPrereq(self):
2117
    """Check prerequisites.
2118

2119
    This checks that the instance is in the cluster.
2120

2121
    """
2122
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2123
    assert self.instance is not None, \
2124
      "Cannot retrieve locked instance %s" % self.op.instance_name
2125

    
2126
    # check bridge existence
2127
    _CheckInstanceBridgesExist(instance)
2128

    
2129
  def Exec(self, feedback_fn):
2130
    """Reboot the instance.
2131

2132
    """
2133
    instance = self.instance
2134
    ignore_secondaries = self.op.ignore_secondaries
2135
    reboot_type = self.op.reboot_type
2136
    extra_args = getattr(self.op, "extra_args", "")
2137

    
2138
    node_current = instance.primary_node
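    # soft/hard reboots are handled by the node daemon in a single RPC; a
    # full reboot is emulated below as shutdown + disk restart + start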
2139

    
2140
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2141
                       constants.INSTANCE_REBOOT_HARD]:
2142
      if not rpc.call_instance_reboot(node_current, instance,
2143
                                      reboot_type, extra_args):
2144
        raise errors.OpExecError("Could not reboot instance")
2145
    else:
2146
      if not rpc.call_instance_shutdown(node_current, instance):
2147
        raise errors.OpExecError("could not shutdown instance for full reboot")
2148
      _ShutdownInstanceDisks(instance, self.cfg)
2149
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2150
      if not rpc.call_instance_start(node_current, instance, extra_args):
2151
        _ShutdownInstanceDisks(instance, self.cfg)
2152
        raise errors.OpExecError("Could not start instance for full reboot")
2153

    
2154
    self.cfg.MarkInstanceUp(instance.name)
2155

    
2156

    
2157
class LUShutdownInstance(LogicalUnit):
2158
  """Shutdown an instance.
2159

2160
  """
2161
  HPATH = "instance-stop"
2162
  HTYPE = constants.HTYPE_INSTANCE
2163
  _OP_REQP = ["instance_name"]
2164
  REQ_BGL = False
2165

    
2166
  def ExpandNames(self):
2167
    self._ExpandAndLockInstance()
2168
    self.needed_locks[locking.LEVEL_NODE] = []
2169
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2170

    
2171
  def DeclareLocks(self, level):
2172
    if level == locking.LEVEL_NODE:
2173
      self._LockInstancesNodes()
2174

    
2175
  def BuildHooksEnv(self):
2176
    """Build hooks env.
2177

2178
    This runs on master, primary and secondary nodes of the instance.
2179

2180
    """
2181
    env = _BuildInstanceHookEnvByObject(self.instance)
2182
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2183
          list(self.instance.secondary_nodes))
2184
    return env, nl, nl
2185

    
2186
  def CheckPrereq(self):
2187
    """Check prerequisites.
2188

2189
    This checks that the instance is in the cluster.
2190

2191
    """
2192
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2193
    assert self.instance is not None, \
2194
      "Cannot retrieve locked instance %s" % self.op.instance_name
2195

    
2196
  def Exec(self, feedback_fn):
2197
    """Shutdown the instance.
2198

2199
    """
2200
    instance = self.instance
2201
    node_current = instance.primary_node
2202
    self.cfg.MarkInstanceDown(instance.name)
2203
    if not rpc.call_instance_shutdown(node_current, instance):
2204
      logger.Error("could not shutdown instance")
2205

    
2206
    _ShutdownInstanceDisks(instance, self.cfg)
2207

    
2208

    
2209
class LUReinstallInstance(LogicalUnit):
2210
  """Reinstall an instance.
2211

2212
  """
2213
  HPATH = "instance-reinstall"
2214
  HTYPE = constants.HTYPE_INSTANCE
2215
  _OP_REQP = ["instance_name"]
2216
  REQ_BGL = False
2217

    
2218
  def ExpandNames(self):
2219
    self._ExpandAndLockInstance()
2220
    self.needed_locks[locking.LEVEL_NODE] = []
2221
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2222

    
2223
  def DeclareLocks(self, level):
2224
    if level == locking.LEVEL_NODE:
2225
      self._LockInstancesNodes()
2226

    
2227
  def BuildHooksEnv(self):
2228
    """Build hooks env.
2229

2230
    This runs on master, primary and secondary nodes of the instance.
2231

2232
    """
2233
    env = _BuildInstanceHookEnvByObject(self.instance)
2234
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2235
          list(self.instance.secondary_nodes))
2236
    return env, nl, nl
2237

    
2238
  def CheckPrereq(self):
2239
    """Check prerequisites.
2240

2241
    This checks that the instance is in the cluster and is not running.
2242

2243
    """
2244
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2245
    assert instance is not None, \
2246
      "Cannot retrieve locked instance %s" % self.op.instance_name
2247

    
2248
    if instance.disk_template == constants.DT_DISKLESS:
2249
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2250
                                 self.op.instance_name)
2251
    if instance.status != "down":
2252
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2253
                                 self.op.instance_name)
2254
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2255
    if remote_info:
2256
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2257
                                 (self.op.instance_name,
2258
                                  instance.primary_node))
2259

    
2260
    self.op.os_type = getattr(self.op, "os_type", None)
2261
    if self.op.os_type is not None:
2262
      # OS verification
2263
      pnode = self.cfg.GetNodeInfo(
2264
        self.cfg.ExpandNodeName(instance.primary_node))
2265
      if pnode is None:
2266
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2267
                                   self.op.pnode)
2268
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2269
      if not os_obj:
2270
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2271
                                   " primary node"  % self.op.os_type)
2272

    
2273
    self.instance = instance
2274

    
2275
  def Exec(self, feedback_fn):
2276
    """Reinstall the instance.
2277

2278
    """
2279
    inst = self.instance
2280

    
2281
    if self.op.os_type is not None:
2282
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2283
      inst.os = self.op.os_type
2284
      self.cfg.AddInstance(inst)
2285

    
2286
    _StartInstanceDisks(self.cfg, inst, None)
2287
    try:
2288
      feedback_fn("Running the instance OS create scripts...")
2289
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2290
        raise errors.OpExecError("Could not install OS for instance %s"
2291
                                 " on node %s" %
2292
                                 (inst.name, inst.primary_node))
2293
    finally:
2294
      _ShutdownInstanceDisks(inst, self.cfg)
2295

    
2296

    
2297
class LURenameInstance(LogicalUnit):
2298
  """Rename an instance.
2299

2300
  """
2301
  HPATH = "instance-rename"
2302
  HTYPE = constants.HTYPE_INSTANCE
2303
  _OP_REQP = ["instance_name", "new_name"]
2304

    
2305
  def BuildHooksEnv(self):
2306
    """Build hooks env.
2307

2308
    This runs on master, primary and secondary nodes of the instance.
2309

2310
    """
2311
    env = _BuildInstanceHookEnvByObject(self.instance)
2312
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2313
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2314
          list(self.instance.secondary_nodes))
2315
    return env, nl, nl
2316

    
2317
  def CheckPrereq(self):
2318
    """Check prerequisites.
2319

2320
    This checks that the instance is in the cluster and is not running.
2321

2322
    """
2323
    instance = self.cfg.GetInstanceInfo(
2324
      self.cfg.ExpandInstanceName(self.op.instance_name))
2325
    if instance is None:
2326
      raise errors.OpPrereqError("Instance '%s' not known" %
2327
                                 self.op.instance_name)
2328
    if instance.status != "down":
2329
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2330
                                 self.op.instance_name)
2331
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2332
    if remote_info:
2333
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2334
                                 (self.op.instance_name,
2335
                                  instance.primary_node))
2336
    self.instance = instance
2337

    
2338
    # new name verification
2339
    name_info = utils.HostInfo(self.op.new_name)
2340

    
2341
    self.op.new_name = new_name = name_info.name
2342
    instance_list = self.cfg.GetInstanceList()
2343
    if new_name in instance_list:
2344
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2345
                                 new_name)
2346

    
2347
    if not getattr(self.op, "ignore_ip", False):
2348
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2349
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2350
                                   (name_info.ip, new_name))
2351

    
2352

    
2353
  def Exec(self, feedback_fn):
2354
    """Reinstall the instance.
2355

2356
    """
2357
    inst = self.instance
2358
    old_name = inst.name
2359

    
2360
    if inst.disk_template == constants.DT_FILE:
2361
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2362

    
2363
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2364
    # Change the instance lock. This is definitely safe while we hold the BGL
2365
    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2366
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2367

    
2368
    # re-read the instance from the configuration after rename
2369
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2370

    
2371
    if inst.disk_template == constants.DT_FILE:
2372
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2373
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
2374
                                                old_file_storage_dir,
2375
                                                new_file_storage_dir)
2376

    
2377
      if not result:
2378
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2379
                                 " directory '%s' to '%s' (but the instance"
2380
                                 " has been renamed in Ganeti)" % (
2381
                                 inst.primary_node, old_file_storage_dir,
2382
                                 new_file_storage_dir))
2383

    
2384
      if not result[0]:
2385
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2386
                                 " (but the instance has been renamed in"
2387
                                 " Ganeti)" % (old_file_storage_dir,
2388
                                               new_file_storage_dir))
2389

    
2390
    _StartInstanceDisks(self.cfg, inst, None)
2391
    try:
2392
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2393
                                          "sda", "sdb"):
2394
        msg = ("Could run OS rename script for instance %s on node %s (but the"
2395
               " instance has been renamed in Ganeti)" %
2396
               (inst.name, inst.primary_node))
2397
        logger.Error(msg)
2398
    finally:
2399
      _ShutdownInstanceDisks(inst, self.cfg)
2400

    
2401

    
2402
class LURemoveInstance(LogicalUnit):
2403
  """Remove an instance.
2404

2405
  """
2406
  HPATH = "instance-remove"
2407
  HTYPE = constants.HTYPE_INSTANCE
2408
  _OP_REQP = ["instance_name", "ignore_failures"]
2409

    
2410
  def BuildHooksEnv(self):
2411
    """Build hooks env.
2412

2413
    This runs on master, primary and secondary nodes of the instance.
2414

2415
    """
2416
    env = _BuildInstanceHookEnvByObject(self.instance)
2417
    nl = [self.sstore.GetMasterNode()]
2418
    return env, nl, nl
2419

    
2420
  def CheckPrereq(self):
2421
    """Check prerequisites.
2422

2423
    This checks that the instance is in the cluster.
2424

2425
    """
2426
    instance = self.cfg.GetInstanceInfo(
2427
      self.cfg.ExpandInstanceName(self.op.instance_name))
2428
    if instance is None:
2429
      raise errors.OpPrereqError("Instance '%s' not known" %
2430
                                 self.op.instance_name)
2431
    self.instance = instance
2432

    
2433
  def Exec(self, feedback_fn):
2434
    """Remove the instance.
2435

2436
    """
2437
    instance = self.instance
2438
    logger.Info("shutting down instance %s on node %s" %
2439
                (instance.name, instance.primary_node))
2440

    
2441
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2442
      if self.op.ignore_failures:
2443
        feedback_fn("Warning: can't shutdown instance")
2444
      else:
2445
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2446
                                 (instance.name, instance.primary_node))
2447

    
2448
    logger.Info("removing block devices for instance %s" % instance.name)
2449

    
2450
    if not _RemoveDisks(instance, self.cfg):
2451
      if self.op.ignore_failures:
2452
        feedback_fn("Warning: can't remove instance's disks")
2453
      else:
2454
        raise errors.OpExecError("Can't remove instance's disks")
2455

    
2456
    logger.Info("removing instance %s out of cluster config" % instance.name)
2457

    
2458
    self.cfg.RemoveInstance(instance.name)
2459
    # Remove the instance from the Ganeti Lock Manager
2460
    self.context.glm.remove(locking.LEVEL_INSTANCE, instance.name)
2461

    
2462

    
2463
class LUQueryInstances(NoHooksLU):
2464
  """Logical unit for querying instances.
2465

2466
  """
2467
  _OP_REQP = ["output_fields", "names"]
2468
  REQ_BGL = False
2469

    
2470
  def ExpandNames(self):
2471
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2472
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2473
                               "admin_state", "admin_ram",
2474
                               "disk_template", "ip", "mac", "bridge",
2475
                               "sda_size", "sdb_size", "vcpus", "tags"],
2476
                       dynamic=self.dynamic_fields,
2477
                       selected=self.op.output_fields)
2478

    
2479
    self.needed_locks = {}
2480
    self.share_locks[locking.LEVEL_INSTANCE] = 1
2481
    self.share_locks[locking.LEVEL_NODE] = 1
2482

    
2483
    # TODO: we could lock instances (and nodes) only if the user asked for
2484
    # dynamic fields. For that we need atomic ways to get info for a group of
2485
    # instances from the config, though.
2486
    if not self.op.names:
2487
      self.needed_locks[locking.LEVEL_INSTANCE] = None # Acquire all
2488
    else:
2489
      self.needed_locks[locking.LEVEL_INSTANCE] = \
2490
        _GetWantedInstances(self, self.op.names)
2491

    
2492
    self.needed_locks[locking.LEVEL_NODE] = []
2493
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2494

    
2495
  def DeclareLocks(self, level):
2496
    # TODO: locking of nodes could be avoided when not querying them
2497
    if level == locking.LEVEL_NODE:
2498
      self._LockInstancesNodes()
2499

    
2500
  def CheckPrereq(self):
2501
    """Check prerequisites.
2502

2503
    """
2504
    # This of course is valid only if we locked the instances
2505
    self.wanted = self.acquired_locks[locking.LEVEL_INSTANCE]
2506

    
2507
  def Exec(self, feedback_fn):
2508
    """Computes the list of nodes and their attributes.
2509

2510
    """
2511
    instance_names = self.wanted
2512
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2513
                     in instance_names]
2514

    
2515
    # begin data gathering
2516

    
2517
    nodes = frozenset([inst.primary_node for inst in instance_list])
2518

    
2519
    bad_nodes = []
2520
    if self.dynamic_fields.intersection(self.op.output_fields):
2521
      live_data = {}
2522
      node_data = rpc.call_all_instances_info(nodes)
2523
      for name in nodes:
2524
        result = node_data[name]
2525
        if result:
2526
          live_data.update(result)
2527
        elif result == False:
2528
          bad_nodes.append(name)
2529
        # else no instance is alive
2530
    else:
2531
      live_data = dict([(name, {}) for name in instance_names])
2532

    
2533
    # end data gathering
2534

    
2535
    output = []
2536
    for instance in instance_list:
2537
      iout = []
2538
      for field in self.op.output_fields:
2539
        if field == "name":
2540
          val = instance.name
2541
        elif field == "os":
2542
          val = instance.os
2543
        elif field == "pnode":
2544
          val = instance.primary_node
2545
        elif field == "snodes":
2546
          val = list(instance.secondary_nodes)
2547
        elif field == "admin_state":
2548
          val = (instance.status != "down")
2549
        elif field == "oper_state":
2550
          if instance.primary_node in bad_nodes:
2551
            val = None
2552
          else:
2553
            val = bool(live_data.get(instance.name))
2554
        elif field == "status":
2555
          if instance.primary_node in bad_nodes:
2556
            val = "ERROR_nodedown"
2557
          else:
2558
            running = bool(live_data.get(instance.name))
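            # combine the admin state (config) with the observed state (live
            # data) to derive: running, ERROR_up, ERROR_down or ADMIN_down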
2559
            if running:
2560
              if instance.status != "down":
2561
                val = "running"
2562
              else:
2563
                val = "ERROR_up"
2564
            else:
2565
              if instance.status != "down":
2566
                val = "ERROR_down"
2567
              else:
2568
                val = "ADMIN_down"
2569
        elif field == "admin_ram":
2570
          val = instance.memory
2571
        elif field == "oper_ram":
2572
          if instance.primary_node in bad_nodes:
2573
            val = None
2574
          elif instance.name in live_data:
2575
            val = live_data[instance.name].get("memory", "?")
2576
          else:
2577
            val = "-"
2578
        elif field == "disk_template":
2579
          val = instance.disk_template
2580
        elif field == "ip":
2581
          val = instance.nics[0].ip
2582
        elif field == "bridge":
2583
          val = instance.nics[0].bridge
2584
        elif field == "mac":
2585
          val = instance.nics[0].mac
2586
        elif field == "sda_size" or field == "sdb_size":
2587
          disk = instance.FindDisk(field[:3])
2588
          if disk is None:
2589
            val = None
2590
          else:
2591
            val = disk.size
2592
        elif field == "vcpus":
2593
          val = instance.vcpus
2594
        elif field == "tags":
2595
          val = list(instance.GetTags())
2596
        else:
2597
          raise errors.ParameterError(field)
2598
        iout.append(val)
2599
      output.append(iout)
2600

    
2601
    return output
2602

    
2603

    
2604
class LUFailoverInstance(LogicalUnit):
2605
  """Failover an instance.
2606

2607
  """
2608
  HPATH = "instance-failover"
2609
  HTYPE = constants.HTYPE_INSTANCE
2610
  _OP_REQP = ["instance_name", "ignore_consistency"]
2611
  REQ_BGL = False
2612

    
2613
  def ExpandNames(self):
2614
    self._ExpandAndLockInstance()
2615
    self.needed_locks[locking.LEVEL_NODE] = []
2616
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2617

    
2618
  def DeclareLocks(self, level):
2619
    if level == locking.LEVEL_NODE:
2620
      self._LockInstancesNodes()
2621

    
2622
  def BuildHooksEnv(self):
2623
    """Build hooks env.
2624

2625
    This runs on master, primary and secondary nodes of the instance.
2626

2627
    """
2628
    env = {
2629
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2630
      }
2631
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2632
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2633
    return env, nl, nl
2634

    
2635
  def CheckPrereq(self):
2636
    """Check prerequisites.
2637

2638
    This checks that the instance is in the cluster.
2639

2640
    """
2641
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2642
    assert self.instance is not None, \
2643
      "Cannot retrieve locked instance %s" % self.op.instance_name
2644

    
2645
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2646
      raise errors.OpPrereqError("Instance's disk layout is not"
2647
                                 " network mirrored, cannot failover.")
2648

    
2649
    secondary_nodes = instance.secondary_nodes
2650
    if not secondary_nodes:
2651
      raise errors.ProgrammerError("no secondary node but using "
2652
                                   "a mirrored disk template")
2653

    
2654
    target_node = secondary_nodes[0]
2655
    # check memory requirements on the secondary node
2656
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2657
                         instance.name, instance.memory)
2658

    
2659
    # check bridge existence
2660
    brlist = [nic.bridge for nic in instance.nics]
2661
    if not rpc.call_bridges_exist(target_node, brlist):
2662
      raise errors.OpPrereqError("One or more target bridges %s does not"
2663
                                 " exist on destination node '%s'" %
2664
                                 (brlist, target_node))
2665

    
2666
  def Exec(self, feedback_fn):
2667
    """Failover an instance.
2668

2669
    The failover is done by shutting it down on its present node and
2670
    starting it on the secondary.
2671

2672
    """
2673
    instance = self.instance
2674

    
2675
    source_node = instance.primary_node
2676
    target_node = instance.secondary_nodes[0]
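    # DRBD instances have exactly one secondary, which becomes the new primary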
2677

    
2678
    feedback_fn("* checking disk consistency between source and target")
2679
    for dev in instance.disks:
2680
      # for drbd, these are drbd over lvm
2681
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2682
        if instance.status == "up" and not self.op.ignore_consistency:
2683
          raise errors.OpExecError("Disk %s is degraded on target node,"
2684
                                   " aborting failover." % dev.iv_name)
2685

    
2686
    feedback_fn("* shutting down instance on source node")
2687
    logger.Info("Shutting down instance %s on node %s" %
2688
                (instance.name, source_node))
2689

    
2690
    if not rpc.call_instance_shutdown(source_node, instance):
2691
      if self.op.ignore_consistency:
2692
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2693
                     " anyway. Please make sure node %s is down"  %
2694
                     (instance.name, source_node, source_node))
2695
      else:
2696
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2697
                                 (instance.name, source_node))
2698

    
2699
    feedback_fn("* deactivating the instance's disks on source node")
2700
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2701
      raise errors.OpExecError("Can't shut down the instance's disks.")
2702

    
2703
    instance.primary_node = target_node
2704
    # distribute new instance config to the other nodes
2705
    self.cfg.Update(instance)
2706

    
2707
    # Only start the instance if it's marked as up
2708
    if instance.status == "up":
2709
      feedback_fn("* activating the instance's disks on target node")
2710
      logger.Info("Starting instance %s on node %s" %
2711
                  (instance.name, target_node))
2712

    
2713
      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2714
                                               ignore_secondaries=True)
2715
      if not disks_ok:
2716
        _ShutdownInstanceDisks(instance, self.cfg)
2717
        raise errors.OpExecError("Can't activate the instance's disks")
2718

    
2719
      feedback_fn("* starting the instance on the target node")
2720
      if not rpc.call_instance_start(target_node, instance, None):
2721
        _ShutdownInstanceDisks(instance, self.cfg)
2722
        raise errors.OpExecError("Could not start instance %s on node %s." %
2723
                                 (instance.name, target_node))
2724

    
2725

    
2726
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2727
  """Create a tree of block devices on the primary node.
2728

2729
  This always creates all devices.
2730

2731
  """
2732
  if device.children:
2733
    for child in device.children:
2734
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2735
        return False
2736

    
2737
  cfg.SetDiskID(device, node)
2738
  new_id = rpc.call_blockdev_create(node, device, device.size,
2739
                                    instance.name, True, info)
2740
  if not new_id:
2741
    return False
2742
  if device.physical_id is None:
2743
    device.physical_id = new_id
2744
  return True
2745

    
2746

    
2747
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2748
  """Create a tree of block devices on a secondary node.
2749

2750
  If this device type has to be created on secondaries, create it and
2751
  all its children.
2752

2753
  If not, just recurse to children keeping the same 'force' value.
2754

2755
  """
2756
  if device.CreateOnSecondary():
2757
    force = True
2758
  if device.children:
2759
    for child in device.children:
2760
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
2761
                                        child, force, info):
2762
        return False
2763

    
2764
  if not force:
2765
    return True
2766
  cfg.SetDiskID(device, node)
2767
  new_id = rpc.call_blockdev_create(node, device, device.size,
2768
                                    instance.name, False, info)
2769
  if not new_id:
2770
    return False
2771
  if device.physical_id is None:
2772
    device.physical_id = new_id
2773
  return True
2774

    
2775

    
2776
def _GenerateUniqueNames(cfg, exts):
2777
  """Generate a suitable LV name.
2778

2779
  This will generate a unique logical volume name for each given extension.
2780

2781
  """
2782
  results = []
2783
  for val in exts:
2784
    new_id = cfg.GenerateUniqueID()
2785
    results.append("%s%s" % (new_id, val))
2786
  return results
2787

    
2788

    
2789
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2790
  """Generate a drbd8 device complete with its children.
2791

2792
  """
2793
  port = cfg.AllocatePort()
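  # each DRBD device gets its own cluster-unique TCP port for replication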
2794
  vgname = cfg.GetVGName()
2795
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2796
                          logical_id=(vgname, names[0]))
2797
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2798
                          logical_id=(vgname, names[1]))
2799
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2800
                          logical_id = (primary, secondary, port),
2801
                          children = [dev_data, dev_meta],
2802
                          iv_name=iv_name)
2803
  return drbd_dev
2804

    
2805

    
2806
def _GenerateDiskTemplate(cfg, template_name,
2807
                          instance_name, primary_node,
2808
                          secondary_nodes, disk_sz, swap_sz,
2809
                          file_storage_dir, file_driver):
2810
  """Generate the entire disk layout for a given template type.
2811

2812
  """
2813
  #TODO: compute space requirements
2814

    
2815
  vgname = cfg.GetVGName()
2816
  if template_name == constants.DT_DISKLESS:
2817
    disks = []
2818
  elif template_name == constants.DT_PLAIN:
2819
    if len(secondary_nodes) != 0:
2820
      raise errors.ProgrammerError("Wrong template configuration")
2821

    
2822
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2823
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2824
                           logical_id=(vgname, names[0]),
2825
                           iv_name = "sda")
2826
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2827
                           logical_id=(vgname, names[1]),
2828
                           iv_name = "sdb")
2829
    disks = [sda_dev, sdb_dev]
2830
  elif template_name == constants.DT_DRBD8:
2831
    if len(secondary_nodes) != 1:
2832
      raise errors.ProgrammerError("Wrong template configuration")
2833
    remote_node = secondary_nodes[0]
2834
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2835
                                       ".sdb_data", ".sdb_meta"])
2836
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2837
                                         disk_sz, names[0:2], "sda")
2838
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2839
                                         swap_sz, names[2:4], "sdb")
2840
    disks = [drbd_sda_dev, drbd_sdb_dev]
2841
  elif template_name == constants.DT_FILE:
2842
    if len(secondary_nodes) != 0:
2843
      raise errors.ProgrammerError("Wrong template configuration")
2844

    
2845
    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
2846
                                iv_name="sda", logical_id=(file_driver,
2847
                                "%s/sda" % file_storage_dir))
2848
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
2849
                                iv_name="sdb", logical_id=(file_driver,
2850
                                "%s/sdb" % file_storage_dir))
2851
    disks = [file_sda_dev, file_sdb_dev]
2852
  else:
2853
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2854
  return disks
2855

    
2856

    
2857
def _GetInstanceInfoText(instance):
2858
  """Compute that text that should be added to the disk's metadata.
2859

2860
  """
2861
  return "originstname+%s" % instance.name
2862

    
2863

    
2864
def _CreateDisks(cfg, instance):
2865
  """Create all disks for an instance.
2866

2867
  This abstracts away some work from AddInstance.
2868

2869
  Args:
2870
    instance: the instance object
2871

2872
  Returns:
2873
    True or False showing the success of the creation process
2874

2875
  """
2876
  info = _GetInstanceInfoText(instance)
2877

    
2878
  if instance.disk_template == constants.DT_FILE:
2879
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2880
    result = rpc.call_file_storage_dir_create(instance.primary_node,
2881
                                              file_storage_dir)
2882

    
2883
    if not result:
2884
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
2885
      return False
2886

    
2887
    if not result[0]:
2888
      logger.Error("failed to create directory '%s'" % file_storage_dir)
2889
      return False
2890

    
2891
  for device in instance.disks:
2892
    logger.Info("creating volume %s for instance %s" %
2893
                (device.iv_name, instance.name))
2894
    #HARDCODE
2895
    for secondary_node in instance.secondary_nodes:
2896
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2897
                                        device, False, info):
2898
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2899
                     (device.iv_name, device, secondary_node))
2900
        return False
2901
    #HARDCODE
2902
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2903
                                    instance, device, info):
2904
      logger.Error("failed to create volume %s on primary!" %
2905
                   device.iv_name)
2906
      return False
2907

    
2908
  return True
2909

    
2910

    
2911
def _RemoveDisks(instance, cfg):
2912
  """Remove all disks for an instance.
2913

2914
  This abstracts away some work from `AddInstance()` and
2915
  `RemoveInstance()`. Note that in case some of the devices couldn't
2916
  be removed, the removal will continue with the other ones (compare
2917
  with `_CreateDisks()`).
2918

2919
  Args:
2920
    instance: the instance object
2921

2922
  Returns:
2923
    True or False showing the success of the removal process
2924

2925
  """
2926
  logger.Info("removing block devices for instance %s" % instance.name)
2927

    
2928
  result = True
2929
  for device in instance.disks:
2930
    for node, disk in device.ComputeNodeTree(instance.primary_node):
2931
      cfg.SetDiskID(disk, node)
2932
      if not rpc.call_blockdev_remove(node, disk):
2933
        logger.Error("could not remove block device %s on node %s,"
2934
                     " continuing anyway" %
2935
                     (device.iv_name, node))
2936
        result = False
2937

    
2938
  if instance.disk_template == constants.DT_FILE:
2939
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2940
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
2941
                                            file_storage_dir):
2942
      logger.Error("could not remove directory '%s'" % file_storage_dir)
2943
      result = False
2944

    
2945
  return result
2946

    
2947

    
2948
def _ComputeDiskSize(disk_template, disk_size, swap_size):
2949
  """Compute disk size requirements in the volume group
2950

2951
  This is currently hard-coded for the two-drive layout.
2952

2953
  """
2954
  # Required free disk space as a function of disk and swap space
2955
  req_size_dict = {
2956
    constants.DT_DISKLESS: None,
2957
    constants.DT_PLAIN: disk_size + swap_size,
2958
    # 256 MB are added for drbd metadata, 128MB for each drbd device
2959
    constants.DT_DRBD8: disk_size + swap_size + 256,
2960
    constants.DT_FILE: None,
2961
  }
2962

    
2963
  if disk_template not in req_size_dict:
2964
    raise errors.ProgrammerError("Disk template '%s' size requirement"
2965
                                 " is unknown" %  disk_template)
2966

    
2967
  return req_size_dict[disk_template]
2968

    
2969

    
2970
class LUCreateInstance(LogicalUnit):
2971
  """Create an instance.
2972

2973
  """
2974
  HPATH = "instance-add"
2975
  HTYPE = constants.HTYPE_INSTANCE
2976
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
2977
              "disk_template", "swap_size", "mode", "start", "vcpus",
2978
              "wait_for_sync", "ip_check", "mac"]
2979

    
2980
  def _RunAllocator(self):
2981
    """Run the allocator based on input opcode.
2982

2983
    """
2984
    disks = [{"size": self.op.disk_size, "mode": "w"},
2985
             {"size": self.op.swap_size, "mode": "w"}]
2986
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
2987
             "bridge": self.op.bridge}]
2988
    ial = IAllocator(self.cfg, self.sstore,
2989
                     mode=constants.IALLOCATOR_MODE_ALLOC,
2990
                     name=self.op.instance_name,
2991
                     disk_template=self.op.disk_template,
2992
                     tags=[],
2993
                     os=self.op.os_type,
2994
                     vcpus=self.op.vcpus,
2995
                     mem_size=self.op.mem_size,
2996
                     disks=disks,
2997
                     nics=nics,
2998
                     )
2999

    
3000
    ial.Run(self.op.iallocator)
3001

    
3002
    if not ial.success:
3003
      raise errors.OpPrereqError("Can't compute nodes using"
3004
                                 " iallocator '%s': %s" % (self.op.iallocator,
3005
                                                           ial.info))
3006
    if len(ial.nodes) != ial.required_nodes:
3007
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3008
                                 " of nodes (%s), required %s" %
3009
                                 (len(ial.nodes), ial.required_nodes))
3010
    self.op.pnode = ial.nodes[0]
3011
    logger.ToStdout("Selected nodes for the instance: %s" %
3012
                    (", ".join(ial.nodes),))
3013
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3014
                (self.op.instance_name, self.op.iallocator, ial.nodes))
3015
    if ial.required_nodes == 2:
3016
      self.op.snode = ial.nodes[1]
3017

    
3018
  def BuildHooksEnv(self):
3019
    """Build hooks env.
3020

3021
    This runs on master, primary and secondary nodes of the instance.
3022

3023
    """
3024
    env = {
3025
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3026
      "INSTANCE_DISK_SIZE": self.op.disk_size,
3027
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
3028
      "INSTANCE_ADD_MODE": self.op.mode,
3029
      }
3030
    if self.op.mode == constants.INSTANCE_IMPORT:
3031
      env["INSTANCE_SRC_NODE"] = self.op.src_node
3032
      env["INSTANCE_SRC_PATH"] = self.op.src_path
3033
      env["INSTANCE_SRC_IMAGE"] = self.src_image
3034

    
3035
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3036
      primary_node=self.op.pnode,
3037
      secondary_nodes=self.secondaries,
3038
      status=self.instance_status,
3039
      os_type=self.op.os_type,
3040
      memory=self.op.mem_size,
3041
      vcpus=self.op.vcpus,
3042
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3043
    ))
3044

    
3045
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3046
          self.secondaries)
3047
    return env, nl, nl
3048

    
3049

    
3050
  def CheckPrereq(self):
3051
    """Check prerequisites.
3052

3053
    """
3054
    # set optional parameters to none if they don't exist
3055
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3056
                 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
3057
                 "vnc_bind_address"]:
3058
      if not hasattr(self.op, attr):
3059
        setattr(self.op, attr, None)
3060

    
3061
    if self.op.mode not in (constants.INSTANCE_CREATE,
3062
                            constants.INSTANCE_IMPORT):
3063
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3064
                                 self.op.mode)
3065

    
3066
    if (not self.cfg.GetVGName() and
3067
        self.op.disk_template not in constants.DTS_NOT_LVM):
3068
      raise errors.OpPrereqError("Cluster does not support lvm-based"
3069
                                 " instances")
3070

    
3071
    if self.op.mode == constants.INSTANCE_IMPORT:
3072
      src_node = getattr(self.op, "src_node", None)
3073
      src_path = getattr(self.op, "src_path", None)
3074
      if src_node is None or src_path is None:
3075
        raise errors.OpPrereqError("Importing an instance requires source"
3076
                                   " node and path options")
3077
      src_node_full = self.cfg.ExpandNodeName(src_node)
3078
      if src_node_full is None:
3079
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3080
      self.op.src_node = src_node = src_node_full
3081

    
3082
      if not os.path.isabs(src_path):
3083
        raise errors.OpPrereqError("The source path must be absolute")
3084

    
3085
      export_info = rpc.call_export_info(src_node, src_path)
3086

    
3087
      if not export_info:
3088
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
3089

    
3090
      if not export_info.has_section(constants.INISECT_EXP):
3091
        raise errors.ProgrammerError("Corrupted export config")
3092

    
3093
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
3094
      if (int(ei_version) != constants.EXPORT_VERSION):
3095
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3096
                                   (ei_version, constants.EXPORT_VERSION))
3097

    
3098
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3099
        raise errors.OpPrereqError("Can't import instance with more than"
3100
                                   " one data disk")
3101

    
3102
      # FIXME: are the old os-es, disk sizes, etc. useful?
3103
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3104
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3105
                                                         'disk0_dump'))
3106
      self.src_image = diskimage
3107
    else: # INSTANCE_CREATE
3108
      if getattr(self.op, "os_type", None) is None:
3109
        raise errors.OpPrereqError("No guest OS specified")
3110

    
3111
    #### instance parameters check
3112

    
3113
    # disk template and mirror node verification
3114
    if self.op.disk_template not in constants.DISK_TEMPLATES:
3115
      raise errors.OpPrereqError("Invalid disk template name")
3116

    
3117
    # instance name verification
3118
    hostname1 = utils.HostInfo(self.op.instance_name)
3119

    
3120
    self.op.instance_name = instance_name = hostname1.name
3121
    instance_list = self.cfg.GetInstanceList()
3122
    if instance_name in instance_list:
3123
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3124
                                 instance_name)
3125

    
3126
    # ip validity checks
3127
    ip = getattr(self.op, "ip", None)
3128
    if ip is None or ip.lower() == "none":
3129
      inst_ip = None
3130
    elif ip.lower() == "auto":
3131
      inst_ip = hostname1.ip
3132
    else:
3133
      if not utils.IsValidIP(ip):
3134
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
3135
                                   " like a valid IP" % ip)
3136
      inst_ip = ip
3137
    self.inst_ip = self.op.ip = inst_ip
3138

    
3139
    if self.op.start and not self.op.ip_check:
3140
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3141
                                 " adding an instance in start mode")
3142

    
3143
    if self.op.ip_check:
3144
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3145
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3146
                                   (hostname1.ip, instance_name))
3147

    
3148
    # MAC address verification
3149
    if self.op.mac != "auto":
3150
      if not utils.IsValidMac(self.op.mac.lower()):
3151
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
3152
                                   self.op.mac)
3153

    
3154
    # bridge verification
3155
    bridge = getattr(self.op, "bridge", None)
3156
    if bridge is None:
3157
      self.op.bridge = self.cfg.GetDefBridge()
3158
    else:
3159
      self.op.bridge = bridge
3160

    
3161
    # boot order verification
3162
    if self.op.hvm_boot_order is not None:
3163
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3164
        raise errors.OpPrereqError("invalid boot order specified,"
3165
                                   " must be one or more of [acdn]")
3166
    # file storage checks
3167
    if (self.op.file_driver and
3168
        not self.op.file_driver in constants.FILE_DRIVER):
3169
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
3170
                                 self.op.file_driver)
3171

    
3172
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3173
      raise errors.OpPrereqError("File storage directory not a relative"
3174
                                 " path")
3175
    #### allocator run
3176

    
3177
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
3178
      raise errors.OpPrereqError("One and only one of iallocator and primary"
3179
                                 " node must be given")
3180

    
3181
    if self.op.iallocator is not None:
3182
      self._RunAllocator()
3183

    
3184
    #### node related checks
3185

    
3186
    # check primary node
3187
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3188
    if pnode is None:
3189
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
3190
                                 self.op.pnode)
3191
    self.op.pnode = pnode.name
3192
    self.pnode = pnode
3193
    self.secondaries = []
3194

    
3195
    # mirror node verification
3196
    if self.op.disk_template in constants.DTS_NET_MIRROR:
3197
      if getattr(self.op, "snode", None) is None:
3198
        raise errors.OpPrereqError("The networked disk templates need"
3199
                                   " a mirror node")
3200

    
3201
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
3202
      if snode_name is None:
3203
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
3204
                                   self.op.snode)
3205
      elif snode_name == pnode.name:
3206
        raise errors.OpPrereqError("The secondary node cannot be"
3207
                                   " the primary node.")
3208
      self.secondaries.append(snode_name)
3209

    
3210
    req_size = _ComputeDiskSize(self.op.disk_template,
3211
                                self.op.disk_size, self.op.swap_size)
3212

    
3213
    # Check lv size requirements
3214
    if req_size is not None:
3215
      nodenames = [pnode.name] + self.secondaries
3216
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3217
      for node in nodenames:
3218
        info = nodeinfo.get(node, None)
3219
        if not info:
3220
          raise errors.OpPrereqError("Cannot get current information"
3221
                                     " from node '%s'" % node)
3222
        vg_free = info.get('vg_free', None)
3223
        if not isinstance(vg_free, int):
3224
          raise errors.OpPrereqError("Can't compute free disk space on"
3225
                                     " node %s" % node)
3226
        if req_size > info['vg_free']:
3227
          raise errors.OpPrereqError("Not enough disk space on target node %s."
3228
                                     " %d MB available, %d MB required" %
3229
                                     (node, info['vg_free'], req_size))
3230

    
3231
    # os verification
3232
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3233
    if not os_obj:
3234
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
3235
                                 " primary node"  % self.op.os_type)
3236

    
3237
    if self.op.kernel_path == constants.VALUE_NONE:
3238
      raise errors.OpPrereqError("Can't set instance kernel to none")
3239

    
3240

    
3241
    # bridge check on primary node
3242
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3243
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
3244
                                 " destination node '%s'" %
3245
                                 (self.op.bridge, pnode.name))
3246

    
3247
    # memory check on primary node
3248
    if self.op.start:
3249
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3250
                           "creating instance %s" % self.op.instance_name,
3251
                           self.op.mem_size)
3252

    
3253
    # hvm_cdrom_image_path verification
3254
    if self.op.hvm_cdrom_image_path is not None:
3255
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
3256
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
3257
                                   " be an absolute path or None, not %s" %
3258
                                   self.op.hvm_cdrom_image_path)
3259
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
3260
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
3261
                                   " regular file or a symlink pointing to"
3262
                                   " an existing regular file, not %s" %
3263
                                   self.op.hvm_cdrom_image_path)
3264

    
3265
    # vnc_bind_address verification
3266
    if self.op.vnc_bind_address is not None:
3267
      if not utils.IsValidIP(self.op.vnc_bind_address):
3268
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3269
                                   " like a valid IP address" %
3270
                                   self.op.vnc_bind_address)
3271

    
3272
    if self.op.start:
3273
      self.instance_status = 'up'
3274
    else:
3275
      self.instance_status = 'down'
3276

    
3277
  def Exec(self, feedback_fn):
3278
    """Create and add the instance to the cluster.
3279

3280
    """
3281
    instance = self.op.instance_name
3282
    pnode_name = self.pnode.name
3283

    
3284
    if self.op.mac == "auto":
3285
      mac_address = self.cfg.GenerateMAC()
3286
    else:
3287
      mac_address = self.op.mac
3288

    
3289
    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3290
    if self.inst_ip is not None:
3291
      nic.ip = self.inst_ip
3292

    
3293
    ht_kind = self.sstore.GetHypervisorType()
3294
    if ht_kind in constants.HTS_REQ_PORT:
3295
      network_port = self.cfg.AllocatePort()
3296
    else:
3297
      network_port = None
3298

    
3299
    if self.op.vnc_bind_address is None:
3300
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3301

    
3302
    # this is needed because os.path.join does not accept None arguments
3303
    if self.op.file_storage_dir is None:
3304
      string_file_storage_dir = ""
3305
    else:
3306
      string_file_storage_dir = self.op.file_storage_dir
3307

    
3308
    # build the full file storage dir path
3309
    file_storage_dir = os.path.normpath(os.path.join(
3310
                                        self.sstore.GetFileStorageDir(),
3311
                                        string_file_storage_dir, instance))
3312

    
3313

    
3314
    disks = _GenerateDiskTemplate(self.cfg,
3315
                                  self.op.disk_template,
3316
                                  instance, pnode_name,
3317
                                  self.secondaries, self.op.disk_size,
3318
                                  self.op.swap_size,
3319
                                  file_storage_dir,
3320
                                  self.op.file_driver)
3321

    
3322
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3323
                            primary_node=pnode_name,
3324
                            memory=self.op.mem_size,
3325
                            vcpus=self.op.vcpus,
3326
                            nics=[nic], disks=disks,
3327
                            disk_template=self.op.disk_template,
3328
                            status=self.instance_status,
3329
                            network_port=network_port,
3330
                            kernel_path=self.op.kernel_path,
3331
                            initrd_path=self.op.initrd_path,
3332
                            hvm_boot_order=self.op.hvm_boot_order,
3333
                            hvm_acpi=self.op.hvm_acpi,
3334
                            hvm_pae=self.op.hvm_pae,
3335
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3336
                            vnc_bind_address=self.op.vnc_bind_address,
3337
                            )
3338

    
3339
    feedback_fn("* creating instance disks...")
3340
    if not _CreateDisks(self.cfg, iobj):
3341
      _RemoveDisks(iobj, self.cfg)
3342
      raise errors.OpExecError("Device creation failed, reverting...")
3343

    
3344
    feedback_fn("adding instance %s to cluster config" % instance)
3345

    
3346
    self.cfg.AddInstance(iobj)
3347
    # Add the new instance to the Ganeti Lock Manager
3348
    self.context.glm.add(locking.LEVEL_INSTANCE, instance)
3349

    
3350
    if self.op.wait_for_sync:
3351
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3352
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3353
      # make sure the disks are not degraded (still sync-ing is ok)
3354
      time.sleep(15)
3355
      feedback_fn("* checking mirrors status")
3356
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3357
    else:
3358
      disk_abort = False
3359

    
3360
    if disk_abort:
3361
      _RemoveDisks(iobj, self.cfg)
3362
      self.cfg.RemoveInstance(iobj.name)
3363
      # Remove the new instance from the Ganeti Lock Manager
3364
      self.context.glm.remove(locking.LEVEL_INSTANCE, iobj.name)
3365
      raise errors.OpExecError("There are some degraded disks for"
3366
                               " this instance")
3367

    
3368
    feedback_fn("creating os for instance %s on node %s" %
3369
                (instance, pnode_name))
3370

    
3371
    if iobj.disk_template != constants.DT_DISKLESS:
3372
      if self.op.mode == constants.INSTANCE_CREATE:
3373
        feedback_fn("* running the instance OS create scripts...")
3374
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3375
          raise errors.OpExecError("could not add os for instance %s"
3376
                                   " on node %s" %
3377
                                   (instance, pnode_name))
3378

    
3379
      elif self.op.mode == constants.INSTANCE_IMPORT:
3380
        feedback_fn("* running the instance OS import scripts...")
3381
        src_node = self.op.src_node
3382
        src_image = self.src_image
3383
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3384
                                                src_node, src_image):
3385
          raise errors.OpExecError("Could not import os for instance"
3386
                                   " %s on node %s" %
3387
                                   (instance, pnode_name))
3388
      else:
3389
        # also checked in the prereq part
3390
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3391
                                     % self.op.mode)
3392

    
3393
    if self.op.start:
3394
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3395
      feedback_fn("* starting instance...")
3396
      if not rpc.call_instance_start(pnode_name, iobj, None):
3397
        raise errors.OpExecError("Could not start instance")
3398

    
3399

    
3400
class LUConnectConsole(NoHooksLU):
3401
  """Connect to an instance's console.
3402

3403
  This is somewhat special in that it returns the command line that
3404
  you need to run on the master node in order to connect to the
3405
  console.
3406

3407
  """
3408
  _OP_REQP = ["instance_name"]
3409
  REQ_BGL = False
3410

    
3411
  def ExpandNames(self):
3412
    self._ExpandAndLockInstance()
3413

    
3414
  def CheckPrereq(self):
3415
    """Check prerequisites.
3416

3417
    This checks that the instance is in the cluster.
3418

3419
    """
3420
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3421
    assert self.instance is not None, \
3422
      "Cannot retrieve locked instance %s" % self.op.instance_name
3423

    
3424
  def Exec(self, feedback_fn):
3425
    """Connect to the console of an instance
3426

3427
    """
3428
    instance = self.instance
3429
    node = instance.primary_node
3430

    
3431
    node_insts = rpc.call_instance_list([node])[node]
3432
    if node_insts is False:
3433
      raise errors.OpExecError("Can't connect to node %s." % node)
3434

    
3435
    if instance.name not in node_insts:
3436
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3437

    
3438
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3439

    
3440
    hyper = hypervisor.GetHypervisor()
3441
    console_cmd = hyper.GetShellCommandForConsole(instance)
3442

    
3443
    # build ssh cmdline
3444
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3445

    
3446

    
3447
class LUReplaceDisks(LogicalUnit):
3448
  """Replace the disks of an instance.
3449

3450
  """
3451
  HPATH = "mirrors-replace"
3452
  HTYPE = constants.HTYPE_INSTANCE
3453
  _OP_REQP = ["instance_name", "mode", "disks"]
3454

    
3455
  def _RunAllocator(self):
3456
    """Compute a new secondary node using an IAllocator.
3457

3458
    """
3459
    ial = IAllocator(self.cfg, self.sstore,
3460
                     mode=constants.IALLOCATOR_MODE_RELOC,
3461
                     name=self.op.instance_name,
3462
                     relocate_from=[self.sec_node])
3463

    
3464
    ial.Run(self.op.iallocator)
3465

    
3466
    if not ial.success:
3467
      raise errors.OpPrereqError("Can't compute nodes using"
3468
                                 " iallocator '%s': %s" % (self.op.iallocator,
3469
                                                           ial.info))
3470
    if len(ial.nodes) != ial.required_nodes:
3471
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3472
                                 " of nodes (%s), required %s" %
3473
                                 (len(ial.nodes), ial.required_nodes))
3474
    self.op.remote_node = ial.nodes[0]
3475
    logger.ToStdout("Selected new secondary for the instance: %s" %
3476
                    self.op.remote_node)
3477

    
3478
  def BuildHooksEnv(self):
3479
    """Build hooks env.
3480

3481
    This runs on the master, the primary and all the secondaries.
3482

3483
    """
3484
    env = {
3485
      "MODE": self.op.mode,
3486
      "NEW_SECONDARY": self.op.remote_node,
3487
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3488
      }
3489
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3490
    nl = [
3491
      self.sstore.GetMasterNode(),
3492
      self.instance.primary_node,
3493
      ]
3494
    if self.op.remote_node is not None:
3495
      nl.append(self.op.remote_node)
3496
    return env, nl, nl
3497

    
3498
  def CheckPrereq(self):
3499
    """Check prerequisites.
3500

3501
    This checks that the instance is in the cluster.
3502

3503
    """
3504
    if not hasattr(self.op, "remote_node"):
3505
      self.op.remote_node = None
3506

    
3507
    instance = self.cfg.GetInstanceInfo(
3508
      self.cfg.ExpandInstanceName(self.op.instance_name))
3509
    if instance is None:
3510
      raise errors.OpPrereqError("Instance '%s' not known" %
3511
                                 self.op.instance_name)
3512
    self.instance = instance
3513
    self.op.instance_name = instance.name
3514

    
3515
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3516
      raise errors.OpPrereqError("Instance's disk layout is not"
3517
                                 " network mirrored.")
3518

    
3519
    if len(instance.secondary_nodes) != 1:
3520
      raise errors.OpPrereqError("The instance has a strange layout,"
3521
                                 " expected one secondary but found %d" %
3522
                                 len(instance.secondary_nodes))
3523

    
3524
    self.sec_node = instance.secondary_nodes[0]
3525

    
3526
    ia_name = getattr(self.op, "iallocator", None)
3527
    if ia_name is not None:
3528
      if self.op.remote_node is not None:
3529
        raise errors.OpPrereqError("Give either the iallocator or the new"
3530
                                   " secondary, not both")
3531
      self.op.remote_node = self._RunAllocator()
3532

    
3533
    remote_node = self.op.remote_node
3534
    if remote_node is not None:
3535
      remote_node = self.cfg.ExpandNodeName(remote_node)
3536
      if remote_node is None:
3537
        raise errors.OpPrereqError("Node '%s' not known" %
3538
                                   self.op.remote_node)
3539
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3540
    else:
3541
      self.remote_node_info = None
3542
    if remote_node == instance.primary_node:
3543
      raise errors.OpPrereqError("The specified node is the primary node of"
3544
                                 " the instance.")
3545
    elif remote_node == self.sec_node:
3546
      if self.op.mode == constants.REPLACE_DISK_SEC:
3547
        # this is for DRBD8, where we can't execute the same mode of
3548
        # replacement as for drbd7 (no different port allocated)
3549
        raise errors.OpPrereqError("Same secondary given, cannot execute"
3550
                                   " replacement")
3551
    if instance.disk_template == constants.DT_DRBD8:
3552
      if (self.op.mode == constants.REPLACE_DISK_ALL and
3553
          remote_node is not None):
3554
        # switch to replace secondary mode
3555
        self.op.mode = constants.REPLACE_DISK_SEC
3556

    
3557
      if self.op.mode == constants.REPLACE_DISK_ALL:
3558
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3559
                                   " secondary disk replacement, not"
3560
                                   " both at once")
3561
      elif self.op.mode == constants.REPLACE_DISK_PRI:
3562
        if remote_node is not None:
3563
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3564
                                     " the secondary while doing a primary"
3565
                                     " node disk replacement")
3566
        self.tgt_node = instance.primary_node
3567
        self.oth_node = instance.secondary_nodes[0]
3568
      elif self.op.mode == constants.REPLACE_DISK_SEC:
3569
        self.new_node = remote_node # this can be None, in which case
3570
                                    # we don't change the secondary
3571
        self.tgt_node = instance.secondary_nodes[0]
3572
        self.oth_node = instance.primary_node
3573
      else:
3574
        raise errors.ProgrammerError("Unhandled disk replace mode")
3575

    
3576
    for name in self.op.disks:
3577
      if instance.FindDisk(name) is None:
3578
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3579
                                   (name, instance.name))
3580
    self.op.remote_node = remote_node
3581

    
3582
  def _ExecD8DiskOnly(self, feedback_fn):
3583
    """Replace a disk on the primary or secondary for dbrd8.
3584

3585
    The algorithm for replace is quite complicated:
3586
      - for each disk to be replaced:
3587
        - create new LVs on the target node with unique names
3588
        - detach old LVs from the drbd device
3589
        - rename old LVs to name_replaced.<time_t>
3590
        - rename new LVs to old LVs
3591
        - attach the new LVs (with the old names now) to the drbd device
3592
      - wait for sync across all devices
3593
      - for each modified disk:
3594
        - remove old LVs (which have the name name_replaces.<time_t>)
3595

3596
    Failures are not very well handled.
3597

3598
    """
3599
    steps_total = 6
3600
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3601
    instance = self.instance
3602
    iv_names = {}
3603
    vgname = self.cfg.GetVGName()
3604
    # start of work
3605
    cfg = self.cfg
3606
    tgt_node = self.tgt_node
3607
    oth_node = self.oth_node
3608

    
3609
    # Step: check device activation
3610
    self.proc.LogStep(1, steps_total, "check device existence")
3611
    info("checking volume groups")
3612
    my_vg = cfg.GetVGName()
3613
    results = rpc.call_vg_list([oth_node, tgt_node])
3614
    if not results:
3615
      raise errors.OpExecError("Can't list volume groups on the nodes")
3616
    for node in oth_node, tgt_node:
3617
      res = results.get(node, False)
3618
      if not res or my_vg not in res:
3619
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3620
                                 (my_vg, node))
3621
    for dev in instance.disks:
3622
      if not dev.iv_name in self.op.disks:
3623
        continue
3624
      for node in tgt_node, oth_node:
3625
        info("checking %s on %s" % (dev.iv_name, node))
3626
        cfg.SetDiskID(dev, node)
3627
        if not rpc.call_blockdev_find(node, dev):
3628
          raise errors.OpExecError("Can't find device %s on node %s" %
3629
                                   (dev.iv_name, node))
3630

    
3631
    # Step: check other node consistency
3632
    self.proc.LogStep(2, steps_total, "check peer consistency")
3633
    for dev in instance.disks:
3634
      if not dev.iv_name in self.op.disks:
3635
        continue
3636
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3637
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3638
                                   oth_node==instance.primary_node):
3639
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3640
                                 " to replace disks on this node (%s)" %
3641
                                 (oth_node, tgt_node))
3642

    
3643
    # Step: create new storage
3644
    self.proc.LogStep(3, steps_total, "allocate new storage")
3645
    for dev in instance.disks:
3646
      if not dev.iv_name in self.op.disks:
3647
        continue
3648
      size = dev.size
3649
      cfg.SetDiskID(dev, tgt_node)
3650
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3651
      names = _GenerateUniqueNames(cfg, lv_names)
3652
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3653
                             logical_id=(vgname, names[0]))
3654
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3655
                             logical_id=(vgname, names[1]))
3656
      new_lvs = [lv_data, lv_meta]
3657
      old_lvs = dev.children
3658
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3659
      info("creating new local storage on %s for %s" %
3660
           (tgt_node, dev.iv_name))
3661
      # since we *always* want to create this LV, we use the
3662
      # _Create...OnPrimary (which forces the creation), even if we
3663
      # are talking about the secondary node
3664
      for new_lv in new_lvs:
3665
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3666
                                        _GetInstanceInfoText(instance)):
3667
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3668
                                   " node '%s'" %
3669
                                   (new_lv.logical_id[1], tgt_node))
3670

    
3671
    # Step: for each lv, detach+rename*2+attach
3672
    self.proc.LogStep(4, steps_total, "change drbd configuration")
3673
    for dev, old_lvs, new_lvs in iv_names.itervalues():
3674
      info("detaching %s drbd from local storage" % dev.iv_name)
3675
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3676
        raise errors.OpExecError("Can't detach drbd from local storage on node"
3677
                                 " %s for device %s" % (tgt_node, dev.iv_name))
3678
      #dev.children = []
3679
      #cfg.Update(instance)
3680

    
3681
      # ok, we created the new LVs, so now we know we have the needed
3682
      # storage; as such, we proceed on the target node to rename
3683
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3684
      # using the assumption that logical_id == physical_id (which in
3685
      # turn is the unique_id on that node)
3686

    
3687
      # FIXME(iustin): use a better name for the replaced LVs
3688
      temp_suffix = int(time.time())
3689
      ren_fn = lambda d, suff: (d.physical_id[0],
3690
                                d.physical_id[1] + "_replaced-%s" % suff)
3691
      # build the rename list based on what LVs exist on the node
3692
      rlist = []
3693
      for to_ren in old_lvs:
3694
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3695
        if find_res is not None: # device exists
3696
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3697

    
3698
      info("renaming the old LVs on the target node")
3699
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3700
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3701
      # now we rename the new LVs to the old LVs
3702
      info("renaming the new LVs on the target node")
3703
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3704
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3705
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3706

    
3707
      for old, new in zip(old_lvs, new_lvs):
3708
        new.logical_id = old.logical_id
3709
        cfg.SetDiskID(new, tgt_node)
3710

    
3711
      for disk in old_lvs:
3712
        disk.logical_id = ren_fn(disk, temp_suffix)
3713
        cfg.SetDiskID(disk, tgt_node)
3714

    
3715
      # now that the new lvs have the old name, we can add them to the device
3716
      info("adding new mirror component on %s" % tgt_node)
3717
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3718
        for new_lv in new_lvs:
3719
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
3720
            warning("Can't rollback device %s", hint="manually cleanup unused"
3721
                    " logical volumes")
3722
        raise errors.OpExecError("Can't add local storage to drbd")
3723

    
3724
      dev.children = new_lvs
3725
      cfg.Update(instance)
3726

    
3727
    # Step: wait for sync
3728

    
3729
    # this can fail as the old devices are degraded and _WaitForSync
3730
    # does a combined result over all disks, so we don't check its
3731
    # return value
3732
    self.proc.LogStep(5, steps_total, "sync devices")
3733
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3734

    
3735
    # so check manually all the devices
3736
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3737
      cfg.SetDiskID(dev, instance.primary_node)
3738
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3739
      if is_degr:
3740
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3741

    
3742
    # Step: remove old storage
3743
    self.proc.LogStep(6, steps_total, "removing old storage")
3744
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3745
      info("remove logical volumes for %s" % name)
3746
      for lv in old_lvs:
3747
        cfg.SetDiskID(lv, tgt_node)
3748
        if not rpc.call_blockdev_remove(tgt_node, lv):
3749
          warning("Can't remove old LV", hint="manually remove unused LVs")
3750
          continue
3751

    
3752
  def _ExecD8Secondary(self, feedback_fn):
3753
    """Replace the secondary node for drbd8.
3754

3755
    The algorithm for replace is quite complicated:
3756
      - for all disks of the instance:
3757
        - create new LVs on the new node with same names
3758
        - shutdown the drbd device on the old secondary
3759
        - disconnect the drbd network on the primary
3760
        - create the drbd device on the new secondary
3761
        - network attach the drbd on the primary, using an artifice:
3762
          the drbd code for Attach() will connect to the network if it
3763
          finds a device which is connected to the good local disks but
3764
          not network enabled
3765
      - wait for sync across all devices
3766
      - remove all disks from the old secondary
3767

3768
    Failures are not very well handled.
3769

3770
    """
3771
    steps_total = 6
3772
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3773
    instance = self.instance
3774
    iv_names = {}
3775
    vgname = self.cfg.GetVGName()
3776
    # start of work
3777
    cfg = self.cfg
3778
    old_node = self.tgt_node
3779
    new_node = self.new_node
3780
    pri_node = instance.primary_node
3781

    
3782
    # Step: check device activation
3783
    self.proc.LogStep(1, steps_total, "check device existence")
3784
    info("checking volume groups")
3785
    my_vg = cfg.GetVGName()
3786
    results = rpc.call_vg_list([pri_node, new_node])
3787
    if not results:
3788
      raise errors.OpExecError("Can't list volume groups on the nodes")
3789
    for node in pri_node, new_node:
3790
      res = results.get(node, False)
3791
      if not res or my_vg not in res:
3792
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3793
                                 (my_vg, node))
3794
    for dev in instance.disks:
3795
      if not dev.iv_name in self.op.disks:
3796
        continue
3797
      info("checking %s on %s" % (dev.iv_name, pri_node))
3798
      cfg.SetDiskID(dev, pri_node)
3799
      if not rpc.call_blockdev_find(pri_node, dev):
3800
        raise errors.OpExecError("Can't find device %s on node %s" %
3801
                                 (dev.iv_name, pri_node))
3802

    
3803
    # Step: check other node consistency
3804
    self.proc.LogStep(2, steps_total, "check peer consistency")
3805
    for dev in instance.disks:
3806
      if not dev.iv_name in self.op.disks:
3807
        continue
3808
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3809
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3810
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
3811
                                 " unsafe to replace the secondary" %
3812
                                 pri_node)
3813

    
3814
    # Step: create new storage
3815
    self.proc.LogStep(3, steps_total, "allocate new storage")
3816
    for dev in instance.disks:
3817
      size = dev.size
3818
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3819
      # since we *always* want to create this LV, we use the
3820
      # _Create...OnPrimary (which forces the creation), even if we
3821
      # are talking about the secondary node
3822
      for new_lv in dev.children:
3823
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3824
                                        _GetInstanceInfoText(instance)):
3825
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3826
                                   " node '%s'" %
3827
                                   (new_lv.logical_id[1], new_node))
3828

    
3829
      iv_names[dev.iv_name] = (dev, dev.children)
3830

    
3831
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
3832
    for dev in instance.disks:
3833
      size = dev.size
3834
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3835
      # create new devices on new_node
3836
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3837
                              logical_id=(pri_node, new_node,
3838
                                          dev.logical_id[2]),
3839
                              children=dev.children)
3840
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3841
                                        new_drbd, False,
3842
                                      _GetInstanceInfoText(instance)):
3843
        raise errors.OpExecError("Failed to create new DRBD on"
3844
                                 " node '%s'" % new_node)
3845

    
3846
    for dev in instance.disks:
3847
      # we have new devices, shutdown the drbd on the old secondary
3848
      info("shutting down drbd for %s on old node" % dev.iv_name)
3849
      cfg.SetDiskID(dev, old_node)
3850
      if not rpc.call_blockdev_shutdown(old_node, dev):
3851
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3852
                hint="Please cleanup this device manually as soon as possible")
3853

    
3854
    info("detaching primary drbds from the network (=> standalone)")
3855
    done = 0
3856
    for dev in instance.disks:
3857
      cfg.SetDiskID(dev, pri_node)
3858
      # set the physical (unique in bdev terms) id to None, meaning
3859
      # detach from network
3860
      dev.physical_id = (None,) * len(dev.physical_id)
3861
      # and 'find' the device, which will 'fix' it to match the
3862
      # standalone state
3863
      if rpc.call_blockdev_find(pri_node, dev):
3864
        done += 1
3865
      else:
3866
        warning("Failed to detach drbd %s from network, unusual case" %
3867
                dev.iv_name)
3868

    
3869
    if not done:
3870
      # no detaches succeeded (very unlikely)
3871
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
3872

    
3873
    # if we managed to detach at least one, we update all the disks of
3874
    # the instance to point to the new secondary
3875
    info("updating instance configuration")
3876
    for dev in instance.disks:
3877
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3878
      cfg.SetDiskID(dev, pri_node)
3879
    cfg.Update(instance)
3880

    
3881
    # and now perform the drbd attach
3882
    info("attaching primary drbds to new secondary (standalone => connected)")
3883
    failures = []
3884
    for dev in instance.disks:
3885
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3886
      # since the attach is smart, it's enough to 'find' the device,
3887
      # it will automatically activate the network, if the physical_id
3888
      # is correct
3889
      cfg.SetDiskID(dev, pri_node)
3890
      if not rpc.call_blockdev_find(pri_node, dev):
3891
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3892
                "please do a gnt-instance info to see the status of disks")
3893

    
3894
    # this can fail as the old devices are degraded and _WaitForSync
3895
    # does a combined result over all disks, so we don't check its
3896
    # return value
3897
    self.proc.LogStep(5, steps_total, "sync devices")
3898
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3899

    
3900
    # so check manually all the devices
3901
    for name, (dev, old_lvs) in iv_names.iteritems():
3902
      cfg.SetDiskID(dev, pri_node)
3903
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3904
      if is_degr:
3905
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3906

    
3907
    self.proc.LogStep(6, steps_total, "removing old storage")
3908
    for name, (dev, old_lvs) in iv_names.iteritems():
3909
      info("remove logical volumes for %s" % name)
3910
      for lv in old_lvs:
3911
        cfg.SetDiskID(lv, old_node)
3912
        if not rpc.call_blockdev_remove(old_node, lv):
3913
          warning("Can't remove LV on old secondary",
3914
                  hint="Cleanup stale volumes by hand")
3915

    
3916
  def Exec(self, feedback_fn):
3917
    """Execute disk replacement.
3918

3919
    This dispatches the disk replacement to the appropriate handler.
3920

3921
    """
3922
    instance = self.instance
3923

    
3924
    # Activate the instance disks if we're replacing them on a down instance
3925
    if instance.status == "down":
3926
      op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
3927
      self.proc.ChainOpCode(op)
3928

    
3929
    if instance.disk_template == constants.DT_DRBD8:
3930
      if self.op.remote_node is None:
3931
        fn = self._ExecD8DiskOnly
3932
      else:
3933
        fn = self._ExecD8Secondary
3934
    else:
3935
      raise errors.ProgrammerError("Unhandled disk replacement case")
3936

    
3937
    ret = fn(feedback_fn)
3938

    
3939
    # Deactivate the instance disks if we're replacing them on a down instance
3940
    if instance.status == "down":
3941
      op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
3942
      self.proc.ChainOpCode(op)
3943

    
3944
    return ret
3945

    
3946

    
3947
class LUGrowDisk(LogicalUnit):
3948
  """Grow a disk of an instance.
3949

3950
  """
3951
  HPATH = "disk-grow"
3952
  HTYPE = constants.HTYPE_INSTANCE
3953
  _OP_REQP = ["instance_name", "disk", "amount"]
3954

    
3955
  def BuildHooksEnv(self):
3956
    """Build hooks env.
3957

3958
    This runs on the master, the primary and all the secondaries.
3959

3960
    """
3961
    env = {
3962
      "DISK": self.op.disk,
3963
      "AMOUNT": self.op.amount,
3964
      }
3965
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3966
    nl = [
3967
      self.sstore.GetMasterNode(),
3968
      self.instance.primary_node,
3969
      ]
3970
    return env, nl, nl
3971

    
3972
  def CheckPrereq(self):
3973
    """Check prerequisites.
3974

3975
    This checks that the instance is in the cluster.
3976

3977
    """
3978
    instance = self.cfg.GetInstanceInfo(
3979
      self.cfg.ExpandInstanceName(self.op.instance_name))
3980
    if instance is None:
3981
      raise errors.OpPrereqError("Instance '%s' not known" %
3982
                                 self.op.instance_name)
3983
    self.instance = instance
3984
    self.op.instance_name = instance.name
3985

    
3986
    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
3987
      raise errors.OpPrereqError("Instance's disk layout does not support"
3988
                                 " growing.")
3989

    
3990
    if instance.FindDisk(self.op.disk) is None:
3991
      raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3992
                                 (self.op.disk, instance.name))
3993

    
3994
    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
3995
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3996
    for node in nodenames:
3997
      info = nodeinfo.get(node, None)
3998
      if not info:
3999
        raise errors.OpPrereqError("Cannot get current information"
4000
                                   " from node '%s'" % node)
4001
      vg_free = info.get('vg_free', None)
4002
      if not isinstance(vg_free, int):
4003
        raise errors.OpPrereqError("Can't compute free disk space on"
4004
                                   " node %s" % node)
4005
      if self.op.amount > info['vg_free']:
4006
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
4007
                                   " %d MiB available, %d MiB required" %
4008
                                   (node, info['vg_free'], self.op.amount))
4009

    
4010
  def Exec(self, feedback_fn):
4011
    """Execute disk grow.
4012

4013
    """
4014
    instance = self.instance
4015
    disk = instance.FindDisk(self.op.disk)
4016
    for node in (instance.secondary_nodes + (instance.primary_node,)):
4017
      self.cfg.SetDiskID(disk, node)
4018
      result = rpc.call_blockdev_grow(node, disk, self.op.amount)
4019
      if not result or not isinstance(result, tuple) or len(result) != 2:
4020
        raise errors.OpExecError("grow request failed to node %s" % node)
4021
      elif not result[0]:
4022
        raise errors.OpExecError("grow request failed to node %s: %s" %
4023
                                 (node, result[1]))
4024
    disk.RecordGrow(self.op.amount)
4025
    self.cfg.Update(instance)
4026
    return
4027

    
4028

    
4029
class LUQueryInstanceData(NoHooksLU):
4030
  """Query runtime instance data.
4031

4032
  """
4033
  _OP_REQP = ["instances"]
4034

    
4035
  def CheckPrereq(self):
4036
    """Check prerequisites.
4037

4038
    This only checks the optional instance list against the existing names.
4039

4040
    """
4041
    if not isinstance(self.op.instances, list):
4042
      raise errors.OpPrereqError("Invalid argument type 'instances'")
4043
    if self.op.instances:
4044
      self.wanted_instances = []
4045
      names = self.op.instances
4046
      for name in names:
4047
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4048
        if instance is None:
4049
          raise errors.OpPrereqError("No such instance name '%s'" % name)
4050
        self.wanted_instances.append(instance)
4051
    else:
4052
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4053
                               in self.cfg.GetInstanceList()]
4054
    return
4055

    
4056

    
4057
  def _ComputeDiskStatus(self, instance, snode, dev):
4058
    """Compute block device status.
4059

4060
    """
4061
    self.cfg.SetDiskID(dev, instance.primary_node)
4062
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4063
    if dev.dev_type in constants.LDS_DRBD:
4064
      # we change the snode then (otherwise we use the one passed in)
4065
      if dev.logical_id[0] == instance.primary_node:
4066
        snode = dev.logical_id[1]
4067
      else:
4068
        snode = dev.logical_id[0]
4069

    
4070
    if snode:
4071
      self.cfg.SetDiskID(dev, snode)
4072
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
4073
    else:
4074
      dev_sstatus = None
4075

    
4076
    if dev.children:
4077
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
4078
                      for child in dev.children]
4079
    else:
4080
      dev_children = []
4081

    
4082
    data = {
4083
      "iv_name": dev.iv_name,
4084
      "dev_type": dev.dev_type,
4085
      "logical_id": dev.logical_id,
4086
      "physical_id": dev.physical_id,
4087
      "pstatus": dev_pstatus,
4088
      "sstatus": dev_sstatus,
4089
      "children": dev_children,
4090
      }
4091

    
4092
    return data
4093

    
4094
  def Exec(self, feedback_fn):
4095
    """Gather and return data"""
4096
    result = {}
4097
    for instance in self.wanted_instances:
4098
      remote_info = rpc.call_instance_info(instance.primary_node,
4099
                                                instance.name)
4100
      if remote_info and "state" in remote_info:
4101
        remote_state = "up"
4102
      else:
4103
        remote_state = "down"
4104
      if instance.status == "down":
4105
        config_state = "down"
4106
      else:
4107
        config_state = "up"
4108

    
4109
      disks = [self._ComputeDiskStatus(instance, None, device)
4110
               for device in instance.disks]
4111

    
4112
      idict = {
4113
        "name": instance.name,
4114
        "config_state": config_state,
4115
        "run_state": remote_state,
4116
        "pnode": instance.primary_node,
4117
        "snodes": instance.secondary_nodes,
4118
        "os": instance.os,
4119
        "memory": instance.memory,
4120
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4121
        "disks": disks,
4122
        "vcpus": instance.vcpus,
4123
        }
4124

    
4125
      htkind = self.sstore.GetHypervisorType()
4126
      if htkind == constants.HT_XEN_PVM30:
4127
        idict["kernel_path"] = instance.kernel_path
4128
        idict["initrd_path"] = instance.initrd_path
4129

    
4130
      if htkind == constants.HT_XEN_HVM31:
4131
        idict["hvm_boot_order"] = instance.hvm_boot_order
4132
        idict["hvm_acpi"] = instance.hvm_acpi
4133
        idict["hvm_pae"] = instance.hvm_pae
4134
        idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4135

    
4136
      if htkind in constants.HTS_REQ_PORT:
4137
        idict["vnc_bind_address"] = instance.vnc_bind_address
4138
        idict["network_port"] = instance.network_port
4139

    
4140
      result[instance.name] = idict
4141

    
4142
    return result
4143

    
4144

    
4145
class LUSetInstanceParams(LogicalUnit):
4146
  """Modifies an instances's parameters.
4147

4148
  """
4149
  HPATH = "instance-modify"
4150
  HTYPE = constants.HTYPE_INSTANCE
4151
  _OP_REQP = ["instance_name"]
4152
  REQ_BGL = False
4153

    
4154
  def ExpandNames(self):
4155
    self._ExpandAndLockInstance()
4156

    
4157
  def BuildHooksEnv(self):
4158
    """Build hooks env.
4159

4160
    This runs on the master, primary and secondaries.
4161

4162
    """
4163
    args = dict()
4164
    if self.mem:
4165
      args['memory'] = self.mem
4166
    if self.vcpus:
4167
      args['vcpus'] = self.vcpus
4168
    if self.do_ip or self.do_bridge or self.mac:
4169
      if self.do_ip:
4170
        ip = self.ip
4171
      else:
4172
        ip = self.instance.nics[0].ip
4173
      if self.bridge:
4174
        bridge = self.bridge
4175
      else:
4176
        bridge = self.instance.nics[0].bridge
4177
      if self.mac:
4178
        mac = self.mac
4179
      else:
4180
        mac = self.instance.nics[0].mac
4181
      args['nics'] = [(ip, bridge, mac)]
4182
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4183
    nl = [self.sstore.GetMasterNode(),
4184
          self.instance.primary_node] + list(self.instance.secondary_nodes)
4185
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    # FIXME: all the parameters could be checked before, in ExpandNames, or in
    # a separate CheckArguments function, if we implement one, so the operation
    # can be aborted without waiting for any lock, should it have an error...
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    self.kernel_path = getattr(self.op, "kernel_path", None)
    self.initrd_path = getattr(self.op, "initrd_path", None)
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
    self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
    self.hvm_pae = getattr(self.op, "hvm_pae", None)
    self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
    self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
                 self.kernel_path, self.initrd_path, self.hvm_boot_order,
                 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
                 self.vnc_bind_address]
    if all_parms.count(None) == len(all_parms):
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)

    if self.kernel_path is not None:
      self.do_kernel_path = True
      if self.kernel_path == constants.VALUE_NONE:
        raise errors.OpPrereqError("Can't set instance to no kernel")

      if self.kernel_path != constants.VALUE_DEFAULT:
        if not os.path.isabs(self.kernel_path):
          raise errors.OpPrereqError("The kernel path must be an absolute"
                                     " filename")
    else:
      self.do_kernel_path = False

    if self.initrd_path is not None:
      self.do_initrd_path = True
      if self.initrd_path not in (constants.VALUE_NONE,
                                  constants.VALUE_DEFAULT):
        if not os.path.isabs(self.initrd_path):
          raise errors.OpPrereqError("The initrd path must be an absolute"
                                     " filename")
    else:
      self.do_initrd_path = False

    # boot order verification
    if self.hvm_boot_order is not None:
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
        if len(self.hvm_boot_order.strip("acdn")) != 0:
          raise errors.OpPrereqError("invalid boot order specified,"
                                     " must be one or more of [acdn]"
                                     " or 'default'")

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.do_kernel_path:
      instance.kernel_path = self.kernel_path
      result.append(("kernel_path", self.kernel_path))
    if self.do_initrd_path:
      instance.initrd_path = self.initrd_path
      result.append(("initrd_path", self.initrd_path))
    if self.hvm_boot_order:
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
        instance.hvm_boot_order = None
      else:
        instance.hvm_boot_order = self.hvm_boot_order
      result.append(("hvm_boot_order", self.hvm_boot_order))
    if self.hvm_acpi:
      instance.hvm_acpi = self.hvm_acpi
      result.append(("hvm_acpi", self.hvm_acpi))
    if self.hvm_pae:
      instance.hvm_pae = self.hvm_pae
      result.append(("hvm_pae", self.hvm_pae))
    if self.hvm_cdrom_image_path:
      instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
      result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
    if self.vnc_bind_address:
      instance.vnc_bind_address = self.vnc_bind_address
      result.append(("vnc_bind_address", self.vnc_bind_address))

    self.cfg.Update(instance)

    return result
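  # Exec returns a list of (parameter, new_value) pairs for the fields that
  # were actually changed; illustrative example (values are made up):
  #   [("mem", 512), ("vcpus", 2), ("bridge", "xen-br1")]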


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)
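  # The returned structure maps each node name to the list of exports found on
  # it, e.g. (illustrative):
  #   {"node1.example.com": ["inst1.example.com"], "node2.example.com": []}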


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        if not rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(instance, self.cfg)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
        logger.Error("could not export block device %s from node %s to node %s"
                     % (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from node %s" %
                     (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.
    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    op = opcodes.OpQueryExports(nodes=[])
    exportlist = self.proc.ChainOpCode(op)
    found = False
    for node in exportlist:
      if instance_name in exportlist[node]:
        found = True
        if not rpc.call_export_remove(node, instance_name):
          logger.Error("could not remove export for instance %s"
                       " on node %s" % (instance_name, node))

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))
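  # Subclasses rely on this CheckPrereq (inherited, or called explicitly as in
  # LUAddTags/LUDelTags below) to resolve self.target to the cluster, node or
  # instance object the tag operation acts on.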


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results
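  # results is a list of (path, tag) pairs, where the path identifies the
  # tagged object, e.g. (illustrative):
  #   [("/cluster", "prod"), ("/instances/inst1.example.com", "prod-web")]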


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has the following sets of attributes:
    - cfg/sstore that are needed to query the cluster
    - input data (all members of the mode-specific _ALLO_KEYS or _RELO_KEYS
      class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, cfg, sstore, mode, name, **kwargs):
    self.cfg = cfg
    self.sstore = sstore
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()
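  # Illustrative construction for an allocation request (argument values are
  # made up; see LUTestAllocator.Exec below for the real call sites):
  #   ial = IAllocator(cfg, sstore,
  #                    mode=constants.IALLOCATOR_MODE_ALLOC,
  #                    name="inst1.example.com",
  #                    mem_size=512, vcpus=1, os="debian-etch", tags=[],
  #                    disk_template="drbd",  # illustrative value
  #                    disks=[{"size": 1024, "mode": "w"},
  #                           {"size": 4096, "mode": "w"}],
  #                    nics=[{"mac": "aa:00:00:11:22:33", "ip": None,
  #                           "bridge": "xen-br0"}])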

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.cfg
    # cluster data
    data = {
      "version": 1,
      "cluster_name": self.sstore.GetClusterName(),
      "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      # we don't have job IDs
      }

    i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()
    node_data = rpc.call_node_info(node_list, cfg.GetVGName())
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      if nname not in node_data or not isinstance(node_data[nname], dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname]
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free', 'cpu_total']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += iinfo.memory
          if iinfo.status == "up":
            i_p_up_mem += iinfo.memory

      # compute memory used by instances
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": iinfo.vcpus,
        "memory": iinfo.memory,
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data
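  # After _ComputeClusterData plus one of the _Add*Instance methods below,
  # self.in_data has roughly this shape (keys as built above, values
  # illustrative):
  #   {"version": 1, "cluster_name": ..., "cluster_tags": [...],
  #    "hypervisor_type": ...,
  #    "nodes": {"node1.example.com": {"total_memory": ..., "free_memory": ...,
  #                                    "total_disk": ..., "free_disk": ...}},
  #    "instances": {"inst1.example.com": {"memory": ..., "vcpus": ...,
  #                                        "nics": [...], "disks": [...]}},
  #    "request": {...}}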

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template,
                                  self.disks[0]["size"], self.disks[1]["size"])

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1

    disk_space = _ComputeDiskSize(instance.disk_template,
                                  instance.disks[0].size,
                                  instance.disks[1].size)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
    """Run an instance allocator and return the results.

    """
    data = self.in_text

    result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)

    if not isinstance(result, tuple) or len(result) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" % (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict
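  # The external allocator script is expected to emit a serialized dict with
  # at least the keys checked above, e.g. (illustrative):
  #   {"success": true, "info": "allocation successful",
  #    "nodes": ["node2.example.com", "node3.example.com"]}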


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode of
    the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       )
    else:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result
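  # With direction IN the LU only returns the input text that would be fed to
  # the allocator; with direction OUT it actually runs the named allocator
  # script and returns its raw, unvalidated output. This opcode exists for
  # testing only (presumably exercised via the gnt-debug client; the
  # client-side wiring lives outside this module).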