Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib.py @ 57a2fb91

History | View | Annotate | Download (183.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33

    
34
from ganeti import rpc
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import locking
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import serializer
45

    
46

    
47
class LogicalUnit(object):
48
  """Logical Unit base class.
49

50
  Subclasses must follow these rules:
51
    - implement ExpandNames
52
    - implement CheckPrereq
53
    - implement Exec
54
    - implement BuildHooksEnv
55
    - redefine HPATH and HTYPE
56
    - optionally redefine their run requirements:
57
        REQ_MASTER: the LU needs to run on the master node
58
        REQ_WSSTORE: the LU needs a writable SimpleStore
59
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60

61
  Note that all commands require root permissions.
62

63
  """
64
  HPATH = None
65
  HTYPE = None
66
  _OP_REQP = []
67
  REQ_MASTER = True
68
  REQ_WSSTORE = False
69
  REQ_BGL = True
70

    
71
  def __init__(self, processor, op, context, sstore):
72
    """Constructor for LogicalUnit.
73

74
    This needs to be overriden in derived classes in order to check op
75
    validity.
76

77
    """
78
    self.proc = processor
79
    self.op = op
80
    self.cfg = context.cfg
81
    self.sstore = sstore
82
    self.context = context
83
    self.needed_locks = None
84
    self.acquired_locks = {}
85
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
86
    # Used to force good behavior when calling helper functions
87
    self.recalculate_locks = {}
88
    self.__ssh = None
89

    
90
    for attr_name in self._OP_REQP:
91
      attr_val = getattr(op, attr_name, None)
92
      if attr_val is None:
93
        raise errors.OpPrereqError("Required parameter '%s' missing" %
94
                                   attr_name)
95

    
96
    if not self.cfg.IsCluster():
97
      raise errors.OpPrereqError("Cluster not initialized yet,"
98
                                 " use 'gnt-cluster init' first.")
99
    if self.REQ_MASTER:
100
      master = sstore.GetMasterNode()
101
      if master != utils.HostInfo().name:
102
        raise errors.OpPrereqError("Commands must be run on the master"
103
                                   " node %s" % master)
104

    
105
  def __GetSSH(self):
106
    """Returns the SshRunner object
107

108
    """
109
    if not self.__ssh:
110
      self.__ssh = ssh.SshRunner(self.sstore)
111
    return self.__ssh
112

    
113
  ssh = property(fget=__GetSSH)
114

    
115
  def ExpandNames(self):
116
    """Expand names for this LU.
117

118
    This method is called before starting to execute the opcode, and it should
119
    update all the parameters of the opcode to their canonical form (e.g. a
120
    short node name must be fully expanded after this method has successfully
121
    completed). This way locking, hooks, logging, ecc. can work correctly.
122

123
    LUs which implement this method must also populate the self.needed_locks
124
    member, as a dict with lock levels as keys, and a list of needed lock names
125
    as values. Rules:
126
      - Use an empty dict if you don't need any lock
127
      - If you don't need any lock at a particular level omit that level
128
      - Don't put anything for the BGL level
129
      - If you want all locks at a level use locking.ALL_SET as a value
130

131
    If you need to share locks (rather than acquire them exclusively) at one
132
    level you can modify self.share_locks, setting a true value (usually 1) for
133
    that level. By default locks are not shared.
134

135
    Examples:
136
    # Acquire all nodes and one instance
137
    self.needed_locks = {
138
      locking.LEVEL_NODE: locking.ALL_SET,
139
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
140
    }
141
    # Acquire just two nodes
142
    self.needed_locks = {
143
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
144
    }
145
    # Acquire no locks
146
    self.needed_locks = {} # No, you can't leave it to the default value None
147

148
    """
149
    # The implementation of this method is mandatory only if the new LU is
150
    # concurrent, so that old LUs don't need to be changed all at the same
151
    # time.
152
    if self.REQ_BGL:
153
      self.needed_locks = {} # Exclusive LUs don't need locks.
154
    else:
155
      raise NotImplementedError
156

    
157
  def DeclareLocks(self, level):
158
    """Declare LU locking needs for a level
159

160
    While most LUs can just declare their locking needs at ExpandNames time,
161
    sometimes there's the need to calculate some locks after having acquired
162
    the ones before. This function is called just before acquiring locks at a
163
    particular level, but after acquiring the ones at lower levels, and permits
164
    such calculations. It can be used to modify self.needed_locks, and by
165
    default it does nothing.
166

167
    This function is only called if you have something already set in
168
    self.needed_locks for the level.
169

170
    @param level: Locking level which is going to be locked
171
    @type level: member of ganeti.locking.LEVELS
172

173
    """
174

    
175
  def CheckPrereq(self):
176
    """Check prerequisites for this LU.
177

178
    This method should check that the prerequisites for the execution
179
    of this LU are fulfilled. It can do internode communication, but
180
    it should be idempotent - no cluster or system changes are
181
    allowed.
182

183
    The method should raise errors.OpPrereqError in case something is
184
    not fulfilled. Its return value is ignored.
185

186
    This method should also update all the parameters of the opcode to
187
    their canonical form if it hasn't been done by ExpandNames before.
188

189
    """
190
    raise NotImplementedError
191

    
192
  def Exec(self, feedback_fn):
193
    """Execute the LU.
194

195
    This method should implement the actual work. It should raise
196
    errors.OpExecError for failures that are somewhat dealt with in
197
    code, or expected.
198

199
    """
200
    raise NotImplementedError
201

    
202
  def BuildHooksEnv(self):
203
    """Build hooks environment for this LU.
204

205
    This method should return a three-node tuple consisting of: a dict
206
    containing the environment that will be used for running the
207
    specific hook for this LU, a list of node names on which the hook
208
    should run before the execution, and a list of node names on which
209
    the hook should run after the execution.
210

211
    The keys of the dict must not have 'GANETI_' prefixed as this will
212
    be handled in the hooks runner. Also note additional keys will be
213
    added by the hooks runner. If the LU doesn't define any
214
    environment, an empty dict (and not None) should be returned.
215

216
    No nodes should be returned as an empty list (and not None).
217

218
    Note that if the HPATH for a LU class is None, this function will
219
    not be called.
220

221
    """
222
    raise NotImplementedError
223

    
224
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
225
    """Notify the LU about the results of its hooks.
226

227
    This method is called every time a hooks phase is executed, and notifies
228
    the Logical Unit about the hooks' result. The LU can then use it to alter
229
    its result based on the hooks.  By default the method does nothing and the
230
    previous result is passed back unchanged but any LU can define it if it
231
    wants to use the local cluster hook-scripts somehow.
232

233
    Args:
234
      phase: the hooks phase that has just been run
235
      hooks_results: the results of the multi-node hooks rpc call
236
      feedback_fn: function to send feedback back to the caller
237
      lu_result: the previous result this LU had, or None in the PRE phase.
238

239
    """
240
    return lu_result
241

    
242
  def _ExpandAndLockInstance(self):
243
    """Helper function to expand and lock an instance.
244

245
    Many LUs that work on an instance take its name in self.op.instance_name
246
    and need to expand it and then declare the expanded name for locking. This
247
    function does it, and then updates self.op.instance_name to the expanded
248
    name. It also initializes needed_locks as a dict, if this hasn't been done
249
    before.
250

251
    """
252
    if self.needed_locks is None:
253
      self.needed_locks = {}
254
    else:
255
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
256
        "_ExpandAndLockInstance called with instance-level locks set"
257
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
258
    if expanded_name is None:
259
      raise errors.OpPrereqError("Instance '%s' not known" %
260
                                  self.op.instance_name)
261
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
262
    self.op.instance_name = expanded_name
263

    
264
  def _LockInstancesNodes(self, primary_only=False):
265
    """Helper function to declare instances' nodes for locking.
266

267
    This function should be called after locking one or more instances to lock
268
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
269
    with all primary or secondary nodes for instances already locked and
270
    present in self.needed_locks[locking.LEVEL_INSTANCE].
271

272
    It should be called from DeclareLocks, and for safety only works if
273
    self.recalculate_locks[locking.LEVEL_NODE] is set.
274

275
    In the future it may grow parameters to just lock some instance's nodes, or
276
    to just lock primaries or secondary nodes, if needed.
277

278
    If should be called in DeclareLocks in a way similar to:
279

280
    if level == locking.LEVEL_NODE:
281
      self._LockInstancesNodes()
282

283
    @type primary_only: boolean
284
    @param primary_only: only lock primary nodes of locked instances
285

286
    """
287
    assert locking.LEVEL_NODE in self.recalculate_locks, \
288
      "_LockInstancesNodes helper function called with no nodes to recalculate"
289

    
290
    # TODO: check if we're really been called with the instance locks held
291

    
292
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
293
    # future we might want to have different behaviors depending on the value
294
    # of self.recalculate_locks[locking.LEVEL_NODE]
295
    wanted_nodes = []
296
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
297
      instance = self.context.cfg.GetInstanceInfo(instance_name)
298
      wanted_nodes.append(instance.primary_node)
299
      if not primary_only:
300
        wanted_nodes.extend(instance.secondary_nodes)
301

    
302
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
303
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
304
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
305
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
306

    
307
    del self.recalculate_locks[locking.LEVEL_NODE]
308

    
309

    
310
class NoHooksLU(LogicalUnit):
311
  """Simple LU which runs no hooks.
312

313
  This LU is intended as a parent for other LogicalUnits which will
314
  run no hooks, in order to reduce duplicate code.
315

316
  """
317
  HPATH = None
318
  HTYPE = None
319

    
320

    
321
def _GetWantedNodes(lu, nodes):
322
  """Returns list of checked and expanded node names.
323

324
  Args:
325
    nodes: List of nodes (strings) or None for all
326

327
  """
328
  if not isinstance(nodes, list):
329
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
330

    
331
  if not nodes:
332
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
333
      " non-empty list of nodes whose name is to be expanded.")
334

    
335
  wanted = []
336
  for name in nodes:
337
    node = lu.cfg.ExpandNodeName(name)
338
    if node is None:
339
      raise errors.OpPrereqError("No such node name '%s'" % name)
340
    wanted.append(node)
341

    
342
  return utils.NiceSort(wanted)
343

    
344

    
345
def _GetWantedInstances(lu, instances):
346
  """Returns list of checked and expanded instance names.
347

348
  Args:
349
    instances: List of instances (strings) or None for all
350

351
  """
352
  if not isinstance(instances, list):
353
    raise errors.OpPrereqError("Invalid argument type 'instances'")
354

    
355
  if instances:
356
    wanted = []
357

    
358
    for name in instances:
359
      instance = lu.cfg.ExpandInstanceName(name)
360
      if instance is None:
361
        raise errors.OpPrereqError("No such instance name '%s'" % name)
362
      wanted.append(instance)
363

    
364
  else:
365
    wanted = lu.cfg.GetInstanceList()
366
  return utils.NiceSort(wanted)
367

    
368

    
369
def _CheckOutputFields(static, dynamic, selected):
370
  """Checks whether all selected fields are valid.
371

372
  Args:
373
    static: Static fields
374
    dynamic: Dynamic fields
375

376
  """
377
  static_fields = frozenset(static)
378
  dynamic_fields = frozenset(dynamic)
379

    
380
  all_fields = static_fields | dynamic_fields
381

    
382
  if not all_fields.issuperset(selected):
383
    raise errors.OpPrereqError("Unknown output fields selected: %s"
384
                               % ",".join(frozenset(selected).
385
                                          difference(all_fields)))
386

    
387

    
388
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
389
                          memory, vcpus, nics):
390
  """Builds instance related env variables for hooks from single variables.
391

392
  Args:
393
    secondary_nodes: List of secondary nodes as strings
394
  """
395
  env = {
396
    "OP_TARGET": name,
397
    "INSTANCE_NAME": name,
398
    "INSTANCE_PRIMARY": primary_node,
399
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
400
    "INSTANCE_OS_TYPE": os_type,
401
    "INSTANCE_STATUS": status,
402
    "INSTANCE_MEMORY": memory,
403
    "INSTANCE_VCPUS": vcpus,
404
  }
405

    
406
  if nics:
407
    nic_count = len(nics)
408
    for idx, (ip, bridge, mac) in enumerate(nics):
409
      if ip is None:
410
        ip = ""
411
      env["INSTANCE_NIC%d_IP" % idx] = ip
412
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
413
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
414
  else:
415
    nic_count = 0
416

    
417
  env["INSTANCE_NIC_COUNT"] = nic_count
418

    
419
  return env
420

    
421

    
422
def _BuildInstanceHookEnvByObject(instance, override=None):
423
  """Builds instance related env variables for hooks from an object.
424

425
  Args:
426
    instance: objects.Instance object of instance
427
    override: dict of values to override
428
  """
429
  args = {
430
    'name': instance.name,
431
    'primary_node': instance.primary_node,
432
    'secondary_nodes': instance.secondary_nodes,
433
    'os_type': instance.os,
434
    'status': instance.os,
435
    'memory': instance.memory,
436
    'vcpus': instance.vcpus,
437
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
438
  }
439
  if override:
440
    args.update(override)
441
  return _BuildInstanceHookEnv(**args)
442

    
443

    
444
def _CheckInstanceBridgesExist(instance):
445
  """Check that the brigdes needed by an instance exist.
446

447
  """
448
  # check bridges existance
449
  brlist = [nic.bridge for nic in instance.nics]
450
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
451
    raise errors.OpPrereqError("one or more target bridges %s does not"
452
                               " exist on destination node '%s'" %
453
                               (brlist, instance.primary_node))
454

    
455

    
456
class LUDestroyCluster(NoHooksLU):
457
  """Logical unit for destroying the cluster.
458

459
  """
460
  _OP_REQP = []
461

    
462
  def CheckPrereq(self):
463
    """Check prerequisites.
464

465
    This checks whether the cluster is empty.
466

467
    Any errors are signalled by raising errors.OpPrereqError.
468

469
    """
470
    master = self.sstore.GetMasterNode()
471

    
472
    nodelist = self.cfg.GetNodeList()
473
    if len(nodelist) != 1 or nodelist[0] != master:
474
      raise errors.OpPrereqError("There are still %d node(s) in"
475
                                 " this cluster." % (len(nodelist) - 1))
476
    instancelist = self.cfg.GetInstanceList()
477
    if instancelist:
478
      raise errors.OpPrereqError("There are still %d instance(s) in"
479
                                 " this cluster." % len(instancelist))
480

    
481
  def Exec(self, feedback_fn):
482
    """Destroys the cluster.
483

484
    """
485
    master = self.sstore.GetMasterNode()
486
    if not rpc.call_node_stop_master(master, False):
487
      raise errors.OpExecError("Could not disable the master role")
488
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
489
    utils.CreateBackup(priv_key)
490
    utils.CreateBackup(pub_key)
491
    return master
492

    
493

    
494
class LUVerifyCluster(LogicalUnit):
495
  """Verifies the cluster status.
496

497
  """
498
  HPATH = "cluster-verify"
499
  HTYPE = constants.HTYPE_CLUSTER
500
  _OP_REQP = ["skip_checks"]
501
  REQ_BGL = False
502

    
503
  def ExpandNames(self):
504
    self.needed_locks = {
505
      locking.LEVEL_NODE: locking.ALL_SET,
506
      locking.LEVEL_INSTANCE: locking.ALL_SET,
507
    }
508
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
509

    
510
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
511
                  remote_version, feedback_fn):
512
    """Run multiple tests against a node.
513

514
    Test list:
515
      - compares ganeti version
516
      - checks vg existance and size > 20G
517
      - checks config file checksum
518
      - checks ssh to other nodes
519

520
    Args:
521
      node: name of the node to check
522
      file_list: required list of files
523
      local_cksum: dictionary of local files and their checksums
524

525
    """
526
    # compares ganeti version
527
    local_version = constants.PROTOCOL_VERSION
528
    if not remote_version:
529
      feedback_fn("  - ERROR: connection to %s failed" % (node))
530
      return True
531

    
532
    if local_version != remote_version:
533
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
534
                      (local_version, node, remote_version))
535
      return True
536

    
537
    # checks vg existance and size > 20G
538

    
539
    bad = False
540
    if not vglist:
541
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
542
                      (node,))
543
      bad = True
544
    else:
545
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
546
                                            constants.MIN_VG_SIZE)
547
      if vgstatus:
548
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
549
        bad = True
550

    
551
    # checks config file checksum
552
    # checks ssh to any
553

    
554
    if 'filelist' not in node_result:
555
      bad = True
556
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
557
    else:
558
      remote_cksum = node_result['filelist']
559
      for file_name in file_list:
560
        if file_name not in remote_cksum:
561
          bad = True
562
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
563
        elif remote_cksum[file_name] != local_cksum[file_name]:
564
          bad = True
565
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
566

    
567
    if 'nodelist' not in node_result:
568
      bad = True
569
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
570
    else:
571
      if node_result['nodelist']:
572
        bad = True
573
        for node in node_result['nodelist']:
574
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
575
                          (node, node_result['nodelist'][node]))
576
    if 'node-net-test' not in node_result:
577
      bad = True
578
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
579
    else:
580
      if node_result['node-net-test']:
581
        bad = True
582
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
583
        for node in nlist:
584
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
585
                          (node, node_result['node-net-test'][node]))
586

    
587
    hyp_result = node_result.get('hypervisor', None)
588
    if hyp_result is not None:
589
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
590
    return bad
591

    
592
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
593
                      node_instance, feedback_fn):
594
    """Verify an instance.
595

596
    This function checks to see if the required block devices are
597
    available on the instance's node.
598

599
    """
600
    bad = False
601

    
602
    node_current = instanceconfig.primary_node
603

    
604
    node_vol_should = {}
605
    instanceconfig.MapLVsByNode(node_vol_should)
606

    
607
    for node in node_vol_should:
608
      for volume in node_vol_should[node]:
609
        if node not in node_vol_is or volume not in node_vol_is[node]:
610
          feedback_fn("  - ERROR: volume %s missing on node %s" %
611
                          (volume, node))
612
          bad = True
613

    
614
    if not instanceconfig.status == 'down':
615
      if (node_current not in node_instance or
616
          not instance in node_instance[node_current]):
617
        feedback_fn("  - ERROR: instance %s not running on node %s" %
618
                        (instance, node_current))
619
        bad = True
620

    
621
    for node in node_instance:
622
      if (not node == node_current):
623
        if instance in node_instance[node]:
624
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
625
                          (instance, node))
626
          bad = True
627

    
628
    return bad
629

    
630
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
631
    """Verify if there are any unknown volumes in the cluster.
632

633
    The .os, .swap and backup volumes are ignored. All other volumes are
634
    reported as unknown.
635

636
    """
637
    bad = False
638

    
639
    for node in node_vol_is:
640
      for volume in node_vol_is[node]:
641
        if node not in node_vol_should or volume not in node_vol_should[node]:
642
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
643
                      (volume, node))
644
          bad = True
645
    return bad
646

    
647
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
648
    """Verify the list of running instances.
649

650
    This checks what instances are running but unknown to the cluster.
651

652
    """
653
    bad = False
654
    for node in node_instance:
655
      for runninginstance in node_instance[node]:
656
        if runninginstance not in instancelist:
657
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
658
                          (runninginstance, node))
659
          bad = True
660
    return bad
661

    
662
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
663
    """Verify N+1 Memory Resilience.
664

665
    Check that if one single node dies we can still start all the instances it
666
    was primary for.
667

668
    """
669
    bad = False
670

    
671
    for node, nodeinfo in node_info.iteritems():
672
      # This code checks that every node which is now listed as secondary has
673
      # enough memory to host all instances it is supposed to should a single
674
      # other node in the cluster fail.
675
      # FIXME: not ready for failover to an arbitrary node
676
      # FIXME: does not support file-backed instances
677
      # WARNING: we currently take into account down instances as well as up
678
      # ones, considering that even if they're down someone might want to start
679
      # them even in the event of a node failure.
680
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
681
        needed_mem = 0
682
        for instance in instances:
683
          needed_mem += instance_cfg[instance].memory
684
        if nodeinfo['mfree'] < needed_mem:
685
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
686
                      " failovers should node %s fail" % (node, prinode))
687
          bad = True
688
    return bad
689

    
690
  def CheckPrereq(self):
691
    """Check prerequisites.
692

693
    Transform the list of checks we're going to skip into a set and check that
694
    all its members are valid.
695

696
    """
697
    self.skip_set = frozenset(self.op.skip_checks)
698
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
699
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
700

    
701
  def BuildHooksEnv(self):
702
    """Build hooks env.
703

704
    Cluster-Verify hooks just rone in the post phase and their failure makes
705
    the output be logged in the verify output and the verification to fail.
706

707
    """
708
    all_nodes = self.cfg.GetNodeList()
709
    # TODO: populate the environment with useful information for verify hooks
710
    env = {}
711
    return env, [], all_nodes
712

    
713
  def Exec(self, feedback_fn):
714
    """Verify integrity of cluster, performing various test on nodes.
715

716
    """
717
    bad = False
718
    feedback_fn("* Verifying global settings")
719
    for msg in self.cfg.VerifyConfig():
720
      feedback_fn("  - ERROR: %s" % msg)
721

    
722
    vg_name = self.cfg.GetVGName()
723
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
724
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
725
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
726
    i_non_redundant = [] # Non redundant instances
727
    node_volume = {}
728
    node_instance = {}
729
    node_info = {}
730
    instance_cfg = {}
731

    
732
    # FIXME: verify OS list
733
    # do local checksums
734
    file_names = list(self.sstore.GetFileList())
735
    file_names.append(constants.SSL_CERT_FILE)
736
    file_names.append(constants.CLUSTER_CONF_FILE)
737
    local_checksums = utils.FingerprintFiles(file_names)
738

    
739
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
740
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
741
    all_instanceinfo = rpc.call_instance_list(nodelist)
742
    all_vglist = rpc.call_vg_list(nodelist)
743
    node_verify_param = {
744
      'filelist': file_names,
745
      'nodelist': nodelist,
746
      'hypervisor': None,
747
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
748
                        for node in nodeinfo]
749
      }
750
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
751
    all_rversion = rpc.call_version(nodelist)
752
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
753

    
754
    for node in nodelist:
755
      feedback_fn("* Verifying node %s" % node)
756
      result = self._VerifyNode(node, file_names, local_checksums,
757
                                all_vglist[node], all_nvinfo[node],
758
                                all_rversion[node], feedback_fn)
759
      bad = bad or result
760

    
761
      # node_volume
762
      volumeinfo = all_volumeinfo[node]
763

    
764
      if isinstance(volumeinfo, basestring):
765
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
766
                    (node, volumeinfo[-400:].encode('string_escape')))
767
        bad = True
768
        node_volume[node] = {}
769
      elif not isinstance(volumeinfo, dict):
770
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
771
        bad = True
772
        continue
773
      else:
774
        node_volume[node] = volumeinfo
775

    
776
      # node_instance
777
      nodeinstance = all_instanceinfo[node]
778
      if type(nodeinstance) != list:
779
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
780
        bad = True
781
        continue
782

    
783
      node_instance[node] = nodeinstance
784

    
785
      # node_info
786
      nodeinfo = all_ninfo[node]
787
      if not isinstance(nodeinfo, dict):
788
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
789
        bad = True
790
        continue
791

    
792
      try:
793
        node_info[node] = {
794
          "mfree": int(nodeinfo['memory_free']),
795
          "dfree": int(nodeinfo['vg_free']),
796
          "pinst": [],
797
          "sinst": [],
798
          # dictionary holding all instances this node is secondary for,
799
          # grouped by their primary node. Each key is a cluster node, and each
800
          # value is a list of instances which have the key as primary and the
801
          # current node as secondary.  this is handy to calculate N+1 memory
802
          # availability if you can only failover from a primary to its
803
          # secondary.
804
          "sinst-by-pnode": {},
805
        }
806
      except ValueError:
807
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
808
        bad = True
809
        continue
810

    
811
    node_vol_should = {}
812

    
813
    for instance in instancelist:
814
      feedback_fn("* Verifying instance %s" % instance)
815
      inst_config = self.cfg.GetInstanceInfo(instance)
816
      result =  self._VerifyInstance(instance, inst_config, node_volume,
817
                                     node_instance, feedback_fn)
818
      bad = bad or result
819

    
820
      inst_config.MapLVsByNode(node_vol_should)
821

    
822
      instance_cfg[instance] = inst_config
823

    
824
      pnode = inst_config.primary_node
825
      if pnode in node_info:
826
        node_info[pnode]['pinst'].append(instance)
827
      else:
828
        feedback_fn("  - ERROR: instance %s, connection to primary node"
829
                    " %s failed" % (instance, pnode))
830
        bad = True
831

    
832
      # If the instance is non-redundant we cannot survive losing its primary
833
      # node, so we are not N+1 compliant. On the other hand we have no disk
834
      # templates with more than one secondary so that situation is not well
835
      # supported either.
836
      # FIXME: does not support file-backed instances
837
      if len(inst_config.secondary_nodes) == 0:
838
        i_non_redundant.append(instance)
839
      elif len(inst_config.secondary_nodes) > 1:
840
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
841
                    % instance)
842

    
843
      for snode in inst_config.secondary_nodes:
844
        if snode in node_info:
845
          node_info[snode]['sinst'].append(instance)
846
          if pnode not in node_info[snode]['sinst-by-pnode']:
847
            node_info[snode]['sinst-by-pnode'][pnode] = []
848
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
849
        else:
850
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
851
                      " %s failed" % (instance, snode))
852

    
853
    feedback_fn("* Verifying orphan volumes")
854
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
855
                                       feedback_fn)
856
    bad = bad or result
857

    
858
    feedback_fn("* Verifying remaining instances")
859
    result = self._VerifyOrphanInstances(instancelist, node_instance,
860
                                         feedback_fn)
861
    bad = bad or result
862

    
863
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
864
      feedback_fn("* Verifying N+1 Memory redundancy")
865
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
866
      bad = bad or result
867

    
868
    feedback_fn("* Other Notes")
869
    if i_non_redundant:
870
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
871
                  % len(i_non_redundant))
872

    
873
    return not bad
874

    
875
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
876
    """Analize the post-hooks' result, handle it, and send some
877
    nicely-formatted feedback back to the user.
878

879
    Args:
880
      phase: the hooks phase that has just been run
881
      hooks_results: the results of the multi-node hooks rpc call
882
      feedback_fn: function to send feedback back to the caller
883
      lu_result: previous Exec result
884

885
    """
886
    # We only really run POST phase hooks, and are only interested in
887
    # their results
888
    if phase == constants.HOOKS_PHASE_POST:
889
      # Used to change hooks' output to proper indentation
890
      indent_re = re.compile('^', re.M)
891
      feedback_fn("* Hooks Results")
892
      if not hooks_results:
893
        feedback_fn("  - ERROR: general communication failure")
894
        lu_result = 1
895
      else:
896
        for node_name in hooks_results:
897
          show_node_header = True
898
          res = hooks_results[node_name]
899
          if res is False or not isinstance(res, list):
900
            feedback_fn("    Communication failure")
901
            lu_result = 1
902
            continue
903
          for script, hkr, output in res:
904
            if hkr == constants.HKR_FAIL:
905
              # The node header is only shown once, if there are
906
              # failing hooks on that node
907
              if show_node_header:
908
                feedback_fn("  Node %s:" % node_name)
909
                show_node_header = False
910
              feedback_fn("    ERROR: Script %s failed, output:" % script)
911
              output = indent_re.sub('      ', output)
912
              feedback_fn("%s" % output)
913
              lu_result = 1
914

    
915
      return lu_result
916

    
917

    
918
class LUVerifyDisks(NoHooksLU):
919
  """Verifies the cluster disks status.
920

921
  """
922
  _OP_REQP = []
923
  REQ_BGL = False
924

    
925
  def ExpandNames(self):
926
    self.needed_locks = {
927
      locking.LEVEL_NODE: locking.ALL_SET,
928
      locking.LEVEL_INSTANCE: locking.ALL_SET,
929
    }
930
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
931

    
932
  def CheckPrereq(self):
933
    """Check prerequisites.
934

935
    This has no prerequisites.
936

937
    """
938
    pass
939

    
940
  def Exec(self, feedback_fn):
941
    """Verify integrity of cluster disks.
942

943
    """
944
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
945

    
946
    vg_name = self.cfg.GetVGName()
947
    nodes = utils.NiceSort(self.cfg.GetNodeList())
948
    instances = [self.cfg.GetInstanceInfo(name)
949
                 for name in self.cfg.GetInstanceList()]
950

    
951
    nv_dict = {}
952
    for inst in instances:
953
      inst_lvs = {}
954
      if (inst.status != "up" or
955
          inst.disk_template not in constants.DTS_NET_MIRROR):
956
        continue
957
      inst.MapLVsByNode(inst_lvs)
958
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
959
      for node, vol_list in inst_lvs.iteritems():
960
        for vol in vol_list:
961
          nv_dict[(node, vol)] = inst
962

    
963
    if not nv_dict:
964
      return result
965

    
966
    node_lvs = rpc.call_volume_list(nodes, vg_name)
967

    
968
    to_act = set()
969
    for node in nodes:
970
      # node_volume
971
      lvs = node_lvs[node]
972

    
973
      if isinstance(lvs, basestring):
974
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
975
        res_nlvm[node] = lvs
976
      elif not isinstance(lvs, dict):
977
        logger.Info("connection to node %s failed or invalid data returned" %
978
                    (node,))
979
        res_nodes.append(node)
980
        continue
981

    
982
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
983
        inst = nv_dict.pop((node, lv_name), None)
984
        if (not lv_online and inst is not None
985
            and inst.name not in res_instances):
986
          res_instances.append(inst.name)
987

    
988
    # any leftover items in nv_dict are missing LVs, let's arrange the
989
    # data better
990
    for key, inst in nv_dict.iteritems():
991
      if inst.name not in res_missing:
992
        res_missing[inst.name] = []
993
      res_missing[inst.name].append(key)
994

    
995
    return result
996

    
997

    
998
class LURenameCluster(LogicalUnit):
999
  """Rename the cluster.
1000

1001
  """
1002
  HPATH = "cluster-rename"
1003
  HTYPE = constants.HTYPE_CLUSTER
1004
  _OP_REQP = ["name"]
1005
  REQ_WSSTORE = True
1006

    
1007
  def BuildHooksEnv(self):
1008
    """Build hooks env.
1009

1010
    """
1011
    env = {
1012
      "OP_TARGET": self.sstore.GetClusterName(),
1013
      "NEW_NAME": self.op.name,
1014
      }
1015
    mn = self.sstore.GetMasterNode()
1016
    return env, [mn], [mn]
1017

    
1018
  def CheckPrereq(self):
1019
    """Verify that the passed name is a valid one.
1020

1021
    """
1022
    hostname = utils.HostInfo(self.op.name)
1023

    
1024
    new_name = hostname.name
1025
    self.ip = new_ip = hostname.ip
1026
    old_name = self.sstore.GetClusterName()
1027
    old_ip = self.sstore.GetMasterIP()
1028
    if new_name == old_name and new_ip == old_ip:
1029
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1030
                                 " cluster has changed")
1031
    if new_ip != old_ip:
1032
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1033
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1034
                                   " reachable on the network. Aborting." %
1035
                                   new_ip)
1036

    
1037
    self.op.name = new_name
1038

    
1039
  def Exec(self, feedback_fn):
1040
    """Rename the cluster.
1041

1042
    """
1043
    clustername = self.op.name
1044
    ip = self.ip
1045
    ss = self.sstore
1046

    
1047
    # shutdown the master IP
1048
    master = ss.GetMasterNode()
1049
    if not rpc.call_node_stop_master(master, False):
1050
      raise errors.OpExecError("Could not disable the master role")
1051

    
1052
    try:
1053
      # modify the sstore
1054
      ss.SetKey(ss.SS_MASTER_IP, ip)
1055
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1056

    
1057
      # Distribute updated ss config to all nodes
1058
      myself = self.cfg.GetNodeInfo(master)
1059
      dist_nodes = self.cfg.GetNodeList()
1060
      if myself.name in dist_nodes:
1061
        dist_nodes.remove(myself.name)
1062

    
1063
      logger.Debug("Copying updated ssconf data to all nodes")
1064
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1065
        fname = ss.KeyToFilename(keyname)
1066
        result = rpc.call_upload_file(dist_nodes, fname)
1067
        for to_node in dist_nodes:
1068
          if not result[to_node]:
1069
            logger.Error("copy of file %s to node %s failed" %
1070
                         (fname, to_node))
1071
    finally:
1072
      if not rpc.call_node_start_master(master, False):
1073
        logger.Error("Could not re-enable the master role on the master,"
1074
                     " please restart manually.")
1075

    
1076

    
1077
def _RecursiveCheckIfLVMBased(disk):
1078
  """Check if the given disk or its children are lvm-based.
1079

1080
  Args:
1081
    disk: ganeti.objects.Disk object
1082

1083
  Returns:
1084
    boolean indicating whether a LD_LV dev_type was found or not
1085

1086
  """
1087
  if disk.children:
1088
    for chdisk in disk.children:
1089
      if _RecursiveCheckIfLVMBased(chdisk):
1090
        return True
1091
  return disk.dev_type == constants.LD_LV
1092

    
1093

    
1094
class LUSetClusterParams(LogicalUnit):
1095
  """Change the parameters of the cluster.
1096

1097
  """
1098
  HPATH = "cluster-modify"
1099
  HTYPE = constants.HTYPE_CLUSTER
1100
  _OP_REQP = []
1101

    
1102
  def BuildHooksEnv(self):
1103
    """Build hooks env.
1104

1105
    """
1106
    env = {
1107
      "OP_TARGET": self.sstore.GetClusterName(),
1108
      "NEW_VG_NAME": self.op.vg_name,
1109
      }
1110
    mn = self.sstore.GetMasterNode()
1111
    return env, [mn], [mn]
1112

    
1113
  def CheckPrereq(self):
1114
    """Check prerequisites.
1115

1116
    This checks whether the given params don't conflict and
1117
    if the given volume group is valid.
1118

1119
    """
1120
    if not self.op.vg_name:
1121
      instances = [self.cfg.GetInstanceInfo(name)
1122
                   for name in self.cfg.GetInstanceList()]
1123
      for inst in instances:
1124
        for disk in inst.disks:
1125
          if _RecursiveCheckIfLVMBased(disk):
1126
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1127
                                       " lvm-based instances exist")
1128

    
1129
    # if vg_name not None, checks given volume group on all nodes
1130
    if self.op.vg_name:
1131
      node_list = self.cfg.GetNodeList()
1132
      vglist = rpc.call_vg_list(node_list)
1133
      for node in node_list:
1134
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1135
                                              constants.MIN_VG_SIZE)
1136
        if vgstatus:
1137
          raise errors.OpPrereqError("Error on node '%s': %s" %
1138
                                     (node, vgstatus))
1139

    
1140
  def Exec(self, feedback_fn):
1141
    """Change the parameters of the cluster.
1142

1143
    """
1144
    if self.op.vg_name != self.cfg.GetVGName():
1145
      self.cfg.SetVGName(self.op.vg_name)
1146
    else:
1147
      feedback_fn("Cluster LVM configuration already in desired"
1148
                  " state, not changing")
1149

    
1150

    
1151
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1152
  """Sleep and poll for an instance's disk to sync.
1153

1154
  """
1155
  if not instance.disks:
1156
    return True
1157

    
1158
  if not oneshot:
1159
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1160

    
1161
  node = instance.primary_node
1162

    
1163
  for dev in instance.disks:
1164
    cfgw.SetDiskID(dev, node)
1165

    
1166
  retries = 0
1167
  while True:
1168
    max_time = 0
1169
    done = True
1170
    cumul_degraded = False
1171
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1172
    if not rstats:
1173
      proc.LogWarning("Can't get any data from node %s" % node)
1174
      retries += 1
1175
      if retries >= 10:
1176
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1177
                                 " aborting." % node)
1178
      time.sleep(6)
1179
      continue
1180
    retries = 0
1181
    for i in range(len(rstats)):
1182
      mstat = rstats[i]
1183
      if mstat is None:
1184
        proc.LogWarning("Can't compute data for node %s/%s" %
1185
                        (node, instance.disks[i].iv_name))
1186
        continue
1187
      # we ignore the ldisk parameter
1188
      perc_done, est_time, is_degraded, _ = mstat
1189
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1190
      if perc_done is not None:
1191
        done = False
1192
        if est_time is not None:
1193
          rem_time = "%d estimated seconds remaining" % est_time
1194
          max_time = est_time
1195
        else:
1196
          rem_time = "no time estimate"
1197
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
1198
                     (instance.disks[i].iv_name, perc_done, rem_time))
1199
    if done or oneshot:
1200
      break
1201

    
1202
    time.sleep(min(60, max_time))
1203

    
1204
  if done:
1205
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1206
  return not cumul_degraded
1207

    
1208

    
1209
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1210
  """Check that mirrors are not degraded.
1211

1212
  The ldisk parameter, if True, will change the test from the
1213
  is_degraded attribute (which represents overall non-ok status for
1214
  the device(s)) to the ldisk (representing the local storage status).
1215

1216
  """
1217
  cfgw.SetDiskID(dev, node)
1218
  if ldisk:
1219
    idx = 6
1220
  else:
1221
    idx = 5
1222

    
1223
  result = True
1224
  if on_primary or dev.AssembleOnSecondary():
1225
    rstats = rpc.call_blockdev_find(node, dev)
1226
    if not rstats:
1227
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1228
      result = False
1229
    else:
1230
      result = result and (not rstats[idx])
1231
  if dev.children:
1232
    for child in dev.children:
1233
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1234

    
1235
  return result
1236

    
1237

    
1238
class LUDiagnoseOS(NoHooksLU):
1239
  """Logical unit for OS diagnose/query.
1240

1241
  """
1242
  _OP_REQP = ["output_fields", "names"]
1243
  REQ_BGL = False
1244

    
1245
  def ExpandNames(self):
1246
    if self.op.names:
1247
      raise errors.OpPrereqError("Selective OS query not supported")
1248

    
1249
    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1250
    _CheckOutputFields(static=[],
1251
                       dynamic=self.dynamic_fields,
1252
                       selected=self.op.output_fields)
1253

    
1254
    # Lock all nodes, in shared mode
1255
    self.needed_locks = {}
1256
    self.share_locks[locking.LEVEL_NODE] = 1
1257
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1258

    
1259
  def CheckPrereq(self):
1260
    """Check prerequisites.
1261

1262
    """
1263

    
1264
  @staticmethod
1265
  def _DiagnoseByOS(node_list, rlist):
1266
    """Remaps a per-node return list into an a per-os per-node dictionary
1267

1268
      Args:
1269
        node_list: a list with the names of all nodes
1270
        rlist: a map with node names as keys and OS objects as values
1271

1272
      Returns:
1273
        map: a map with osnames as keys and as value another map, with
1274
             nodes as
1275
             keys and list of OS objects as values
1276
             e.g. {"debian-etch": {"node1": [<object>,...],
1277
                                   "node2": [<object>,]}
1278
                  }
1279

1280
    """
1281
    all_os = {}
1282
    for node_name, nr in rlist.iteritems():
1283
      if not nr:
1284
        continue
1285
      for os_obj in nr:
1286
        if os_obj.name not in all_os:
1287
          # build a list of nodes for this os containing empty lists
1288
          # for each node in node_list
1289
          all_os[os_obj.name] = {}
1290
          for nname in node_list:
1291
            all_os[os_obj.name][nname] = []
1292
        all_os[os_obj.name][node_name].append(os_obj)
1293
    return all_os
1294

    
1295
  def Exec(self, feedback_fn):
1296
    """Compute the list of OSes.
1297

1298
    """
1299
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1300
    node_data = rpc.call_os_diagnose(node_list)
1301
    if node_data == False:
1302
      raise errors.OpExecError("Can't gather the list of OSes")
1303
    pol = self._DiagnoseByOS(node_list, node_data)
1304
    output = []
1305
    for os_name, os_data in pol.iteritems():
1306
      row = []
1307
      for field in self.op.output_fields:
1308
        if field == "name":
1309
          val = os_name
1310
        elif field == "valid":
1311
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1312
        elif field == "node_status":
1313
          val = {}
1314
          for node_name, nos_list in os_data.iteritems():
1315
            val[node_name] = [(v.status, v.path) for v in nos_list]
1316
        else:
1317
          raise errors.ParameterError(field)
1318
        row.append(val)
1319
      output.append(row)
1320

    
1321
    return output
1322

    
1323

    
1324
class LURemoveNode(LogicalUnit):
1325
  """Logical unit for removing a node.
1326

1327
  """
1328
  HPATH = "node-remove"
1329
  HTYPE = constants.HTYPE_NODE
1330
  _OP_REQP = ["node_name"]
1331

    
1332
  def BuildHooksEnv(self):
1333
    """Build hooks env.
1334

1335
    This doesn't run on the target node in the pre phase as a failed
1336
    node would then be impossible to remove.
1337

1338
    """
1339
    env = {
1340
      "OP_TARGET": self.op.node_name,
1341
      "NODE_NAME": self.op.node_name,
1342
      }
1343
    all_nodes = self.cfg.GetNodeList()
1344
    all_nodes.remove(self.op.node_name)
1345
    return env, all_nodes, all_nodes
1346

    
1347
  def CheckPrereq(self):
1348
    """Check prerequisites.
1349

1350
    This checks:
1351
     - the node exists in the configuration
1352
     - it does not have primary or secondary instances
1353
     - it's not the master
1354

1355
    Any errors are signalled by raising errors.OpPrereqError.
1356

1357
    """
1358
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1359
    if node is None:
1360
      raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1361

    
1362
    instance_list = self.cfg.GetInstanceList()
1363

    
1364
    masternode = self.sstore.GetMasterNode()
1365
    if node.name == masternode:
1366
      raise errors.OpPrereqError("Node is the master node,"
1367
                                 " you need to failover first.")
1368

    
1369
    for instance_name in instance_list:
1370
      instance = self.cfg.GetInstanceInfo(instance_name)
1371
      if node.name == instance.primary_node:
1372
        raise errors.OpPrereqError("Instance %s still running on the node,"
1373
                                   " please remove first." % instance_name)
1374
      if node.name in instance.secondary_nodes:
1375
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1376
                                   " please remove first." % instance_name)
1377
    self.op.node_name = node.name
1378
    self.node = node
1379

    
1380
  def Exec(self, feedback_fn):
1381
    """Removes the node from the cluster.
1382

1383
    """
1384
    node = self.node
1385
    logger.Info("stopping the node daemon and removing configs from node %s" %
1386
                node.name)
1387

    
1388
    self.context.RemoveNode(node.name)
1389

    
1390
    rpc.call_node_leave_cluster(node.name)
1391

    
1392

    
1393
class LUQueryNodes(NoHooksLU):
1394
  """Logical unit for querying nodes.
1395

1396
  """
1397
  _OP_REQP = ["output_fields", "names"]
1398
  REQ_BGL = False
1399

    
1400
  def ExpandNames(self):
1401
    self.dynamic_fields = frozenset([
1402
      "dtotal", "dfree",
1403
      "mtotal", "mnode", "mfree",
1404
      "bootid",
1405
      "ctotal",
1406
      ])
1407

    
1408
    self.static_fields = frozenset([
1409
      "name", "pinst_cnt", "sinst_cnt",
1410
      "pinst_list", "sinst_list",
1411
      "pip", "sip", "tags",
1412
      ])
1413

    
1414
    _CheckOutputFields(static=self.static_fields,
1415
                       dynamic=self.dynamic_fields,
1416
                       selected=self.op.output_fields)
1417

    
1418
    self.needed_locks = {}
1419
    self.share_locks[locking.LEVEL_NODE] = 1
1420

    
1421
    if self.op.names:
1422
      self.wanted = _GetWantedNodes(self, self.op.names)
1423
    else:
1424
      self.wanted = locking.ALL_SET
1425

    
1426
    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted


  def CheckPrereq(self):
    """Check prerequisites.

    """
    # The validation of the node list is done in _GetWantedNodes, if
    # non-empty; if empty, there is no validation to do
    pass

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    else:
      nodenames = all_info.keys()
    nodelist = [all_info[name] for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


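# Example of the row format produced by LUQueryNodes.Exec above: one row per
# node, one entry per requested output field. The node names and values below
# are purely illustrative.
#
#   op.output_fields = ["name", "pinst_cnt", "mfree"]
#   Exec() -> [["node1.example.com", 2, 1024],
#              ["node2.example.com", 0, 2048]]

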
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  REQ_BGL = False

  def ExpandNames(self):
    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


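# Example of the row format produced by LUQueryNodeVolumes.Exec above: one row
# per logical volume, every value converted to a string, with '-' when the
# volume is not used by any instance. The device path, VG and names below are
# illustrative only.
#
#   op.output_fields = ["node", "phys", "vg", "name", "size", "instance"]
#   Exec() -> [["node1.example.com", "/dev/sda2", "xenvg", "abc123.sda",
#               "10240", "instance1.example.com"]]

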
class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) matches the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # check reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # check connectivity
    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    node_verify_list = [self.sstore.GetMasterNode()]
    node_verify_param = {
      'nodelist': [node],
      # TODO: do a node-net-test as well?
    }

    result = rpc.call_node_verify(node_verify_list, node_verify_param)
    for verifier in node_verify_list:
      if not result[verifier]:
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                 " for remote verification" % verifier)
      if result[verifier]['nodelist']:
        for failed in result[verifier]['nodelist']:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, result[verifier]['nodelist'][failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList()
    if not self.op.readd:
      dist_nodes.append(node)
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = self.sstore.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      result = rpc.call_upload_file([node], fname)
      if not result[node]:
        logger.Error("could not copy file %s to node %s" % (fname, node))

    if self.op.readd:
      self.context.ReaddNode(new_node)
    else:
      self.context.AddNode(new_node)


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      }

    return result


class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    a tuple (disks_ok, device_info), where disks_ok is false if the
    operation failed, and device_info is a list of
    (host, instance_visible_name, node_visible_name) tuples with the
    mapping from node devices to instance devices
  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info


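# Illustrative shape of the device_info value returned by
# _AssembleInstanceDisks above (hostname and device names are made up):
#
#   disks_ok, device_info = _AssembleInstanceDisks(instance, cfg)
#   # device_info == [("node1.example.com", "sda", <blockdev_assemble result>),
#   #                 ("node1.example.com", "sdb", <blockdev_assemble result>)]
#
# _StartInstanceDisks below calls it and only checks disks_ok, discarding the
# per-device information.

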
def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    instance = self.instance
    _SafeShutdownInstanceDisks(instance, self.cfg)


def _SafeShutdownInstanceDisks(instance, cfg):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  _ShutdownInstanceDisks.

  """
  ins_l = rpc.call_instance_list([instance.primary_node])
  ins_l = ins_l[instance.primary_node]
  if not type(ins_l) is list:
    raise errors.OpExecError("Can't contact node '%s'" %
                             instance.primary_node)

  if instance.name in ins_l:
    raise errors.OpExecError("Instance is running, can't shutdown"
                             " block devices.")

  _ShutdownInstanceDisks(instance, cfg)


def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are
  ignored.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result


def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                             " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                             " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                             " needed %s MiB, available %s MiB" %
                             (node, reason, requested, free_mem))


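# Example call for _CheckNodeFreeMemory above, mirroring how LUStartupInstance
# below uses it (the 512 MiB figure is illustrative):
#
#   _CheckNodeFreeMemory(self.cfg, instance.primary_node,
#                        "starting instance %s" % instance.name, 512)
#
# It either returns None or raises errors.OpPrereqError.

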
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
                         "starting instance %s" % instance.name,
                         instance.memory)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")


class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      primary_only = self.op.reboot_type != constants.INSTANCE_REBOOT_FULL
      self._LockInstancesNodes(primary_only=primary_only)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      if not rpc.call_instance_reboot(node_current, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      if not rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
      if not rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    if not rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    _ShutdownInstanceDisks(instance, self.cfg)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   self.op.pnode)
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node" % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.AddInstance(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))


  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
                                                old_file_storage_dir,
                                                new_file_storage_dir)

      if not result:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
                                          "sda", "sdb"):
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.sstore.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logger.Info("removing block devices for instance %s" % instance.name)

    if not _RemoveDisks(instance, self.cfg):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)
    # Remove the instance from the Ganeti Lock Manager
    self.context.glm.remove(locking.LEVEL_INSTANCE, instance.name)


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
    self.static_fields = frozenset([
      "name", "os", "pnode", "snodes",
      "admin_state", "admin_ram",
      "disk_template", "ip", "mac", "bridge",
      "sda_size", "sdb_size", "vcpus", "tags",
      "auto_balance",
      "network_port", "kernel_path", "initrd_path",
      "hvm_boot_order", "hvm_acpi", "hvm_pae",
      "hvm_cdrom_image_path", "hvm_nic_type",
      "hvm_disk_type", "vnc_bind_address",
      ])
    _CheckOutputFields(static=self.static_fields,
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedInstances(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
    if self.do_locking:
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.do_locking:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    all_info = self.cfg.GetAllInstancesInfo()
    if self.do_locking:
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
    else:
      instance_names = all_info.keys()
    instance_list = [all_info[iname] for iname in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_all_instances_info(nodes)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = []
    for instance in instance_list:
      iout = []
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.status != "down":
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.status != "down":
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          disk = instance.FindDisk(field[:3])
          if disk is None:
            val = None
          else:
            val = disk.size
        elif field == "vcpus":
          val = instance.vcpus
        elif field == "tags":
          val = list(instance.GetTags())
        elif field in ("network_port", "kernel_path", "initrd_path",
                       "hvm_boot_order", "hvm_acpi", "hvm_pae",
                       "hvm_cdrom_image_path", "hvm_nic_type",
                       "hvm_disk_type", "vnc_bind_address"):
          val = getattr(instance, field, None)
          if val is not None:
            pass
          elif field in ("hvm_nic_type", "hvm_disk_type",
                         "kernel_path", "initrd_path"):
            val = "default"
          else:
            val = "-"
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output


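# Summary of the "status" values computed in LUQueryInstances.Exec above,
# taken directly from the code:
#
#   primary node unreachable       -> "ERROR_nodedown"
#   marked up,   actually running  -> "running"
#   marked down, actually running  -> "ERROR_up"
#   marked up,   not running       -> "ERROR_down"
#   marked down, not running       -> "ADMIN_down"

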
class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
                         instance.name, instance.memory)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s do not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down" %
                     (instance.name, source_node, source_node))
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logger.Info("Starting instance %s on node %s" %
                  (instance.name, target_node))

      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))


def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
        return False

  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, True, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, False, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _GenerateUniqueNames(cfg, exts):
  """Generate suitable LV names.

  This will generate one logical volume name for each of the given
  extensions.

  """
  results = []
  for val in exts:
    new_id = cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results


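# Illustrative result of _GenerateUniqueNames above; the unique IDs produced
# by cfg.GenerateUniqueID() are shown as made-up placeholders (one fresh ID is
# generated per extension):
#
#   _GenerateUniqueNames(cfg, [".sda", ".sdb"])
#   # -> ["<unique-id-1>.sda", "<unique-id-2>.sdb"]

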
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
  """Generate a drbd8 device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id = (primary, secondary, port),
                          children = [dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev


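# Sketch of the Disk object returned by _GenerateDRBD8Branch above, as built
# by the code: a DRBD8 device whose logical_id ties the two nodes to an
# allocated port, with two LV children (the data volume of the requested size
# and a fixed 128 MiB metadata volume):
#
#   drbd8(primary, secondary, port)
#     +- lv(vgname, names[0], size)   # data
#     +- lv(vgname, names[1], 128)    # metadata

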
def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz,
                          file_storage_dir, file_driver):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = cfg.GetVGName()
  if template_name == constants.DT_DISKLESS:
    disks = []
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name = "sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name = "sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                        disk_sz, names[0:2], "sda")
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                        swap_sz, names[2:4], "sdb")
    disks = [drbd_sda_dev, drbd_sdb_dev]
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
                                iv_name="sda", logical_id=(file_driver,
                                "%s/sda" % file_storage_dir))
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
                                iv_name="sdb", logical_id=(file_driver,
                                "%s/sdb" % file_storage_dir))
    disks = [file_sda_dev, file_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks


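# Example call for _GenerateDiskTemplate above, using the drbd8 template; the
# instance/node names and sizes are illustrative, and file_storage_dir /
# file_driver are passed as None because they are only used by the DT_FILE
# branch:
#
#   disks = _GenerateDiskTemplate(cfg, constants.DT_DRBD8, "inst1", "node1",
#                                 ["node2"], 10240, 4096, None, None)
#   # -> [drbd8 disk "sda" of size 10240, drbd8 disk "sdb" of size 4096]

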
def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name



def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = rpc.call_file_storage_dir_create(instance.primary_node,
                                              file_storage_dir)

    if not result:
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
      return False

    if not result[0]:
      logger.Error("failed to create directory '%s'" % file_storage_dir)
      return False

  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False

  return True

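
# Note on ordering in _CreateDisks above: for each disk, the block devices are
# first created on the secondary nodes (non-forced assembly) and only then on
# the primary node; the first failure aborts the whole creation, and the
# caller is expected to roll back with _RemoveDisks.
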

def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
                                            file_storage_dir):
      logger.Error("could not remove directory '%s'" % file_storage_dir)
      result = False

  return result


def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group

  This is currently hard-coded for the two-drive layout.

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: disk_size + swap_size,
    # 256 MB are added for drbd metadata, 128 MB for each drbd device
    constants.DT_DRBD8: disk_size + swap_size + 256,
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]

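
# Worked example for _ComputeDiskSize above: a DT_DRBD8 instance with a
# 10240 MB data disk and a 4096 MB swap disk needs
# 10240 + 4096 + 256 = 14592 MB of free space in the volume group on each
# node (the extra 256 MB covers the two 128 MB DRBD metadata volumes).
# DT_DISKLESS and DT_FILE return None, meaning no VG space check applies.
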

class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
              "disk_template", "swap_size", "mode", "start", "vcpus",
              "wait_for_sync", "ip_check", "mac"]

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    disks = [{"size": self.op.disk_size, "mode": "w"},
             {"size": self.op.swap_size, "mode": "w"}]
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
             "bridge": self.op.bridge}]
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.op.vcpus,
                     mem_size=self.op.mem_size,
                     disks=disks,
                     nics=nics,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    logger.ToStdout("Selected nodes for the instance: %s" %
                    (", ".join(ial.nodes),))
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
                (self.op.instance_name, self.op.iallocator, ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]
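
  # Shape of the allocation request built by _RunAllocator above (sketch,
  # values are examples only):
  #   disks = [{"size": 10240, "mode": "w"}, {"size": 4096, "mode": "w"}]
  #   nics  = [{"mac": "auto", "ip": None, "bridge": "xen-br0"}]
  # The IAllocator answer must contain exactly required_nodes node names; the
  # first becomes the primary node and, when two nodes are required, the
  # second becomes the secondary.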

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.op.mem_size,
      vcpus=self.op.vcpus,
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
    ))

    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl
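
  # The hooks environment produced above therefore contains at least
  # INSTANCE_DISK_TEMPLATE, INSTANCE_DISK_SIZE, INSTANCE_SWAP_SIZE and
  # INSTANCE_ADD_MODE, plus the INSTANCE_SRC_* keys for imports, plus
  # whatever the generic _BuildInstanceHookEnv helper adds for the instance
  # itself; the hooks run on the master, the primary node and all
  # secondaries (presumably the two identical node lists returned are the
  # pre- and post-hook node lists).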

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # set optional parameters to none if they don't exist
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
                 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
                 "hvm_nic_type", "hvm_disk_type", "vnc_bind_address"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)
      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")
      src_node_full = self.cfg.ExpandNodeName(src_node)
      if src_node_full is None:
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
      self.op.src_node = src_node = src_node_full

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage
    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

    #### instance parameters check

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)

    self.op.instance_name = instance_name = hostname1.name
    instance_list = self.cfg.GetInstanceList()
    if instance_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    # ip validity checks
    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = self.op.ip = inst_ip

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (hostname1.ip, instance_name))

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    # boot order verification
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")
    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory not a relative"
                                 " path")
    #### allocator run

    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
    if pnode is None:
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                 self.op.pnode)
    self.op.pnode = pnode.name
    self.pnode = pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if getattr(self.op, "snode", None) is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")

      snode_name = self.cfg.ExpandNodeName(self.op.snode)
      if snode_name is None:
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
                                   self.op.snode)
      elif snode_name == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(snode_name)

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.op.disk_size, self.op.swap_size)

    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node" % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")

    # bridge check on primary node
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.op.mem_size)

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    # Xen HVM device type checks
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
        raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
                                   " hypervisor" % self.op.hvm_nic_type)
      if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
        raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
                                   " hypervisor" % self.op.hvm_disk_type)

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'
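
  # Outline of Exec() below: generate the MAC/NIC and network port, build the
  # file storage path, generate the disk layout, create the Instance object,
  # create the disks on all nodes (rolling back on failure), add the instance
  # to the configuration and the lock manager, optionally wait for the disks
  # to sync, run the OS create or import scripts, and finally start the
  # instance if requested.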

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    if self.op.vnc_bind_address is None:
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.sstore.GetFileStorageDir(),
                                        string_file_storage_dir, instance))

    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size,
                                  file_storage_dir,
                                  self.op.file_driver)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            hvm_acpi=self.op.hvm_acpi,
                            hvm_pae=self.op.hvm_pae,
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
                            vnc_bind_address=self.op.vnc_bind_address,
                            hvm_nic_type=self.op.hvm_nic_type,
                            hvm_disk_type=self.op.hvm_disk_type,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Add the new instance to the Ganeti Lock Manager
    self.context.glm.add(locking.LEVEL_INSTANCE, instance)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      # Remove the new instance from the Ganeti Lock Manager
      self.context.glm.remove(locking.LEVEL_INSTANCE, iobj.name)
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                           src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    elif self.op.remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.op.remote_node = remote_node
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
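
  # Locking strategy set up in ExpandNames above: when an iallocator will pick
  # the new secondary we must lock all nodes (ALL_SET); when an explicit
  # remote node is given we pre-lock just that node and later append the
  # instance's own nodes; otherwise we only lock the instance's nodes,
  # computed on demand in DeclareLocks below.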

  def DeclareLocks(self, level):
    # If we're not already locking all nodes in the set we have to declare the
    # instance's primary/secondary nodes.
    if (level == locking.LEVEL_NODE and
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
      self._LockInstancesNodes()

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    logger.ToStdout("Selected new secondary for the instance: %s" %
                    self.op.remote_node)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = instance

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))
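
  # After CheckPrereq above, for the drbd8 template the roles are:
  #   REPLACE_DISK_PRI: tgt_node = primary,       oth_node = secondary
  #   REPLACE_DISK_SEC: tgt_node = old secondary, oth_node = primary,
  #                     new_node = the requested remote node (may be None)
  # REPLACE_DISK_ALL with an explicit remote node is silently turned into
  # REPLACE_DISK_SEC; without one it is rejected.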

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node == instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue
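
  # Sketch of the per-disk LV swap performed in step 4 above (names are
  # illustrative): with old LVs "inst.sda_data"/"inst.sda_meta" and freshly
  # created LVs "new.sda_data"/"new.sda_meta", the old ones are renamed to
  # "inst.sda_data_replaced-<timestamp>" etc., the new ones are renamed to the
  # original names, and only then are they attached back to the DRBD device,
  # so the device ends up with child LVs under the originally configured
  # names.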

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([pri_node, new_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in pri_node, new_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s on %s" % (dev.iv_name, pri_node))
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        raise errors.OpExecError("Can't find device %s on node %s" %
                                 (dev.iv_name, pri_node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      size = dev.size
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in dev.children:
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], new_node))

      iv_names[dev.iv_name] = (dev, dev.children)

    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for dev in instance.disks:
      size = dev.size
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
      # create new devices on new_node
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=(pri_node, new_node,
                                          dev.logical_id[2]),
                              children=dev.children)
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new DRBD on"
                                 " node '%s'" % new_node)

    for dev in instance.disks:
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for %s on old node" % dev.iv_name)