Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib.py @ f22a8ba3

History | View | Annotate | Download (181.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33

    
34
from ganeti import rpc
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import locking
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import serializer
45

    
46

    
47
class LogicalUnit(object):
48
  """Logical Unit base class.
49

50
  Subclasses must follow these rules:
51
    - implement ExpandNames
52
    - implement CheckPrereq
53
    - implement Exec
54
    - implement BuildHooksEnv
55
    - redefine HPATH and HTYPE
56
    - optionally redefine their run requirements:
57
        REQ_MASTER: the LU needs to run on the master node
58
        REQ_WSSTORE: the LU needs a writable SimpleStore
59
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60

61
  Note that all commands require root permissions.
62

63
  """
64
  HPATH = None
65
  HTYPE = None
66
  _OP_REQP = []
67
  REQ_MASTER = True
68
  REQ_WSSTORE = False
69
  REQ_BGL = True
70

    
71
  def __init__(self, processor, op, context, sstore):
72
    """Constructor for LogicalUnit.
73

74
    This needs to be overriden in derived classes in order to check op
75
    validity.
76

77
    """
78
    self.proc = processor
79
    self.op = op
80
    self.cfg = context.cfg
81
    self.sstore = sstore
82
    self.context = context
83
    self.needed_locks = None
84
    self.acquired_locks = {}
85
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
86
    # Used to force good behavior when calling helper functions
87
    self.recalculate_locks = {}
88
    self.__ssh = None
89

    
90
    for attr_name in self._OP_REQP:
91
      attr_val = getattr(op, attr_name, None)
92
      if attr_val is None:
93
        raise errors.OpPrereqError("Required parameter '%s' missing" %
94
                                   attr_name)
95

    
96
    if not self.cfg.IsCluster():
97
      raise errors.OpPrereqError("Cluster not initialized yet,"
98
                                 " use 'gnt-cluster init' first.")
99
    if self.REQ_MASTER:
100
      master = sstore.GetMasterNode()
101
      if master != utils.HostInfo().name:
102
        raise errors.OpPrereqError("Commands must be run on the master"
103
                                   " node %s" % master)
104

    
105
  def __GetSSH(self):
106
    """Returns the SshRunner object
107

108
    """
109
    if not self.__ssh:
110
      self.__ssh = ssh.SshRunner(self.sstore)
111
    return self.__ssh
112

    
113
  ssh = property(fget=__GetSSH)
114

    
115
  def ExpandNames(self):
116
    """Expand names for this LU.
117

118
    This method is called before starting to execute the opcode, and it should
119
    update all the parameters of the opcode to their canonical form (e.g. a
120
    short node name must be fully expanded after this method has successfully
121
    completed). This way locking, hooks, logging, ecc. can work correctly.
122

123
    LUs which implement this method must also populate the self.needed_locks
124
    member, as a dict with lock levels as keys, and a list of needed lock names
125
    as values. Rules:
126
      - Use an empty dict if you don't need any lock
127
      - If you don't need any lock at a particular level omit that level
128
      - Don't put anything for the BGL level
129
      - If you want all locks at a level use locking.ALL_SET as a value
130

131
    If you need to share locks (rather than acquire them exclusively) at one
132
    level you can modify self.share_locks, setting a true value (usually 1) for
133
    that level. By default locks are not shared.
134

135
    Examples:
136
    # Acquire all nodes and one instance
137
    self.needed_locks = {
138
      locking.LEVEL_NODE: locking.ALL_SET,
139
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
140
    }
141
    # Acquire just two nodes
142
    self.needed_locks = {
143
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
144
    }
145
    # Acquire no locks
146
    self.needed_locks = {} # No, you can't leave it to the default value None
147

148
    """
149
    # The implementation of this method is mandatory only if the new LU is
150
    # concurrent, so that old LUs don't need to be changed all at the same
151
    # time.
152
    if self.REQ_BGL:
153
      self.needed_locks = {} # Exclusive LUs don't need locks.
154
    else:
155
      raise NotImplementedError
156

    
157
  def DeclareLocks(self, level):
158
    """Declare LU locking needs for a level
159

160
    While most LUs can just declare their locking needs at ExpandNames time,
161
    sometimes there's the need to calculate some locks after having acquired
162
    the ones before. This function is called just before acquiring locks at a
163
    particular level, but after acquiring the ones at lower levels, and permits
164
    such calculations. It can be used to modify self.needed_locks, and by
165
    default it does nothing.
166

167
    This function is only called if you have something already set in
168
    self.needed_locks for the level.
169

170
    @param level: Locking level which is going to be locked
171
    @type level: member of ganeti.locking.LEVELS
172

173
    """
174

    
175
  def CheckPrereq(self):
176
    """Check prerequisites for this LU.
177

178
    This method should check that the prerequisites for the execution
179
    of this LU are fulfilled. It can do internode communication, but
180
    it should be idempotent - no cluster or system changes are
181
    allowed.
182

183
    The method should raise errors.OpPrereqError in case something is
184
    not fulfilled. Its return value is ignored.
185

186
    This method should also update all the parameters of the opcode to
187
    their canonical form if it hasn't been done by ExpandNames before.
188

189
    """
190
    raise NotImplementedError
191

    
192
  def Exec(self, feedback_fn):
193
    """Execute the LU.
194

195
    This method should implement the actual work. It should raise
196
    errors.OpExecError for failures that are somewhat dealt with in
197
    code, or expected.
198

199
    """
200
    raise NotImplementedError
201

    
202
  def BuildHooksEnv(self):
203
    """Build hooks environment for this LU.
204

205
    This method should return a three-node tuple consisting of: a dict
206
    containing the environment that will be used for running the
207
    specific hook for this LU, a list of node names on which the hook
208
    should run before the execution, and a list of node names on which
209
    the hook should run after the execution.
210

211
    The keys of the dict must not have 'GANETI_' prefixed as this will
212
    be handled in the hooks runner. Also note additional keys will be
213
    added by the hooks runner. If the LU doesn't define any
214
    environment, an empty dict (and not None) should be returned.
215

216
    No nodes should be returned as an empty list (and not None).
217

218
    Note that if the HPATH for a LU class is None, this function will
219
    not be called.
220

221
    """
222
    raise NotImplementedError
223

    
224
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
225
    """Notify the LU about the results of its hooks.
226

227
    This method is called every time a hooks phase is executed, and notifies
228
    the Logical Unit about the hooks' result. The LU can then use it to alter
229
    its result based on the hooks.  By default the method does nothing and the
230
    previous result is passed back unchanged but any LU can define it if it
231
    wants to use the local cluster hook-scripts somehow.
232

233
    Args:
234
      phase: the hooks phase that has just been run
235
      hooks_results: the results of the multi-node hooks rpc call
236
      feedback_fn: function to send feedback back to the caller
237
      lu_result: the previous result this LU had, or None in the PRE phase.
238

239
    """
240
    return lu_result
241

    
242
  def _ExpandAndLockInstance(self):
243
    """Helper function to expand and lock an instance.
244

245
    Many LUs that work on an instance take its name in self.op.instance_name
246
    and need to expand it and then declare the expanded name for locking. This
247
    function does it, and then updates self.op.instance_name to the expanded
248
    name. It also initializes needed_locks as a dict, if this hasn't been done
249
    before.
250

251
    """
252
    if self.needed_locks is None:
253
      self.needed_locks = {}
254
    else:
255
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
256
        "_ExpandAndLockInstance called with instance-level locks set"
257
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
258
    if expanded_name is None:
259
      raise errors.OpPrereqError("Instance '%s' not known" %
260
                                  self.op.instance_name)
261
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
262
    self.op.instance_name = expanded_name
263

    
264
  def _LockInstancesNodes(self, primary_only=False):
265
    """Helper function to declare instances' nodes for locking.
266

267
    This function should be called after locking one or more instances to lock
268
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
269
    with all primary or secondary nodes for instances already locked and
270
    present in self.needed_locks[locking.LEVEL_INSTANCE].
271

272
    It should be called from DeclareLocks, and for safety only works if
273
    self.recalculate_locks[locking.LEVEL_NODE] is set.
274

275
    In the future it may grow parameters to just lock some instance's nodes, or
276
    to just lock primaries or secondary nodes, if needed.
277

278
    If should be called in DeclareLocks in a way similar to:
279

280
    if level == locking.LEVEL_NODE:
281
      self._LockInstancesNodes()
282

283
    @type primary_only: boolean
284
    @param primary_only: only lock primary nodes of locked instances
285

286
    """
287
    assert locking.LEVEL_NODE in self.recalculate_locks, \
288
      "_LockInstancesNodes helper function called with no nodes to recalculate"
289

    
290
    # TODO: check if we're really been called with the instance locks held
291

    
292
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
293
    # future we might want to have different behaviors depending on the value
294
    # of self.recalculate_locks[locking.LEVEL_NODE]
295
    wanted_nodes = []
296
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
297
      instance = self.context.cfg.GetInstanceInfo(instance_name)
298
      wanted_nodes.append(instance.primary_node)
299
      if not primary_only:
300
        wanted_nodes.extend(instance.secondary_nodes)
301
    self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
302

    
303
    del self.recalculate_locks[locking.LEVEL_NODE]
304

    
305

    
306
class NoHooksLU(LogicalUnit):
307
  """Simple LU which runs no hooks.
308

309
  This LU is intended as a parent for other LogicalUnits which will
310
  run no hooks, in order to reduce duplicate code.
311

312
  """
313
  HPATH = None
314
  HTYPE = None
315

    
316

    
317
def _GetWantedNodes(lu, nodes):
318
  """Returns list of checked and expanded node names.
319

320
  Args:
321
    nodes: List of nodes (strings) or None for all
322

323
  """
324
  if not isinstance(nodes, list):
325
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
326

    
327
  if not nodes:
328
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
329
      " non-empty list of nodes whose name is to be expanded.")
330

    
331
  wanted = []
332
  for name in nodes:
333
    node = lu.cfg.ExpandNodeName(name)
334
    if node is None:
335
      raise errors.OpPrereqError("No such node name '%s'" % name)
336
    wanted.append(node)
337

    
338
  return utils.NiceSort(wanted)
339

    
340

    
341
def _GetWantedInstances(lu, instances):
342
  """Returns list of checked and expanded instance names.
343

344
  Args:
345
    instances: List of instances (strings) or None for all
346

347
  """
348
  if not isinstance(instances, list):
349
    raise errors.OpPrereqError("Invalid argument type 'instances'")
350

    
351
  if instances:
352
    wanted = []
353

    
354
    for name in instances:
355
      instance = lu.cfg.ExpandInstanceName(name)
356
      if instance is None:
357
        raise errors.OpPrereqError("No such instance name '%s'" % name)
358
      wanted.append(instance)
359

    
360
  else:
361
    wanted = lu.cfg.GetInstanceList()
362
  return utils.NiceSort(wanted)
363

    
364

    
365
def _CheckOutputFields(static, dynamic, selected):
366
  """Checks whether all selected fields are valid.
367

368
  Args:
369
    static: Static fields
370
    dynamic: Dynamic fields
371

372
  """
373
  static_fields = frozenset(static)
374
  dynamic_fields = frozenset(dynamic)
375

    
376
  all_fields = static_fields | dynamic_fields
377

    
378
  if not all_fields.issuperset(selected):
379
    raise errors.OpPrereqError("Unknown output fields selected: %s"
380
                               % ",".join(frozenset(selected).
381
                                          difference(all_fields)))
382

    
383

    
384
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
385
                          memory, vcpus, nics):
386
  """Builds instance related env variables for hooks from single variables.
387

388
  Args:
389
    secondary_nodes: List of secondary nodes as strings
390
  """
391
  env = {
392
    "OP_TARGET": name,
393
    "INSTANCE_NAME": name,
394
    "INSTANCE_PRIMARY": primary_node,
395
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
396
    "INSTANCE_OS_TYPE": os_type,
397
    "INSTANCE_STATUS": status,
398
    "INSTANCE_MEMORY": memory,
399
    "INSTANCE_VCPUS": vcpus,
400
  }
401

    
402
  if nics:
403
    nic_count = len(nics)
404
    for idx, (ip, bridge, mac) in enumerate(nics):
405
      if ip is None:
406
        ip = ""
407
      env["INSTANCE_NIC%d_IP" % idx] = ip
408
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
409
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
410
  else:
411
    nic_count = 0
412

    
413
  env["INSTANCE_NIC_COUNT"] = nic_count
414

    
415
  return env
416

    
417

    
418
def _BuildInstanceHookEnvByObject(instance, override=None):
419
  """Builds instance related env variables for hooks from an object.
420

421
  Args:
422
    instance: objects.Instance object of instance
423
    override: dict of values to override
424
  """
425
  args = {
426
    'name': instance.name,
427
    'primary_node': instance.primary_node,
428
    'secondary_nodes': instance.secondary_nodes,
429
    'os_type': instance.os,
430
    'status': instance.os,
431
    'memory': instance.memory,
432
    'vcpus': instance.vcpus,
433
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
434
  }
435
  if override:
436
    args.update(override)
437
  return _BuildInstanceHookEnv(**args)
438

    
439

    
440
def _CheckInstanceBridgesExist(instance):
441
  """Check that the brigdes needed by an instance exist.
442

443
  """
444
  # check bridges existance
445
  brlist = [nic.bridge for nic in instance.nics]
446
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
447
    raise errors.OpPrereqError("one or more target bridges %s does not"
448
                               " exist on destination node '%s'" %
449
                               (brlist, instance.primary_node))
450

    
451

    
452
class LUDestroyCluster(NoHooksLU):
453
  """Logical unit for destroying the cluster.
454

455
  """
456
  _OP_REQP = []
457

    
458
  def CheckPrereq(self):
459
    """Check prerequisites.
460

461
    This checks whether the cluster is empty.
462

463
    Any errors are signalled by raising errors.OpPrereqError.
464

465
    """
466
    master = self.sstore.GetMasterNode()
467

    
468
    nodelist = self.cfg.GetNodeList()
469
    if len(nodelist) != 1 or nodelist[0] != master:
470
      raise errors.OpPrereqError("There are still %d node(s) in"
471
                                 " this cluster." % (len(nodelist) - 1))
472
    instancelist = self.cfg.GetInstanceList()
473
    if instancelist:
474
      raise errors.OpPrereqError("There are still %d instance(s) in"
475
                                 " this cluster." % len(instancelist))
476

    
477
  def Exec(self, feedback_fn):
478
    """Destroys the cluster.
479

480
    """
481
    master = self.sstore.GetMasterNode()
482
    if not rpc.call_node_stop_master(master, False):
483
      raise errors.OpExecError("Could not disable the master role")
484
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
485
    utils.CreateBackup(priv_key)
486
    utils.CreateBackup(pub_key)
487
    return master
488

    
489

    
490
class LUVerifyCluster(LogicalUnit):
491
  """Verifies the cluster status.
492

493
  """
494
  HPATH = "cluster-verify"
495
  HTYPE = constants.HTYPE_CLUSTER
496
  _OP_REQP = ["skip_checks"]
497

    
498
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
499
                  remote_version, feedback_fn):
500
    """Run multiple tests against a node.
501

502
    Test list:
503
      - compares ganeti version
504
      - checks vg existance and size > 20G
505
      - checks config file checksum
506
      - checks ssh to other nodes
507

508
    Args:
509
      node: name of the node to check
510
      file_list: required list of files
511
      local_cksum: dictionary of local files and their checksums
512

513
    """
514
    # compares ganeti version
515
    local_version = constants.PROTOCOL_VERSION
516
    if not remote_version:
517
      feedback_fn("  - ERROR: connection to %s failed" % (node))
518
      return True
519

    
520
    if local_version != remote_version:
521
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
522
                      (local_version, node, remote_version))
523
      return True
524

    
525
    # checks vg existance and size > 20G
526

    
527
    bad = False
528
    if not vglist:
529
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
530
                      (node,))
531
      bad = True
532
    else:
533
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
534
                                            constants.MIN_VG_SIZE)
535
      if vgstatus:
536
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
537
        bad = True
538

    
539
    # checks config file checksum
540
    # checks ssh to any
541

    
542
    if 'filelist' not in node_result:
543
      bad = True
544
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
545
    else:
546
      remote_cksum = node_result['filelist']
547
      for file_name in file_list:
548
        if file_name not in remote_cksum:
549
          bad = True
550
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
551
        elif remote_cksum[file_name] != local_cksum[file_name]:
552
          bad = True
553
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
554

    
555
    if 'nodelist' not in node_result:
556
      bad = True
557
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
558
    else:
559
      if node_result['nodelist']:
560
        bad = True
561
        for node in node_result['nodelist']:
562
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
563
                          (node, node_result['nodelist'][node]))
564
    if 'node-net-test' not in node_result:
565
      bad = True
566
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
567
    else:
568
      if node_result['node-net-test']:
569
        bad = True
570
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
571
        for node in nlist:
572
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
573
                          (node, node_result['node-net-test'][node]))
574

    
575
    hyp_result = node_result.get('hypervisor', None)
576
    if hyp_result is not None:
577
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
578
    return bad
579

    
580
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
581
                      node_instance, feedback_fn):
582
    """Verify an instance.
583

584
    This function checks to see if the required block devices are
585
    available on the instance's node.
586

587
    """
588
    bad = False
589

    
590
    node_current = instanceconfig.primary_node
591

    
592
    node_vol_should = {}
593
    instanceconfig.MapLVsByNode(node_vol_should)
594

    
595
    for node in node_vol_should:
596
      for volume in node_vol_should[node]:
597
        if node not in node_vol_is or volume not in node_vol_is[node]:
598
          feedback_fn("  - ERROR: volume %s missing on node %s" %
599
                          (volume, node))
600
          bad = True
601

    
602
    if not instanceconfig.status == 'down':
603
      if (node_current not in node_instance or
604
          not instance in node_instance[node_current]):
605
        feedback_fn("  - ERROR: instance %s not running on node %s" %
606
                        (instance, node_current))
607
        bad = True
608

    
609
    for node in node_instance:
610
      if (not node == node_current):
611
        if instance in node_instance[node]:
612
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
613
                          (instance, node))
614
          bad = True
615

    
616
    return bad
617

    
618
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
619
    """Verify if there are any unknown volumes in the cluster.
620

621
    The .os, .swap and backup volumes are ignored. All other volumes are
622
    reported as unknown.
623

624
    """
625
    bad = False
626

    
627
    for node in node_vol_is:
628
      for volume in node_vol_is[node]:
629
        if node not in node_vol_should or volume not in node_vol_should[node]:
630
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
631
                      (volume, node))
632
          bad = True
633
    return bad
634

    
635
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
636
    """Verify the list of running instances.
637

638
    This checks what instances are running but unknown to the cluster.
639

640
    """
641
    bad = False
642
    for node in node_instance:
643
      for runninginstance in node_instance[node]:
644
        if runninginstance not in instancelist:
645
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
646
                          (runninginstance, node))
647
          bad = True
648
    return bad
649

    
650
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
651
    """Verify N+1 Memory Resilience.
652

653
    Check that if one single node dies we can still start all the instances it
654
    was primary for.
655

656
    """
657
    bad = False
658

    
659
    for node, nodeinfo in node_info.iteritems():
660
      # This code checks that every node which is now listed as secondary has
661
      # enough memory to host all instances it is supposed to should a single
662
      # other node in the cluster fail.
663
      # FIXME: not ready for failover to an arbitrary node
664
      # FIXME: does not support file-backed instances
665
      # WARNING: we currently take into account down instances as well as up
666
      # ones, considering that even if they're down someone might want to start
667
      # them even in the event of a node failure.
668
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
669
        needed_mem = 0
670
        for instance in instances:
671
          needed_mem += instance_cfg[instance].memory
672
        if nodeinfo['mfree'] < needed_mem:
673
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
674
                      " failovers should node %s fail" % (node, prinode))
675
          bad = True
676
    return bad
677

    
678
  def CheckPrereq(self):
679
    """Check prerequisites.
680

681
    Transform the list of checks we're going to skip into a set and check that
682
    all its members are valid.
683

684
    """
685
    self.skip_set = frozenset(self.op.skip_checks)
686
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
687
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
688

    
689
  def BuildHooksEnv(self):
690
    """Build hooks env.
691

692
    Cluster-Verify hooks just rone in the post phase and their failure makes
693
    the output be logged in the verify output and the verification to fail.
694

695
    """
696
    all_nodes = self.cfg.GetNodeList()
697
    # TODO: populate the environment with useful information for verify hooks
698
    env = {}
699
    return env, [], all_nodes
700

    
701
  def Exec(self, feedback_fn):
702
    """Verify integrity of cluster, performing various test on nodes.
703

704
    """
705
    bad = False
706
    feedback_fn("* Verifying global settings")
707
    for msg in self.cfg.VerifyConfig():
708
      feedback_fn("  - ERROR: %s" % msg)
709

    
710
    vg_name = self.cfg.GetVGName()
711
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
712
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
713
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
714
    i_non_redundant = [] # Non redundant instances
715
    node_volume = {}
716
    node_instance = {}
717
    node_info = {}
718
    instance_cfg = {}
719

    
720
    # FIXME: verify OS list
721
    # do local checksums
722
    file_names = list(self.sstore.GetFileList())
723
    file_names.append(constants.SSL_CERT_FILE)
724
    file_names.append(constants.CLUSTER_CONF_FILE)
725
    local_checksums = utils.FingerprintFiles(file_names)
726

    
727
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
728
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
729
    all_instanceinfo = rpc.call_instance_list(nodelist)
730
    all_vglist = rpc.call_vg_list(nodelist)
731
    node_verify_param = {
732
      'filelist': file_names,
733
      'nodelist': nodelist,
734
      'hypervisor': None,
735
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
736
                        for node in nodeinfo]
737
      }
738
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
739
    all_rversion = rpc.call_version(nodelist)
740
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
741

    
742
    for node in nodelist:
743
      feedback_fn("* Verifying node %s" % node)
744
      result = self._VerifyNode(node, file_names, local_checksums,
745
                                all_vglist[node], all_nvinfo[node],
746
                                all_rversion[node], feedback_fn)
747
      bad = bad or result
748

    
749
      # node_volume
750
      volumeinfo = all_volumeinfo[node]
751

    
752
      if isinstance(volumeinfo, basestring):
753
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
754
                    (node, volumeinfo[-400:].encode('string_escape')))
755
        bad = True
756
        node_volume[node] = {}
757
      elif not isinstance(volumeinfo, dict):
758
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
759
        bad = True
760
        continue
761
      else:
762
        node_volume[node] = volumeinfo
763

    
764
      # node_instance
765
      nodeinstance = all_instanceinfo[node]
766
      if type(nodeinstance) != list:
767
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
768
        bad = True
769
        continue
770

    
771
      node_instance[node] = nodeinstance
772

    
773
      # node_info
774
      nodeinfo = all_ninfo[node]
775
      if not isinstance(nodeinfo, dict):
776
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
777
        bad = True
778
        continue
779

    
780
      try:
781
        node_info[node] = {
782
          "mfree": int(nodeinfo['memory_free']),
783
          "dfree": int(nodeinfo['vg_free']),
784
          "pinst": [],
785
          "sinst": [],
786
          # dictionary holding all instances this node is secondary for,
787
          # grouped by their primary node. Each key is a cluster node, and each
788
          # value is a list of instances which have the key as primary and the
789
          # current node as secondary.  this is handy to calculate N+1 memory
790
          # availability if you can only failover from a primary to its
791
          # secondary.
792
          "sinst-by-pnode": {},
793
        }
794
      except ValueError:
795
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
796
        bad = True
797
        continue
798

    
799
    node_vol_should = {}
800

    
801
    for instance in instancelist:
802
      feedback_fn("* Verifying instance %s" % instance)
803
      inst_config = self.cfg.GetInstanceInfo(instance)
804
      result =  self._VerifyInstance(instance, inst_config, node_volume,
805
                                     node_instance, feedback_fn)
806
      bad = bad or result
807

    
808
      inst_config.MapLVsByNode(node_vol_should)
809

    
810
      instance_cfg[instance] = inst_config
811

    
812
      pnode = inst_config.primary_node
813
      if pnode in node_info:
814
        node_info[pnode]['pinst'].append(instance)
815
      else:
816
        feedback_fn("  - ERROR: instance %s, connection to primary node"
817
                    " %s failed" % (instance, pnode))
818
        bad = True
819

    
820
      # If the instance is non-redundant we cannot survive losing its primary
821
      # node, so we are not N+1 compliant. On the other hand we have no disk
822
      # templates with more than one secondary so that situation is not well
823
      # supported either.
824
      # FIXME: does not support file-backed instances
825
      if len(inst_config.secondary_nodes) == 0:
826
        i_non_redundant.append(instance)
827
      elif len(inst_config.secondary_nodes) > 1:
828
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
829
                    % instance)
830

    
831
      for snode in inst_config.secondary_nodes:
832
        if snode in node_info:
833
          node_info[snode]['sinst'].append(instance)
834
          if pnode not in node_info[snode]['sinst-by-pnode']:
835
            node_info[snode]['sinst-by-pnode'][pnode] = []
836
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
837
        else:
838
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
839
                      " %s failed" % (instance, snode))
840

    
841
    feedback_fn("* Verifying orphan volumes")
842
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
843
                                       feedback_fn)
844
    bad = bad or result
845

    
846
    feedback_fn("* Verifying remaining instances")
847
    result = self._VerifyOrphanInstances(instancelist, node_instance,
848
                                         feedback_fn)
849
    bad = bad or result
850

    
851
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
852
      feedback_fn("* Verifying N+1 Memory redundancy")
853
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
854
      bad = bad or result
855

    
856
    feedback_fn("* Other Notes")
857
    if i_non_redundant:
858
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
859
                  % len(i_non_redundant))
860

    
861
    return not bad
862

    
863
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
864
    """Analize the post-hooks' result, handle it, and send some
865
    nicely-formatted feedback back to the user.
866

867
    Args:
868
      phase: the hooks phase that has just been run
869
      hooks_results: the results of the multi-node hooks rpc call
870
      feedback_fn: function to send feedback back to the caller
871
      lu_result: previous Exec result
872

873
    """
874
    # We only really run POST phase hooks, and are only interested in
875
    # their results
876
    if phase == constants.HOOKS_PHASE_POST:
877
      # Used to change hooks' output to proper indentation
878
      indent_re = re.compile('^', re.M)
879
      feedback_fn("* Hooks Results")
880
      if not hooks_results:
881
        feedback_fn("  - ERROR: general communication failure")
882
        lu_result = 1
883
      else:
884
        for node_name in hooks_results:
885
          show_node_header = True
886
          res = hooks_results[node_name]
887
          if res is False or not isinstance(res, list):
888
            feedback_fn("    Communication failure")
889
            lu_result = 1
890
            continue
891
          for script, hkr, output in res:
892
            if hkr == constants.HKR_FAIL:
893
              # The node header is only shown once, if there are
894
              # failing hooks on that node
895
              if show_node_header:
896
                feedback_fn("  Node %s:" % node_name)
897
                show_node_header = False
898
              feedback_fn("    ERROR: Script %s failed, output:" % script)
899
              output = indent_re.sub('      ', output)
900
              feedback_fn("%s" % output)
901
              lu_result = 1
902

    
903
      return lu_result
904

    
905

    
906
class LUVerifyDisks(NoHooksLU):
907
  """Verifies the cluster disks status.
908

909
  """
910
  _OP_REQP = []
911

    
912
  def CheckPrereq(self):
913
    """Check prerequisites.
914

915
    This has no prerequisites.
916

917
    """
918
    pass
919

    
920
  def Exec(self, feedback_fn):
921
    """Verify integrity of cluster disks.
922

923
    """
924
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
925

    
926
    vg_name = self.cfg.GetVGName()
927
    nodes = utils.NiceSort(self.cfg.GetNodeList())
928
    instances = [self.cfg.GetInstanceInfo(name)
929
                 for name in self.cfg.GetInstanceList()]
930

    
931
    nv_dict = {}
932
    for inst in instances:
933
      inst_lvs = {}
934
      if (inst.status != "up" or
935
          inst.disk_template not in constants.DTS_NET_MIRROR):
936
        continue
937
      inst.MapLVsByNode(inst_lvs)
938
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
939
      for node, vol_list in inst_lvs.iteritems():
940
        for vol in vol_list:
941
          nv_dict[(node, vol)] = inst
942

    
943
    if not nv_dict:
944
      return result
945

    
946
    node_lvs = rpc.call_volume_list(nodes, vg_name)
947

    
948
    to_act = set()
949
    for node in nodes:
950
      # node_volume
951
      lvs = node_lvs[node]
952

    
953
      if isinstance(lvs, basestring):
954
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
955
        res_nlvm[node] = lvs
956
      elif not isinstance(lvs, dict):
957
        logger.Info("connection to node %s failed or invalid data returned" %
958
                    (node,))
959
        res_nodes.append(node)
960
        continue
961

    
962
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
963
        inst = nv_dict.pop((node, lv_name), None)
964
        if (not lv_online and inst is not None
965
            and inst.name not in res_instances):
966
          res_instances.append(inst.name)
967

    
968
    # any leftover items in nv_dict are missing LVs, let's arrange the
969
    # data better
970
    for key, inst in nv_dict.iteritems():
971
      if inst.name not in res_missing:
972
        res_missing[inst.name] = []
973
      res_missing[inst.name].append(key)
974

    
975
    return result
976

    
977

    
978
class LURenameCluster(LogicalUnit):
979
  """Rename the cluster.
980

981
  """
982
  HPATH = "cluster-rename"
983
  HTYPE = constants.HTYPE_CLUSTER
984
  _OP_REQP = ["name"]
985
  REQ_WSSTORE = True
986

    
987
  def BuildHooksEnv(self):
988
    """Build hooks env.
989

990
    """
991
    env = {
992
      "OP_TARGET": self.sstore.GetClusterName(),
993
      "NEW_NAME": self.op.name,
994
      }
995
    mn = self.sstore.GetMasterNode()
996
    return env, [mn], [mn]
997

    
998
  def CheckPrereq(self):
999
    """Verify that the passed name is a valid one.
1000

1001
    """
1002
    hostname = utils.HostInfo(self.op.name)
1003

    
1004
    new_name = hostname.name
1005
    self.ip = new_ip = hostname.ip
1006
    old_name = self.sstore.GetClusterName()
1007
    old_ip = self.sstore.GetMasterIP()
1008
    if new_name == old_name and new_ip == old_ip:
1009
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1010
                                 " cluster has changed")
1011
    if new_ip != old_ip:
1012
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1013
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1014
                                   " reachable on the network. Aborting." %
1015
                                   new_ip)
1016

    
1017
    self.op.name = new_name
1018

    
1019
  def Exec(self, feedback_fn):
1020
    """Rename the cluster.
1021

1022
    """
1023
    clustername = self.op.name
1024
    ip = self.ip
1025
    ss = self.sstore
1026

    
1027
    # shutdown the master IP
1028
    master = ss.GetMasterNode()
1029
    if not rpc.call_node_stop_master(master, False):
1030
      raise errors.OpExecError("Could not disable the master role")
1031

    
1032
    try:
1033
      # modify the sstore
1034
      ss.SetKey(ss.SS_MASTER_IP, ip)
1035
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1036

    
1037
      # Distribute updated ss config to all nodes
1038
      myself = self.cfg.GetNodeInfo(master)
1039
      dist_nodes = self.cfg.GetNodeList()
1040
      if myself.name in dist_nodes:
1041
        dist_nodes.remove(myself.name)
1042

    
1043
      logger.Debug("Copying updated ssconf data to all nodes")
1044
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1045
        fname = ss.KeyToFilename(keyname)
1046
        result = rpc.call_upload_file(dist_nodes, fname)
1047
        for to_node in dist_nodes:
1048
          if not result[to_node]:
1049
            logger.Error("copy of file %s to node %s failed" %
1050
                         (fname, to_node))
1051
    finally:
1052
      if not rpc.call_node_start_master(master, False):
1053
        logger.Error("Could not re-enable the master role on the master,"
1054
                     " please restart manually.")
1055

    
1056

    
1057
def _RecursiveCheckIfLVMBased(disk):
1058
  """Check if the given disk or its children are lvm-based.
1059

1060
  Args:
1061
    disk: ganeti.objects.Disk object
1062

1063
  Returns:
1064
    boolean indicating whether a LD_LV dev_type was found or not
1065

1066
  """
1067
  if disk.children:
1068
    for chdisk in disk.children:
1069
      if _RecursiveCheckIfLVMBased(chdisk):
1070
        return True
1071
  return disk.dev_type == constants.LD_LV
1072

    
1073

    
1074
class LUSetClusterParams(LogicalUnit):
1075
  """Change the parameters of the cluster.
1076

1077
  """
1078
  HPATH = "cluster-modify"
1079
  HTYPE = constants.HTYPE_CLUSTER
1080
  _OP_REQP = []
1081

    
1082
  def BuildHooksEnv(self):
1083
    """Build hooks env.
1084

1085
    """
1086
    env = {
1087
      "OP_TARGET": self.sstore.GetClusterName(),
1088
      "NEW_VG_NAME": self.op.vg_name,
1089
      }
1090
    mn = self.sstore.GetMasterNode()
1091
    return env, [mn], [mn]
1092

    
1093
  def CheckPrereq(self):
1094
    """Check prerequisites.
1095

1096
    This checks whether the given params don't conflict and
1097
    if the given volume group is valid.
1098

1099
    """
1100
    if not self.op.vg_name:
1101
      instances = [self.cfg.GetInstanceInfo(name)
1102
                   for name in self.cfg.GetInstanceList()]
1103
      for inst in instances:
1104
        for disk in inst.disks:
1105
          if _RecursiveCheckIfLVMBased(disk):
1106
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1107
                                       " lvm-based instances exist")
1108

    
1109
    # if vg_name not None, checks given volume group on all nodes
1110
    if self.op.vg_name:
1111
      node_list = self.cfg.GetNodeList()
1112
      vglist = rpc.call_vg_list(node_list)
1113
      for node in node_list:
1114
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1115
                                              constants.MIN_VG_SIZE)
1116
        if vgstatus:
1117
          raise errors.OpPrereqError("Error on node '%s': %s" %
1118
                                     (node, vgstatus))
1119

    
1120
  def Exec(self, feedback_fn):
1121
    """Change the parameters of the cluster.
1122

1123
    """
1124
    if self.op.vg_name != self.cfg.GetVGName():
1125
      self.cfg.SetVGName(self.op.vg_name)
1126
    else:
1127
      feedback_fn("Cluster LVM configuration already in desired"
1128
                  " state, not changing")
1129

    
1130

    
1131
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1132
  """Sleep and poll for an instance's disk to sync.
1133

1134
  """
1135
  if not instance.disks:
1136
    return True
1137

    
1138
  if not oneshot:
1139
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1140

    
1141
  node = instance.primary_node
1142

    
1143
  for dev in instance.disks:
1144
    cfgw.SetDiskID(dev, node)
1145

    
1146
  retries = 0
1147
  while True:
1148
    max_time = 0
1149
    done = True
1150
    cumul_degraded = False
1151
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1152
    if not rstats:
1153
      proc.LogWarning("Can't get any data from node %s" % node)
1154
      retries += 1
1155
      if retries >= 10:
1156
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1157
                                 " aborting." % node)
1158
      time.sleep(6)
1159
      continue
1160
    retries = 0
1161
    for i in range(len(rstats)):
1162
      mstat = rstats[i]
1163
      if mstat is None:
1164
        proc.LogWarning("Can't compute data for node %s/%s" %
1165
                        (node, instance.disks[i].iv_name))
1166
        continue
1167
      # we ignore the ldisk parameter
1168
      perc_done, est_time, is_degraded, _ = mstat
1169
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1170
      if perc_done is not None:
1171
        done = False
1172
        if est_time is not None:
1173
          rem_time = "%d estimated seconds remaining" % est_time
1174
          max_time = est_time
1175
        else:
1176
          rem_time = "no time estimate"
1177
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
1178
                     (instance.disks[i].iv_name, perc_done, rem_time))
1179
    if done or oneshot:
1180
      break
1181

    
1182
    time.sleep(min(60, max_time))
1183

    
1184
  if done:
1185
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1186
  return not cumul_degraded
1187

    
1188

    
1189
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1190
  """Check that mirrors are not degraded.
1191

1192
  The ldisk parameter, if True, will change the test from the
1193
  is_degraded attribute (which represents overall non-ok status for
1194
  the device(s)) to the ldisk (representing the local storage status).
1195

1196
  """
1197
  cfgw.SetDiskID(dev, node)
1198
  if ldisk:
1199
    idx = 6
1200
  else:
1201
    idx = 5
1202

    
1203
  result = True
1204
  if on_primary or dev.AssembleOnSecondary():
1205
    rstats = rpc.call_blockdev_find(node, dev)
1206
    if not rstats:
1207
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1208
      result = False
1209
    else:
1210
      result = result and (not rstats[idx])
1211
  if dev.children:
1212
    for child in dev.children:
1213
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1214

    
1215
  return result
1216

    
1217

    
1218
class LUDiagnoseOS(NoHooksLU):
1219
  """Logical unit for OS diagnose/query.
1220

1221
  """
1222
  _OP_REQP = ["output_fields", "names"]
1223
  REQ_BGL = False
1224

    
1225
  def ExpandNames(self):
1226
    if self.op.names:
1227
      raise errors.OpPrereqError("Selective OS query not supported")
1228

    
1229
    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1230
    _CheckOutputFields(static=[],
1231
                       dynamic=self.dynamic_fields,
1232
                       selected=self.op.output_fields)
1233

    
1234
    # Lock all nodes, in shared mode
1235
    self.needed_locks = {}
1236
    self.share_locks[locking.LEVEL_NODE] = 1
1237
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1238

    
1239
  def CheckPrereq(self):
1240
    """Check prerequisites.
1241

1242
    """
1243

    
1244
  @staticmethod
1245
  def _DiagnoseByOS(node_list, rlist):
1246
    """Remaps a per-node return list into an a per-os per-node dictionary
1247

1248
      Args:
1249
        node_list: a list with the names of all nodes
1250
        rlist: a map with node names as keys and OS objects as values
1251

1252
      Returns:
1253
        map: a map with osnames as keys and as value another map, with
1254
             nodes as
1255
             keys and list of OS objects as values
1256
             e.g. {"debian-etch": {"node1": [<object>,...],
1257
                                   "node2": [<object>,]}
1258
                  }
1259

1260
    """
1261
    all_os = {}
1262
    for node_name, nr in rlist.iteritems():
1263
      if not nr:
1264
        continue
1265
      for os_obj in nr:
1266
        if os_obj.name not in all_os:
1267
          # build a list of nodes for this os containing empty lists
1268
          # for each node in node_list
1269
          all_os[os_obj.name] = {}
1270
          for nname in node_list:
1271
            all_os[os_obj.name][nname] = []
1272
        all_os[os_obj.name][node_name].append(os_obj)
1273
    return all_os
1274

    
1275
  def Exec(self, feedback_fn):
1276
    """Compute the list of OSes.
1277

1278
    """
1279
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1280
    node_data = rpc.call_os_diagnose(node_list)
1281
    if node_data == False:
1282
      raise errors.OpExecError("Can't gather the list of OSes")
1283
    pol = self._DiagnoseByOS(node_list, node_data)
1284
    output = []
1285
    for os_name, os_data in pol.iteritems():
1286
      row = []
1287
      for field in self.op.output_fields:
1288
        if field == "name":
1289
          val = os_name
1290
        elif field == "valid":
1291
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1292
        elif field == "node_status":
1293
          val = {}
1294
          for node_name, nos_list in os_data.iteritems():
1295
            val[node_name] = [(v.status, v.path) for v in nos_list]
1296
        else:
1297
          raise errors.ParameterError(field)
1298
        row.append(val)
1299
      output.append(row)
1300

    
1301
    return output
1302

    
1303

    
1304
class LURemoveNode(LogicalUnit):
1305
  """Logical unit for removing a node.
1306

1307
  """
1308
  HPATH = "node-remove"
1309
  HTYPE = constants.HTYPE_NODE
1310
  _OP_REQP = ["node_name"]
1311

    
1312
  def BuildHooksEnv(self):
1313
    """Build hooks env.
1314

1315
    This doesn't run on the target node in the pre phase as a failed
1316
    node would then be impossible to remove.
1317

1318
    """
1319
    env = {
1320
      "OP_TARGET": self.op.node_name,
1321
      "NODE_NAME": self.op.node_name,
1322
      }
1323
    all_nodes = self.cfg.GetNodeList()
1324
    all_nodes.remove(self.op.node_name)
1325
    return env, all_nodes, all_nodes
1326

    
1327
  def CheckPrereq(self):
1328
    """Check prerequisites.
1329

1330
    This checks:
1331
     - the node exists in the configuration
1332
     - it does not have primary or secondary instances
1333
     - it's not the master
1334

1335
    Any errors are signalled by raising errors.OpPrereqError.
1336

1337
    """
1338
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1339
    if node is None:
1340
      raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1341

    
1342
    instance_list = self.cfg.GetInstanceList()
1343

    
1344
    masternode = self.sstore.GetMasterNode()
1345
    if node.name == masternode:
1346
      raise errors.OpPrereqError("Node is the master node,"
1347
                                 " you need to failover first.")
1348

    
1349
    for instance_name in instance_list:
1350
      instance = self.cfg.GetInstanceInfo(instance_name)
1351
      if node.name == instance.primary_node:
1352
        raise errors.OpPrereqError("Instance %s still running on the node,"
1353
                                   " please remove first." % instance_name)
1354
      if node.name in instance.secondary_nodes:
1355
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1356
                                   " please remove first." % instance_name)
1357
    self.op.node_name = node.name
1358
    self.node = node
1359

    
1360
  def Exec(self, feedback_fn):
1361
    """Removes the node from the cluster.
1362

1363
    """
1364
    node = self.node
1365
    logger.Info("stopping the node daemon and removing configs from node %s" %
1366
                node.name)
1367

    
1368
    self.context.RemoveNode(node.name)
1369

    
1370
    rpc.call_node_leave_cluster(node.name)
1371

    
1372

    
1373
class LUQueryNodes(NoHooksLU):
1374
  """Logical unit for querying nodes.
1375

1376
  """
1377
  _OP_REQP = ["output_fields", "names"]
1378
  REQ_BGL = False
1379

    
1380
  def ExpandNames(self):
1381
    self.dynamic_fields = frozenset([
1382
      "dtotal", "dfree",
1383
      "mtotal", "mnode", "mfree",
1384
      "bootid",
1385
      "ctotal",
1386
      ])
1387

    
1388
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1389
                               "pinst_list", "sinst_list",
1390
                               "pip", "sip", "tags"],
1391
                       dynamic=self.dynamic_fields,
1392
                       selected=self.op.output_fields)
1393

    
1394
    self.needed_locks = {}
1395
    self.share_locks[locking.LEVEL_NODE] = 1
1396
    # TODO: we could lock nodes only if the user asked for dynamic fields. For
1397
    # that we need atomic ways to get info for a group of nodes from the
1398
    # config, though.
1399
    if not self.op.names:
1400
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1401
    else:
1402
      self.needed_locks[locking.LEVEL_NODE] = \
1403
        _GetWantedNodes(self, self.op.names)
1404

    
1405
  def CheckPrereq(self):
1406
    """Check prerequisites.
1407

1408
    """
1409
    # This of course is valid only if we locked the nodes
1410
    self.wanted = self.acquired_locks[locking.LEVEL_NODE]
1411

    
1412
  def Exec(self, feedback_fn):
1413
    """Computes the list of nodes and their attributes.
1414

1415
    """
1416
    nodenames = self.wanted
1417
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1418

    
1419
    # begin data gathering
1420

    
1421
    if self.dynamic_fields.intersection(self.op.output_fields):
1422
      live_data = {}
1423
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1424
      for name in nodenames:
1425
        nodeinfo = node_data.get(name, None)
1426
        if nodeinfo:
1427
          live_data[name] = {
1428
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1429
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1430
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1431
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1432
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1433
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1434
            "bootid": nodeinfo['bootid'],
1435
            }
1436
        else:
1437
          live_data[name] = {}
1438
    else:
1439
      live_data = dict.fromkeys(nodenames, {})
1440

    
1441
    node_to_primary = dict([(name, set()) for name in nodenames])
1442
    node_to_secondary = dict([(name, set()) for name in nodenames])
1443

    
1444
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1445
                             "sinst_cnt", "sinst_list"))
1446
    if inst_fields & frozenset(self.op.output_fields):
1447
      instancelist = self.cfg.GetInstanceList()
1448

    
1449
      for instance_name in instancelist:
1450
        inst = self.cfg.GetInstanceInfo(instance_name)
1451
        if inst.primary_node in node_to_primary:
1452
          node_to_primary[inst.primary_node].add(inst.name)
1453
        for secnode in inst.secondary_nodes:
1454
          if secnode in node_to_secondary:
1455
            node_to_secondary[secnode].add(inst.name)
1456

    
1457
    # end data gathering
1458

    
1459
    output = []
1460
    for node in nodelist:
1461
      node_output = []
1462
      for field in self.op.output_fields:
1463
        if field == "name":
1464
          val = node.name
1465
        elif field == "pinst_list":
1466
          val = list(node_to_primary[node.name])
1467
        elif field == "sinst_list":
1468
          val = list(node_to_secondary[node.name])
1469
        elif field == "pinst_cnt":
1470
          val = len(node_to_primary[node.name])
1471
        elif field == "sinst_cnt":
1472
          val = len(node_to_secondary[node.name])
1473
        elif field == "pip":
1474
          val = node.primary_ip
1475
        elif field == "sip":
1476
          val = node.secondary_ip
1477
        elif field == "tags":
1478
          val = list(node.GetTags())
1479
        elif field in self.dynamic_fields:
1480
          val = live_data[node.name].get(field, None)
1481
        else:
1482
          raise errors.ParameterError(field)
1483
        node_output.append(val)
1484
      output.append(node_output)
1485

    
1486
    return output
1487

    
1488

    
1489
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  REQ_BGL = False

  def ExpandNames(self):
    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # check connectivity
    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    node_verify_list = [self.sstore.GetMasterNode()]
    node_verify_param = {
      'nodelist': [node],
      # TODO: do a node-net-test as well?
    }

    result = rpc.call_node_verify(node_verify_list, node_verify_param)
    for verifier in node_verify_list:
      if not result[verifier]:
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                 " for remote verification" % verifier)
      if result[verifier]['nodelist']:
        for failed in result[verifier]['nodelist']:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, result[verifier]['nodelist'][failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList()
    if not self.op.readd:
      dist_nodes.append(node)
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = self.sstore.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      result = rpc.call_upload_file([node], fname)
      if not result[node]:
        logger.Error("could not copy file %s to node %s" % (fname, node))

    if self.op.readd:
      self.context.ReaddNode(new_node)
    else:
      self.context.AddNode(new_node)


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      }

    return result


class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    a tuple of (disks_ok, device_info); disks_ok is False if the operation
    failed, and device_info is the list of
    (host, instance_visible_name, node_visible_name) tuples with the
    mapping from node devices to instance devices
  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info


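# A sketch of how _AssembleInstanceDisks is typically consumed (hypothetical
# values, for illustration only): the boolean tells the caller whether all
# block devices came up, and device_info maps each instance-visible disk to
# the device the primary node sees, e.g.
#   disks_ok, device_info = _AssembleInstanceDisks(instance, cfg)
#   # device_info might look like:
#   #   [("node1.example.com", "sda", "/dev/drbd0"),
#   #    ("node1.example.com", "sdb", "/dev/drbd1")]

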
def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    instance = self.instance
    _SafeShutdownInstanceDisks(instance, self.cfg)


def _SafeShutdownInstanceDisks(instance, cfg):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  _ShutdownInstanceDisks.

  """
  ins_l = rpc.call_instance_list([instance.primary_node])
  ins_l = ins_l[instance.primary_node]
  if not type(ins_l) is list:
    raise errors.OpExecError("Can't contact node '%s'" %
                             instance.primary_node)

  if instance.name in ins_l:
    raise errors.OpExecError("Instance is running, can't shutdown"
                             " block devices.")

  _ShutdownInstanceDisks(instance, cfg)


def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  Errors on the primary node are ignored only if ignore_primary is
  true; errors on any other node always mark the shutdown as failed.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result


def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                             " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                             " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                             " needed %s MiB, available %s MiB" %
                             (node, reason, requested, free_mem))


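# For illustration (hypothetical values): when starting an instance with
# 2048 MiB of configured RAM, the LUs below call something like
#   _CheckNodeFreeMemory(self.cfg, "node1.example.com",
#                        "starting instance instance1", 2048)
# which raises OpPrereqError unless the node reports at least 2048 MiB of
# free memory.

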
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
                         "starting instance %s" % instance.name,
                         instance.memory)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")


class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # only a full reboot touches the disks on the secondary nodes, so
      # only then do we need to lock them as well
      primary_only = self.op.reboot_type != constants.INSTANCE_REBOOT_FULL
      self._LockInstancesNodes(primary_only=primary_only)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      if not rpc.call_instance_reboot(node_current, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      if not rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
      if not rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    if not rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    _ShutdownInstanceDisks(instance, self.cfg)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   self.op.pnode)
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node"  % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.AddInstance(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))


  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
                                                old_file_storage_dir,
                                                new_file_storage_dir)

      if not result:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
                                          "sda", "sdb"):
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.sstore.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logger.Info("removing block devices for instance %s" % instance.name)

    if not _RemoveDisks(instance, self.cfg):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)
    # Remove the instance from the Ganeti Lock Manager
    self.context.glm.remove(locking.LEVEL_INSTANCE, instance.name)


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
                               "admin_state", "admin_ram",
                               "disk_template", "ip", "mac", "bridge",
                               "sda_size", "sdb_size", "vcpus", "tags",
                               "auto_balance",
                               "network_port", "kernel_path", "initrd_path",
                               "hvm_boot_order", "hvm_acpi", "hvm_pae",
                               "hvm_cdrom_image_path", "hvm_nic_type",
                               "hvm_disk_type", "vnc_bind_address"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    # TODO: we could lock instances (and nodes) only if the user asked for
    # dynamic fields. For that we need atomic ways to get info for a group of
    # instances from the config, though.
    if not self.op.names:
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        _GetWantedInstances(self, self.op.names)

    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    # TODO: locking of nodes could be avoided when not querying them
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # This of course is valid only if we locked the instances
    self.wanted = self.acquired_locks[locking.LEVEL_INSTANCE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    instance_names = self.wanted
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
                     in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_all_instances_info(nodes)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = []
    for instance in instance_list:
      iout = []
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.status != "down":
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.status != "down":
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          disk = instance.FindDisk(field[:3])
          if disk is None:
            val = None
          else:
            val = disk.size
        elif field == "vcpus":
          val = instance.vcpus
        elif field == "tags":
          val = list(instance.GetTags())
        elif field in ("network_port", "kernel_path", "initrd_path",
                       "hvm_boot_order", "hvm_acpi", "hvm_pae",
                       "hvm_cdrom_image_path", "hvm_nic_type",
                       "hvm_disk_type", "vnc_bind_address"):
          val = getattr(instance, field, None)
          if val is not None:
            pass
          elif field in ("hvm_nic_type", "hvm_disk_type",
                         "kernel_path", "initrd_path"):
            val = "default"
          else:
            val = "-"
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output


class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
                         instance.name, instance.memory)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down"  %
                     (instance.name, source_node, source_node))
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logger.Info("Starting instance %s on node %s" %
                  (instance.name, target_node))

      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))


def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
        return False

  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, True, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, False, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _GenerateUniqueNames(cfg, exts):
  """Generate suitable LV names.

  This will generate one unique logical volume name for each of the
  given extensions.

  """
  results = []
  for val in exts:
    new_id = cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results


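# For illustration: _GenerateUniqueNames(cfg, [".sda", ".sdb"]) returns one
# name per extension, each consisting of a freshly generated unique ID with
# the extension appended (hypothetical output: ["<unique-id>.sda",
# "<unique-id>.sdb"]); these become the LV names used by the disk templates
# below.

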
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
  """Generate a drbd8 device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id = (primary, secondary, port),
                          children = [dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev


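# _GenerateDRBD8Branch builds a small device tree; roughly (sizes in MiB,
# hypothetical values):
#
#   DRBD8 disk "sda", size=10240, logical_id=(primary, secondary, port)
#     +-- LV data volume, size=10240, logical_id=(vgname, names[0])
#     +-- LV metadata volume, size=128, logical_id=(vgname, names[1])
#
# i.e. one data LV plus one 128 MiB metadata LV per DRBD device.

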
def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz,
                          file_storage_dir, file_driver):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = cfg.GetVGName()
  if template_name == constants.DT_DISKLESS:
    disks = []
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name = "sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name = "sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                         disk_sz, names[0:2], "sda")
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                         swap_sz, names[2:4], "sdb")
    disks = [drbd_sda_dev, drbd_sdb_dev]
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
                                iv_name="sda", logical_id=(file_driver,
                                "%s/sda" % file_storage_dir))
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
                                iv_name="sdb", logical_id=(file_driver,
                                "%s/sdb" % file_storage_dir))
    disks = [file_sda_dev, file_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks


def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


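# For illustration: for a hypothetical instance named
# "instance1.example.com" the helper above returns
# "originstname+instance1.example.com"; this text is passed to the block
# device creation calls so volumes can later be traced back to the instance
# that owns them.

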
def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = rpc.call_file_storage_dir_create(instance.primary_node,
                                              file_storage_dir)

    if not result:
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
      return False

    if not result[0]:
      logger.Error("failed to create directory '%s'" % file_storage_dir)
      return False

  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False

  return True


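# Editorial sketch of how this helper is typically driven (it mirrors the
# call in LUCreateInstance.Exec further below); "iobj" stands for an
# already-configured objects.Instance:
#
#   if not _CreateDisks(cfg, iobj):
#     _RemoveDisks(iobj, cfg)   # best-effort cleanup of any partial state
#     raise errors.OpExecError("Device creation failed, reverting...")
#
# Note that the per-device loop above creates the backing devices on the
# secondary nodes first and only then, forcibly, on the primary node.

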
def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
                                            file_storage_dir):
      logger.Error("could not remove directory '%s'" % file_storage_dir)
      result = False

  return result


def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group.

  This is currently hard-coded for the two-drive layout.

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: disk_size + swap_size,
    # 256 MB are added for drbd metadata, 128 MB for each drbd device
    constants.DT_DRBD8: disk_size + swap_size + 256,
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]


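# Worked example (editorial addition): a drbd8 instance with a 10240 MB data
# disk and a 4096 MB swap disk needs 10240 + 4096 + 256 = 14592 MB of free
# space in the volume group on each node, while file-based and diskless
# templates return None since they allocate nothing from the volume group:
#
#   _ComputeDiskSize(constants.DT_DRBD8, 10240, 4096)   # -> 14592
#   _ComputeDiskSize(constants.DT_FILE, 10240, 4096)    # -> None

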
class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
              "disk_template", "swap_size", "mode", "start", "vcpus",
              "wait_for_sync", "ip_check", "mac"]

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    disks = [{"size": self.op.disk_size, "mode": "w"},
             {"size": self.op.swap_size, "mode": "w"}]
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
             "bridge": self.op.bridge}]
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.op.vcpus,
                     mem_size=self.op.mem_size,
                     disks=disks,
                     nics=nics,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    logger.ToStdout("Selected nodes for the instance: %s" %
                    (", ".join(ial.nodes),))
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
                (self.op.instance_name, self.op.iallocator, ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]

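  # Illustrative sketch (editorial addition; the concrete values are
  # invented): for an opcode asking for a 10240 MB disk, 4096 MB of swap and
  # an auto-generated MAC, the allocator request assembled above amounts to
  #
  #   disks = [{"size": 10240, "mode": "w"}, {"size": 4096, "mode": "w"}]
  #   nics  = [{"mac": "auto", "ip": None, "bridge": "xen-br0"}]
  #
  # and a successful run stores the chosen primary in self.op.pnode (plus
  # the secondary in self.op.snode when two nodes are required, e.g. drbd8).
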
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.op.mem_size,
      vcpus=self.op.vcpus,
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
    ))

    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl


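  # Editorial note: hooks for this LU therefore see the add-specific
  # variables set above (INSTANCE_DISK_TEMPLATE, INSTANCE_DISK_SIZE,
  # INSTANCE_SWAP_SIZE, INSTANCE_ADD_MODE and, for imports, INSTANCE_SRC_*)
  # on top of the generic per-instance variables from _BuildInstanceHookEnv,
  # and they run on the master, the primary node and all secondaries.
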
  def CheckPrereq(self):
    """Check prerequisites.

    """
    # set optional parameters to none if they don't exist
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
                 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
                 "hvm_nic_type", "hvm_disk_type", "vnc_bind_address"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)
      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")
      src_node_full = self.cfg.ExpandNodeName(src_node)
      if src_node_full is None:
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
      self.op.src_node = src_node = src_node_full

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage
    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

    #### instance parameters check

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)

    self.op.instance_name = instance_name = hostname1.name
    instance_list = self.cfg.GetInstanceList()
    if instance_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    # ip validity checks
    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = self.op.ip = inst_ip

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (hostname1.ip, instance_name))

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    # boot order verification
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")
    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory not a relative"
                                 " path")
    #### allocator run

    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
    if pnode is None:
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                 self.op.pnode)
    self.op.pnode = pnode.name
    self.pnode = pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if getattr(self.op, "snode", None) is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")

      snode_name = self.cfg.ExpandNodeName(self.op.snode)
      if snode_name is None:
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
                                   self.op.snode)
      elif snode_name == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(snode_name)

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.op.disk_size, self.op.swap_size)

    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node" % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")


    # bridge check on primary node
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.op.mem_size)

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    # Xen HVM device type checks
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
        raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
                                   " hypervisor" % self.op.hvm_nic_type)
      if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
        raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
                                   " hypervisor" % self.op.hvm_disk_type)

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    if self.op.vnc_bind_address is None:
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.sstore.GetFileStorageDir(),
                                        string_file_storage_dir, instance))


    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size,
                                  file_storage_dir,
                                  self.op.file_driver)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            hvm_acpi=self.op.hvm_acpi,
                            hvm_pae=self.op.hvm_pae,
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
                            vnc_bind_address=self.op.vnc_bind_address,
                            hvm_nic_type=self.op.hvm_nic_type,
                            hvm_disk_type=self.op.hvm_disk_type,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Add the new instance to the Ganeti Lock Manager
    self.context.glm.add(locking.LEVEL_INSTANCE, instance)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      # Remove the new instance from the Ganeti Lock Manager
      self.context.glm.remove(locking.LEVEL_INSTANCE, iobj.name)
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                                src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Connect to the console of an instance.

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    logger.ToStdout("Selected new secondary for the instance: %s" %
                    self.op.remote_node)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance
    self.op.instance_name = instance.name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))
    self.op.remote_node = remote_node

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([pri_node, new_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in pri_node, new_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s on %s" % (dev.iv_name, pri_node))
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        raise errors.OpExecError("Can't find device %s on node %s" %
                                 (dev.iv_name, pri_node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      size = dev.size
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in dev.children:
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], new_node))

      iv_names[dev.iv_name] = (dev, dev.children)

    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for dev in instance.disks:
      size = dev.size
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
      # create new devices on new_node
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=(pri_node, new_node,
                                          dev.logical_id[2]),
                              children=dev.children)
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
                                        new_drbd, False,
                                      _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new DRBD on"
                                 " node '%s'" % new_node)

    for dev in instance.disks:
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for %s on old node" % dev.iv_name)
      cfg.SetDiskID(dev, old_node)
      if not rpc.call_blockdev_shutdown(old_node, dev):
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")
    done = 0
    for dev in instance.disks:
      cfg.SetDiskID(dev, pri_node)
      # set the physical (unique in bdev terms) id to None, meaning
      # detach from network
      dev.physical_id = (None,) * len(dev.physical_id)
      # and 'find' the device, which will 'fix' it to match the
      # standalone state
      if rpc.call_blockdev_find(pri_node, dev):
        done += 1
      else:
        warning("Failed to detach drbd %s from network, unusual case" %
                dev.iv_name)

    if not done:
      # no detaches succeeded (very unlikely)
      raise errors.OpExecError("Can't detach at least one DRBD from old node")

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev in instance.disks:
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    failures = []
    for dev in instance.disks:
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
      # since the attach is smart, it's enough to 'find' the device,
      # it will automatically activate the network, if the physical_id
      # is correct
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
                "please do a gnt-instance info to see the status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs) in iv_names.iteritems():
3970</