lib/cmdlib.py @ revision ca2a79e1


#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import serializer


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_MASTER: the LU needs to run on the master node
        REQ_WSSTORE: the LU needs a writable SimpleStore
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_MASTER = True
  REQ_WSSTORE = False
  REQ_BGL = True

  def __init__(self, processor, op, context, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.sstore = sstore
    self.context = context
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
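    # share_locks maps each locking level to 0 (exclusive, the default) or 1
    # (shared); see the ExpandNames docstring below for how LUs are expected
    # to use these attributes.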
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    if not self.cfg.IsCluster():
      raise errors.OpPrereqError("Cluster not initialized yet,"
                                 " use 'gnt-cluster init' first.")
    if self.REQ_MASTER:
      master = sstore.GetMasterNode()
      if master != utils.HostInfo().name:
        raise errors.OpPrereqError("Commands must be run on the master"
                                   " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.sstore)
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:
      - Use an empty dict if you don't need any lock
      - If you don't need any lock at a particular level omit that level
      - Don't put anything for the BGL level
      - If you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples:
    # Acquire all nodes and one instance
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
    }
    # Acquire just two nodes
    self.needed_locks = {
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
    }
    # Acquire no locks
    self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    Args:
      phase: the hooks phase that has just been run
      hook_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: the previous result this LU had, or None in the PRE phase.

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to:

    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings) or None for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
392
                          memory, vcpus, nics):
393
  """Builds instance related env variables for hooks from single variables.
394

395
  Args:
396
    secondary_nodes: List of secondary nodes as strings
397
  """
398
  env = {
399
    "OP_TARGET": name,
400
    "INSTANCE_NAME": name,
401
    "INSTANCE_PRIMARY": primary_node,
402
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
403
    "INSTANCE_OS_TYPE": os_type,
404
    "INSTANCE_STATUS": status,
405
    "INSTANCE_MEMORY": memory,
406
    "INSTANCE_VCPUS": vcpus,
407
  }
408

    
409
  if nics:
410
    nic_count = len(nics)
411
    for idx, (ip, bridge, mac) in enumerate(nics):
412
      if ip is None:
413
        ip = ""
414
      env["INSTANCE_NIC%d_IP" % idx] = ip
415
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
416
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
417
  else:
418
    nic_count = 0
419

    
420
  env["INSTANCE_NIC_COUNT"] = nic_count

  return env


def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override
  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("one or more target bridges %s does not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.sstore.GetMasterNode()
    if not rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master


class LUVerifyCluster(LogicalUnit):
498
  """Verifies the cluster status.
499

500
  """
501
  HPATH = "cluster-verify"
502
  HTYPE = constants.HTYPE_CLUSTER
503
  _OP_REQP = ["skip_checks"]
504
  REQ_BGL = False
505

    
506
  def ExpandNames(self):
507
    self.needed_locks = {
508
      locking.LEVEL_NODE: locking.ALL_SET,
509
      locking.LEVEL_INSTANCE: locking.ALL_SET,
510
    }
511
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
512

    
513
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
514
                  remote_version, feedback_fn):
515
    """Run multiple tests against a node.
516

517
    Test list:
518
      - compares ganeti version
519
      - checks vg existance and size > 20G
520
      - checks config file checksum
521
      - checks ssh to other nodes
522

523
    Args:
524
      node: name of the node to check
525
      file_list: required list of files
526
      local_cksum: dictionary of local files and their checksums
527

528
    """
529
    # compares ganeti version
530
    local_version = constants.PROTOCOL_VERSION
531
    if not remote_version:
532
      feedback_fn("  - ERROR: connection to %s failed" % (node))
533
      return True
534

    
535
    if local_version != remote_version:
536
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
537
                      (local_version, node, remote_version))
538
      return True
539

    
540
    # checks vg existance and size > 20G
541

    
542
    bad = False
543
    if not vglist:
544
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
545
                      (node,))
546
      bad = True
547
    else:
548
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
549
                                            constants.MIN_VG_SIZE)
550
      if vgstatus:
551
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
552
        bad = True
553

    
554
    # checks config file checksum
555
    # checks ssh to any
556

    
557
    if 'filelist' not in node_result:
558
      bad = True
559
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
560
    else:
561
      remote_cksum = node_result['filelist']
562
      for file_name in file_list:
563
        if file_name not in remote_cksum:
564
          bad = True
565
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
566
        elif remote_cksum[file_name] != local_cksum[file_name]:
567
          bad = True
568
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
569

    
570
    if 'nodelist' not in node_result:
571
      bad = True
572
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
573
    else:
574
      if node_result['nodelist']:
575
        bad = True
576
        for node in node_result['nodelist']:
577
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
578
                          (node, node_result['nodelist'][node]))
579
    if 'node-net-test' not in node_result:
580
      bad = True
581
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
582
    else:
583
      if node_result['node-net-test']:
584
        bad = True
585
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
586
        for node in nlist:
587
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
588
                          (node, node_result['node-net-test'][node]))
589

    
590
    hyp_result = node_result.get('hypervisor', None)
591
    if hyp_result is not None:
592
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
593
    return bad
594

    
595
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
596
                      node_instance, feedback_fn):
597
    """Verify an instance.
598

599
    This function checks to see if the required block devices are
600
    available on the instance's node.
601

602
    """
603
    bad = False
604

    
605
    node_current = instanceconfig.primary_node
606

    
607
    node_vol_should = {}
608
    instanceconfig.MapLVsByNode(node_vol_should)
609

    
610
    for node in node_vol_should:
611
      for volume in node_vol_should[node]:
612
        if node not in node_vol_is or volume not in node_vol_is[node]:
613
          feedback_fn("  - ERROR: volume %s missing on node %s" %
614
                          (volume, node))
615
          bad = True
616

    
617
    if not instanceconfig.status == 'down':
618
      if (node_current not in node_instance or
619
          not instance in node_instance[node_current]):
620
        feedback_fn("  - ERROR: instance %s not running on node %s" %
621
                        (instance, node_current))
622
        bad = True
623

    
624
    for node in node_instance:
625
      if (not node == node_current):
626
        if instance in node_instance[node]:
627
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
628
                          (instance, node))
629
          bad = True
630

    
631
    return bad
632

    
633
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
634
    """Verify if there are any unknown volumes in the cluster.
635

636
    The .os, .swap and backup volumes are ignored. All other volumes are
637
    reported as unknown.
638

639
    """
640
    bad = False
641

    
642
    for node in node_vol_is:
643
      for volume in node_vol_is[node]:
644
        if node not in node_vol_should or volume not in node_vol_should[node]:
645
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
646
                      (volume, node))
647
          bad = True
648
    return bad
649

    
650
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
651
    """Verify the list of running instances.
652

653
    This checks what instances are running but unknown to the cluster.
654

655
    """
656
    bad = False
657
    for node in node_instance:
658
      for runninginstance in node_instance[node]:
659
        if runninginstance not in instancelist:
660
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
661
                          (runninginstance, node))
662
          bad = True
663
    return bad
664

    
665
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
666
    """Verify N+1 Memory Resilience.
667

668
    Check that if one single node dies we can still start all the instances it
669
    was primary for.
670

671
    """
672
    bad = False
673

    
674
    for node, nodeinfo in node_info.iteritems():
675
      # This code checks that every node which is now listed as secondary has
676
      # enough memory to host all instances it is supposed to should a single
677
      # other node in the cluster fail.
678
      # FIXME: not ready for failover to an arbitrary node
679
      # FIXME: does not support file-backed instances
680
      # WARNING: we currently take into account down instances as well as up
681
      # ones, considering that even if they're down someone might want to start
682
      # them even in the event of a node failure.
683
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
684
        needed_mem = 0
685
        for instance in instances:
686
          needed_mem += instance_cfg[instance].memory
687
        if nodeinfo['mfree'] < needed_mem:
688
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
689
                      " failovers should node %s fail" % (node, prinode))
690
          bad = True
691
    return bad
692

    
693
  def CheckPrereq(self):
694
    """Check prerequisites.
695

696
    Transform the list of checks we're going to skip into a set and check that
697
    all its members are valid.
698

699
    """
700
    self.skip_set = frozenset(self.op.skip_checks)
701
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
702
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
703

    
704
  def BuildHooksEnv(self):
705
    """Build hooks env.
706

707
    Cluster-Verify hooks just rone in the post phase and their failure makes
708
    the output be logged in the verify output and the verification to fail.
709

710
    """
711
    all_nodes = self.cfg.GetNodeList()
712
    # TODO: populate the environment with useful information for verify hooks
713
    env = {}
714
    return env, [], all_nodes
715

    
716
  def Exec(self, feedback_fn):
717
    """Verify integrity of cluster, performing various test on nodes.
718

719
    """
720
    bad = False
721
    feedback_fn("* Verifying global settings")
722
    for msg in self.cfg.VerifyConfig():
723
      feedback_fn("  - ERROR: %s" % msg)
724

    
725
    vg_name = self.cfg.GetVGName()
726
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
727
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
728
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
729
    i_non_redundant = [] # Non redundant instances
730
    node_volume = {}
731
    node_instance = {}
732
    node_info = {}
733
    instance_cfg = {}
734

    
735
    # FIXME: verify OS list
736
    # do local checksums
737
    file_names = list(self.sstore.GetFileList())
738
    file_names.append(constants.SSL_CERT_FILE)
739
    file_names.append(constants.CLUSTER_CONF_FILE)
740
    local_checksums = utils.FingerprintFiles(file_names)
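    # All remaining data is gathered below in a few bulk RPC calls; the
    # replies are then cross-checked per node against the configuration.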

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
                        for node in nodeinfo]
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result =  self._VerifyInstance(instance, inst_config, node_volume,
                                     node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result, handle it, and send some
    nicely-formatted feedback back to the user.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: previous Exec result

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res is False or not isinstance(res, list):
            feedback_fn("    Communication failure")
            lu_result = 1
            continue
          for script, hkr, output in res:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    to_act = set()
    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]
  REQ_WSSTORE = True

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      if not rpc.call_node_start_master(master, False):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if not self.op.vg_name:
      instances = [self.cfg.GetInstanceInfo(name)
                   for name in self.cfg.GetInstanceList()]
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      node_list = self.cfg.GetNodeList()
      vglist = rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name != self.cfg.GetVGName():
      self.cfg.SetVGName(self.op.vg_name)
    else:
      feedback_fn("Cluster LVM configuration already in desired"
                  " state, not changing")


def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
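  # Poll the primary node's mirror status until every disk reports the resync
  # as finished; up to ten consecutive failed status calls are tolerated
  # before giving up.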
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  cfgw.SetDiskID(dev, node)
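  # Pick the field of the rpc.call_blockdev_find status tuple to test: index 5
  # holds the overall is_degraded flag, index 6 the ldisk (local storage)
  # status mentioned in the docstring above.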
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
    _CheckOutputFields(static=[],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

      Args:
        node_list: a list with the names of all nodes
        rlist: a map with node names as keys and OS objects as values

      Returns:
        map: a map with osnames as keys and as value another map, with
             nodes as keys and list of OS objects as values
             e.g. {"debian-etch": {"node1": [<object>,...],
                                   "node2": [<object>,]}
                  }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if not nr:
        continue
      for os_obj in nr:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in node_list:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.acquired_locks[locking.LEVEL_NODE]
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    self.context.RemoveNode(node.name)

    rpc.call_node_leave_cluster(node.name)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    self.dynamic_fields = frozenset([
      "dtotal", "dfree",
      "mtotal", "mnode", "mfree",
      "bootid",
      "ctotal",
      ])

    self.static_fields = frozenset([
      "name", "pinst_cnt", "sinst_cnt",
      "pinst_list", "sinst_list",
      "pip", "sip", "tags",
      ])

    _CheckOutputFields(static=self.static_fields,
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted


  def CheckPrereq(self):
    """Check prerequisites.

    """
    # The validation of the node list is done in the _GetWantedNodes,
    # if non empty, and if empty, there's no validation to do
    pass

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    else:
      nodenames = all_info.keys()
    nodelist = [all_info[name] for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})
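    # Reverse mappings (node name -> instances for which it is primary or
    # secondary) are built below, but only when instance fields were requested.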

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  REQ_BGL = False

  def ExpandNames(self):
    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
1621
      "NODE_SIP": self.op.secondary_ip,
1622
      }
1623
    nodes_0 = self.cfg.GetNodeList()
1624
    nodes_1 = nodes_0 + [self.op.node_name, ]
1625
    return env, nodes_0, nodes_1
1626

    
1627
  def CheckPrereq(self):
1628
    """Check prerequisites.
1629

1630
    This checks:
1631
     - the new node is not already in the config
1632
     - it is resolvable
1633
     - its parameters (single/dual homed) matches the cluster
1634

1635
    Any errors are signalled by raising errors.OpPrereqError.
1636

1637
    """
1638
    node_name = self.op.node_name
1639
    cfg = self.cfg
1640

    
1641
    dns_data = utils.HostInfo(node_name)
1642

    
1643
    node = dns_data.name
1644
    primary_ip = self.op.primary_ip = dns_data.ip
1645
    secondary_ip = getattr(self.op, "secondary_ip", None)
1646
    if secondary_ip is None:
1647
      secondary_ip = primary_ip
1648
    if not utils.IsValidIP(secondary_ip):
1649
      raise errors.OpPrereqError("Invalid secondary IP given")
1650
    self.op.secondary_ip = secondary_ip
1651

    
1652
    node_list = cfg.GetNodeList()
1653
    if not self.op.readd and node in node_list:
1654
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1655
                                 node)
1656
    elif self.op.readd and node not in node_list:
1657
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1658

    
1659
    for existing_node_name in node_list:
1660
      existing_node = cfg.GetNodeInfo(existing_node_name)
1661

    
1662
      if self.op.readd and node == existing_node_name:
1663
        if (existing_node.primary_ip != primary_ip or
1664
            existing_node.secondary_ip != secondary_ip):
1665
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1666
                                     " address configuration as before")
1667
        continue
1668

    
1669
      if (existing_node.primary_ip == primary_ip or
1670
          existing_node.secondary_ip == primary_ip or
1671
          existing_node.primary_ip == secondary_ip or
1672
          existing_node.secondary_ip == secondary_ip):
1673
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1674
                                   " existing node %s" % existing_node.name)
1675

    
1676
    # check that the type of the node (single versus dual homed) is the
1677
    # same as for the master
1678
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1679
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1680
    newbie_singlehomed = secondary_ip == primary_ip
1681
    if master_singlehomed != newbie_singlehomed:
1682
      if master_singlehomed:
1683
        raise errors.OpPrereqError("The master has no private ip but the"
1684
                                   " new node has one")
1685
      else:
1686
        raise errors.OpPrereqError("The master has a private ip but the"
1687
                                   " new node doesn't have one")
1688

    
1689
    # checks reachablity
1690
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1691
      raise errors.OpPrereqError("Node not reachable by ping")
1692

    
1693
    if not newbie_singlehomed:
1694
      # check reachability from my secondary ip to newbie's secondary ip
1695
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1696
                           source=myself.secondary_ip):
1697
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1698
                                   " based ping to noded port")
1699

    
1700
    self.new_node = objects.Node(name=node,
1701
                                 primary_ip=primary_ip,
1702
                                 secondary_ip=secondary_ip)
1703

    
1704
  def Exec(self, feedback_fn):
1705
    """Adds the new node to the cluster.
1706

1707
    """
1708
    new_node = self.new_node
1709
    node = new_node.name
1710

    
1711
    # check connectivity
1712
    result = rpc.call_version([node])[node]
1713
    if result:
1714
      if constants.PROTOCOL_VERSION == result:
1715
        logger.Info("communication to node %s fine, sw version %s match" %
1716
                    (node, result))
1717
      else:
1718
        raise errors.OpExecError("Version mismatch master version %s,"
1719
                                 " node version %s" %
1720
                                 (constants.PROTOCOL_VERSION, result))
1721
    else:
1722
      raise errors.OpExecError("Cannot get version from the new node")
1723

    
1724
    # setup ssh on node
1725
    logger.Info("copy ssh key to node %s" % node)
1726
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1727
    keyarray = []
1728
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1729
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1730
                priv_key, pub_key]
1731

    
1732
    for i in keyfiles:
1733
      f = open(i, 'r')
1734
      try:
1735
        keyarray.append(f.read())
1736
      finally:
1737
        f.close()
1738

    
1739
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1740
                               keyarray[3], keyarray[4], keyarray[5])
1741

    
1742
    if not result:
1743
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1744

    
1745
    # Add node to our /etc/hosts, and add key to known_hosts
1746
    utils.AddHostToEtcHosts(new_node.name)
1747

    
1748
    if new_node.secondary_ip != new_node.primary_ip:
1749
      if not rpc.call_node_tcp_ping(new_node.name,
1750
                                    constants.LOCALHOST_IP_ADDRESS,
1751
                                    new_node.secondary_ip,
1752
                                    constants.DEFAULT_NODED_PORT,
1753
                                    10, False):
1754
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1755
                                 " you gave (%s). Please fix and re-run this"
1756
                                 " command." % new_node.secondary_ip)
1757

    
1758
    node_verify_list = [self.sstore.GetMasterNode()]
1759
    node_verify_param = {
1760
      'nodelist': [node],
1761
      # TODO: do a node-net-test as well?
1762
    }
1763

    
1764
    result = rpc.call_node_verify(node_verify_list, node_verify_param)
1765
    for verifier in node_verify_list:
1766
      if not result[verifier]:
1767
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
1768
                                 " for remote verification" % verifier)
1769
      if result[verifier]['nodelist']:
1770
        for failed in result[verifier]['nodelist']:
1771
          feedback_fn("ssh/hostname verification failed %s -> %s" %
1772
                      (verifier, result[verifier]['nodelist'][failed]))
1773
        raise errors.OpExecError("ssh/hostname verification failed.")
1774

    
1775
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1776
    # including the node just added
1777
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1778
    dist_nodes = self.cfg.GetNodeList()
1779
    if not self.op.readd:
1780
      dist_nodes.append(node)
1781
    if myself.name in dist_nodes:
1782
      dist_nodes.remove(myself.name)
1783

    
1784
    logger.Debug("Copying hosts and known_hosts to all nodes")
1785
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1786
      result = rpc.call_upload_file(dist_nodes, fname)
1787
      for to_node in dist_nodes:
1788
        if not result[to_node]:
1789
          logger.Error("copy of file %s to node %s failed" %
1790
                       (fname, to_node))
1791

    
1792
    to_copy = self.sstore.GetFileList()
1793
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1794
      to_copy.append(constants.VNC_PASSWORD_FILE)
1795
    for fname in to_copy:
1796
      result = rpc.call_upload_file([node], fname)
1797
      if not result[node]:
1798
        logger.Error("could not copy file %s to node %s" % (fname, node))
1799

    
1800
    if self.op.readd:
1801
      self.context.ReaddNode(new_node)
1802
    else:
1803
      self.context.AddNode(new_node)
1804

    
1805

    
1806
class LUQueryClusterInfo(NoHooksLU):
1807
  """Query cluster configuration.
1808

1809
  """
1810
  _OP_REQP = []
1811
  REQ_MASTER = False
1812
  REQ_BGL = False
1813

    
1814
  def ExpandNames(self):
1815
    self.needed_locks = {}
1816

    
1817
  def CheckPrereq(self):
1818
    """No prerequsites needed for this LU.
1819

1820
    """
1821
    pass
1822

    
1823
  def Exec(self, feedback_fn):
1824
    """Return cluster config.
1825

1826
    """
1827
    result = {
1828
      "name": self.sstore.GetClusterName(),
1829
      "software_version": constants.RELEASE_VERSION,
1830
      "protocol_version": constants.PROTOCOL_VERSION,
1831
      "config_version": constants.CONFIG_VERSION,
1832
      "os_api_version": constants.OS_API_VERSION,
1833
      "export_version": constants.EXPORT_VERSION,
1834
      "master": self.sstore.GetMasterNode(),
1835
      "architecture": (platform.architecture()[0], platform.machine()),
1836
      "hypervisor_type": self.sstore.GetHypervisorType(),
1837
      }
1838

    
1839
    return result
1840

    
1841

    
1842
class LUDumpClusterConfig(NoHooksLU):
1843
  """Return a text-representation of the cluster-config.
1844

1845
  """
1846
  _OP_REQP = []
1847
  REQ_BGL = False
1848

    
1849
  def ExpandNames(self):
1850
    self.needed_locks = {}
1851

    
1852
  def CheckPrereq(self):
1853
    """No prerequisites.
1854

1855
    """
1856
    pass
1857

    
1858
  def Exec(self, feedback_fn):
1859
    """Dump a representation of the cluster config to the standard output.
1860

1861
    """
1862
    return self.cfg.DumpConfig()
1863

    
1864

    
1865
class LUActivateInstanceDisks(NoHooksLU):
1866
  """Bring up an instance's disks.
1867

1868
  """
1869
  _OP_REQP = ["instance_name"]
1870
  REQ_BGL = False
1871

    
1872
  def ExpandNames(self):
1873
    self._ExpandAndLockInstance()
1874
    self.needed_locks[locking.LEVEL_NODE] = []
1875
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1876

    
1877
  def DeclareLocks(self, level):
1878
    if level == locking.LEVEL_NODE:
1879
      self._LockInstancesNodes()
1880

    
1881
  def CheckPrereq(self):
1882
    """Check prerequisites.
1883

1884
    This checks that the instance is in the cluster.
1885

1886
    """
1887
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1888
    assert self.instance is not None, \
1889
      "Cannot retrieve locked instance %s" % self.op.instance_name
1890

    
1891
  def Exec(self, feedback_fn):
1892
    """Activate the disks.
1893

1894
    """
1895
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1896
    if not disks_ok:
1897
      raise errors.OpExecError("Cannot activate block devices")
1898

    
1899
    return disks_info
1900

    
1901

    
1902
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1903
  """Prepare the block devices for an instance.
1904

1905
  This sets up the block devices on all nodes.
1906

1907
  Args:
1908
    instance: a ganeti.objects.Instance object
1909
    ignore_secondaries: if true, errors on secondary nodes won't result
1910
                        in an error return from the function
1911

1912
  Returns:
1913
    false if the operation failed
1914
    list of (host, instance_visible_name, node_visible_name) if the operation
1915
         suceeded with the mapping from node devices to instance devices
1916
  """
1917
  device_info = []
1918
  disks_ok = True
1919
  iname = instance.name
1920
  # With the two passes mechanism we try to reduce the window of
1921
  # opportunity for the race condition of switching DRBD to primary
1922
  # before handshaking occured, but we do not eliminate it
1923

    
1924
  # The proper fix would be to wait (with some limits) until the
1925
  # connection has been made and drbd transitions from WFConnection
1926
  # into any other network-connected state (Connected, SyncTarget,
1927
  # SyncSource, etc.)
1928

    
1929
  # 1st pass, assemble on all nodes in secondary mode
1930
  for inst_disk in instance.disks:
1931
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1932
      cfg.SetDiskID(node_disk, node)
1933
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1934
      if not result:
1935
        logger.Error("could not prepare block device %s on node %s"
1936
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1937
        if not ignore_secondaries:
1938
          disks_ok = False
1939

    
1940
  # FIXME: race condition on drbd migration to primary
1941

    
1942
  # 2nd pass, do only the primary node
1943
  for inst_disk in instance.disks:
1944
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1945
      if node != instance.primary_node:
1946
        continue
1947
      cfg.SetDiskID(node_disk, node)
1948
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1949
      if not result:
1950
        logger.Error("could not prepare block device %s on node %s"
1951
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1952
        disks_ok = False
1953
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
1954

    
1955
  # leave the disks configured for the primary node
1956
  # this is a workaround that would be fixed better by
1957
  # improving the logical/physical id handling
1958
  for disk in instance.disks:
1959
    cfg.SetDiskID(disk, instance.primary_node)
1960

    
1961
  return disks_ok, device_info
1962

    
1963

    
1964
def _StartInstanceDisks(cfg, instance, force):
1965
  """Start the disks of an instance.
1966

1967
  """
1968
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1969
                                           ignore_secondaries=force)
1970
  if not disks_ok:
1971
    _ShutdownInstanceDisks(instance, cfg)
1972
    if force is not None and not force:
1973
      logger.Error("If the message above refers to a secondary node,"
1974
                   " you can retry the operation using '--force'.")
1975
    raise errors.OpExecError("Disk consistency error")
1976

    
1977

    
1978
class LUDeactivateInstanceDisks(NoHooksLU):
1979
  """Shutdown an instance's disks.
1980

1981
  """
1982
  _OP_REQP = ["instance_name"]
1983
  REQ_BGL = False
1984

    
1985
  def ExpandNames(self):
1986
    self._ExpandAndLockInstance()
1987
    self.needed_locks[locking.LEVEL_NODE] = []
1988
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1989

    
1990
  def DeclareLocks(self, level):
1991
    if level == locking.LEVEL_NODE:
1992
      self._LockInstancesNodes()
1993

    
1994
  def CheckPrereq(self):
1995
    """Check prerequisites.
1996

1997
    This checks that the instance is in the cluster.
1998

1999
    """
2000
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2001
    assert self.instance is not None, \
2002
      "Cannot retrieve locked instance %s" % self.op.instance_name
2003

    
2004
  def Exec(self, feedback_fn):
2005
    """Deactivate the disks
2006

2007
    """
2008
    instance = self.instance
2009
    _SafeShutdownInstanceDisks(instance, self.cfg)
2010

    
2011

    
2012
def _SafeShutdownInstanceDisks(instance, cfg):
2013
  """Shutdown block devices of an instance.
2014

2015
  This function checks if an instance is running, before calling
2016
  _ShutdownInstanceDisks.
2017

2018
  """
2019
  ins_l = rpc.call_instance_list([instance.primary_node])
2020
  ins_l = ins_l[instance.primary_node]
2021
  if not type(ins_l) is list:
2022
    raise errors.OpExecError("Can't contact node '%s'" %
2023
                             instance.primary_node)
2024

    
2025
  if instance.name in ins_l:
2026
    raise errors.OpExecError("Instance is running, can't shutdown"
2027
                             " block devices.")
2028

    
2029
  _ShutdownInstanceDisks(instance, cfg)
2030

    
2031

    
2032
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
2033
  """Shutdown block devices of an instance.
2034

2035
  This does the shutdown on all nodes of the instance.
2036

2037
  If the ignore_primary is false, errors on the primary node are
2038
  ignored.
2039

2040
  """
2041
  result = True
2042
  for disk in instance.disks:
2043
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2044
      cfg.SetDiskID(top_disk, node)
2045
      if not rpc.call_blockdev_shutdown(node, top_disk):
2046
        logger.Error("could not shutdown block device %s on node %s" %
2047
                     (disk.iv_name, node))
2048
        if not ignore_primary or node != instance.primary_node:
2049
          result = False
2050
  return result
2051

    
2052

    
2053
def _CheckNodeFreeMemory(cfg, node, reason, requested):
2054
  """Checks if a node has enough free memory.
2055

2056
  This function check if a given node has the needed amount of free
2057
  memory. In case the node has less memory or we cannot get the
2058
  information from the node, this function raise an OpPrereqError
2059
  exception.
2060

2061
  Args:
2062
    - cfg: a ConfigWriter instance
2063
    - node: the node name
2064
    - reason: string to use in the error message
2065
    - requested: the amount of memory in MiB
2066

2067
  """
2068
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2069
  if not nodeinfo or not isinstance(nodeinfo, dict):
2070
    raise errors.OpPrereqError("Could not contact node %s for resource"
2071
                             " information" % (node,))
2072

    
2073
  free_mem = nodeinfo[node].get('memory_free')
2074
  if not isinstance(free_mem, int):
2075
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2076
                             " was '%s'" % (node, free_mem))
2077
  if requested > free_mem:
2078
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2079
                             " needed %s MiB, available %s MiB" %
2080
                             (node, reason, requested, free_mem))
2081

    
2082

    
2083
class LUStartupInstance(LogicalUnit):
2084
  """Starts an instance.
2085

2086
  """
2087
  HPATH = "instance-start"
2088
  HTYPE = constants.HTYPE_INSTANCE
2089
  _OP_REQP = ["instance_name", "force"]
2090
  REQ_BGL = False
2091

    
2092
  def ExpandNames(self):
2093
    self._ExpandAndLockInstance()
2094
    self.needed_locks[locking.LEVEL_NODE] = []
2095
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2096

    
2097
  def DeclareLocks(self, level):
2098
    if level == locking.LEVEL_NODE:
2099
      self._LockInstancesNodes()
2100

    
2101
  def BuildHooksEnv(self):
2102
    """Build hooks env.
2103

2104
    This runs on master, primary and secondary nodes of the instance.
2105

2106
    """
2107
    env = {
2108
      "FORCE": self.op.force,
2109
      }
2110
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2111
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2112
          list(self.instance.secondary_nodes))
2113
    return env, nl, nl
2114

    
2115
  def CheckPrereq(self):
2116
    """Check prerequisites.
2117

2118
    This checks that the instance is in the cluster.
2119

2120
    """
2121
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2122
    assert self.instance is not None, \
2123
      "Cannot retrieve locked instance %s" % self.op.instance_name
2124

    
2125
    # check bridges existance
2126
    _CheckInstanceBridgesExist(instance)
2127

    
2128
    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2129
                         "starting instance %s" % instance.name,
2130
                         instance.memory)
2131

    
2132
  def Exec(self, feedback_fn):
2133
    """Start the instance.
2134

2135
    """
2136
    instance = self.instance
2137
    force = self.op.force
2138
    extra_args = getattr(self.op, "extra_args", "")
2139

    
2140
    self.cfg.MarkInstanceUp(instance.name)
2141

    
2142
    node_current = instance.primary_node
2143

    
2144
    _StartInstanceDisks(self.cfg, instance, force)
2145

    
2146
    if not rpc.call_instance_start(node_current, instance, extra_args):
2147
      _ShutdownInstanceDisks(instance, self.cfg)
2148
      raise errors.OpExecError("Could not start instance")
2149

    
2150

    
2151
class LURebootInstance(LogicalUnit):
2152
  """Reboot an instance.
2153

2154
  """
2155
  HPATH = "instance-reboot"
2156
  HTYPE = constants.HTYPE_INSTANCE
2157
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2158
  REQ_BGL = False
2159

    
2160
  def ExpandNames(self):
2161
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2162
                                   constants.INSTANCE_REBOOT_HARD,
2163
                                   constants.INSTANCE_REBOOT_FULL]:
2164
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2165
                                  (constants.INSTANCE_REBOOT_SOFT,
2166
                                   constants.INSTANCE_REBOOT_HARD,
2167
                                   constants.INSTANCE_REBOOT_FULL))
2168
    self._ExpandAndLockInstance()
2169
    self.needed_locks[locking.LEVEL_NODE] = []
2170
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2171

    
2172
  def DeclareLocks(self, level):
2173
    if level == locking.LEVEL_NODE:
2174
      primary_only = not constants.INSTANCE_REBOOT_FULL
2175
      self._LockInstancesNodes(primary_only=primary_only)
2176

    
2177
  def BuildHooksEnv(self):
2178
    """Build hooks env.
2179

2180
    This runs on master, primary and secondary nodes of the instance.
2181

2182
    """
2183
    env = {
2184
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2185
      }
2186
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2187
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2188
          list(self.instance.secondary_nodes))
2189
    return env, nl, nl
2190

    
2191
  def CheckPrereq(self):
2192
    """Check prerequisites.
2193

2194
    This checks that the instance is in the cluster.
2195

2196
    """
2197
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2198
    assert self.instance is not None, \
2199
      "Cannot retrieve locked instance %s" % self.op.instance_name
2200

    
2201
    # check bridges existance
2202
    _CheckInstanceBridgesExist(instance)
2203

    
2204
  def Exec(self, feedback_fn):
2205
    """Reboot the instance.
2206

2207
    """
2208
    instance = self.instance
2209
    ignore_secondaries = self.op.ignore_secondaries
2210
    reboot_type = self.op.reboot_type
2211
    extra_args = getattr(self.op, "extra_args", "")
2212

    
2213
    node_current = instance.primary_node
2214

    
2215
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2216
                       constants.INSTANCE_REBOOT_HARD]:
2217
      if not rpc.call_instance_reboot(node_current, instance,
2218
                                      reboot_type, extra_args):
2219
        raise errors.OpExecError("Could not reboot instance")
2220
    else:
2221
      if not rpc.call_instance_shutdown(node_current, instance):
2222
        raise errors.OpExecError("could not shutdown instance for full reboot")
2223
      _ShutdownInstanceDisks(instance, self.cfg)
2224
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2225
      if not rpc.call_instance_start(node_current, instance, extra_args):
2226
        _ShutdownInstanceDisks(instance, self.cfg)
2227
        raise errors.OpExecError("Could not start instance for full reboot")
2228

    
2229
    self.cfg.MarkInstanceUp(instance.name)
2230

    
2231

    
2232
class LUShutdownInstance(LogicalUnit):
2233
  """Shutdown an instance.
2234

2235
  """
2236
  HPATH = "instance-stop"
2237
  HTYPE = constants.HTYPE_INSTANCE
2238
  _OP_REQP = ["instance_name"]
2239
  REQ_BGL = False
2240

    
2241
  def ExpandNames(self):
2242
    self._ExpandAndLockInstance()
2243
    self.needed_locks[locking.LEVEL_NODE] = []
2244
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2245

    
2246
  def DeclareLocks(self, level):
2247
    if level == locking.LEVEL_NODE:
2248
      self._LockInstancesNodes()
2249

    
2250
  def BuildHooksEnv(self):
2251
    """Build hooks env.
2252

2253
    This runs on master, primary and secondary nodes of the instance.
2254

2255
    """
2256
    env = _BuildInstanceHookEnvByObject(self.instance)
2257
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2258
          list(self.instance.secondary_nodes))
2259
    return env, nl, nl
2260

    
2261
  def CheckPrereq(self):
2262
    """Check prerequisites.
2263

2264
    This checks that the instance is in the cluster.
2265

2266
    """
2267
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2268
    assert self.instance is not None, \
2269
      "Cannot retrieve locked instance %s" % self.op.instance_name
2270

    
2271
  def Exec(self, feedback_fn):
2272
    """Shutdown the instance.
2273

2274
    """
2275
    instance = self.instance
2276
    node_current = instance.primary_node
2277
    self.cfg.MarkInstanceDown(instance.name)
2278
    if not rpc.call_instance_shutdown(node_current, instance):
2279
      logger.Error("could not shutdown instance")
2280

    
2281
    _ShutdownInstanceDisks(instance, self.cfg)
2282

    
2283

    
2284
class LUReinstallInstance(LogicalUnit):
2285
  """Reinstall an instance.
2286

2287
  """
2288
  HPATH = "instance-reinstall"
2289
  HTYPE = constants.HTYPE_INSTANCE
2290
  _OP_REQP = ["instance_name"]
2291
  REQ_BGL = False
2292

    
2293
  def ExpandNames(self):
2294
    self._ExpandAndLockInstance()
2295
    self.needed_locks[locking.LEVEL_NODE] = []
2296
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2297

    
2298
  def DeclareLocks(self, level):
2299
    if level == locking.LEVEL_NODE:
2300
      self._LockInstancesNodes()
2301

    
2302
  def BuildHooksEnv(self):
2303
    """Build hooks env.
2304

2305
    This runs on master, primary and secondary nodes of the instance.
2306

2307
    """
2308
    env = _BuildInstanceHookEnvByObject(self.instance)
2309
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2310
          list(self.instance.secondary_nodes))
2311
    return env, nl, nl
2312

    
2313
  def CheckPrereq(self):
2314
    """Check prerequisites.
2315

2316
    This checks that the instance is in the cluster and is not running.
2317

2318
    """
2319
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2320
    assert instance is not None, \
2321
      "Cannot retrieve locked instance %s" % self.op.instance_name
2322

    
2323
    if instance.disk_template == constants.DT_DISKLESS:
2324
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2325
                                 self.op.instance_name)
2326
    if instance.status != "down":
2327
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2328
                                 self.op.instance_name)
2329
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2330
    if remote_info:
2331
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2332
                                 (self.op.instance_name,
2333
                                  instance.primary_node))
2334

    
2335
    self.op.os_type = getattr(self.op, "os_type", None)
2336
    if self.op.os_type is not None:
2337
      # OS verification
2338
      pnode = self.cfg.GetNodeInfo(
2339
        self.cfg.ExpandNodeName(instance.primary_node))
2340
      if pnode is None:
2341
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2342
                                   self.op.pnode)
2343
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2344
      if not os_obj:
2345
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2346
                                   " primary node"  % self.op.os_type)
2347

    
2348
    self.instance = instance
2349

    
2350
  def Exec(self, feedback_fn):
2351
    """Reinstall the instance.
2352

2353
    """
2354
    inst = self.instance
2355

    
2356
    if self.op.os_type is not None:
2357
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2358
      inst.os = self.op.os_type
2359
      self.cfg.AddInstance(inst)
2360

    
2361
    _StartInstanceDisks(self.cfg, inst, None)
2362
    try:
2363
      feedback_fn("Running the instance OS create scripts...")
2364
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2365
        raise errors.OpExecError("Could not install OS for instance %s"
2366
                                 " on node %s" %
2367
                                 (inst.name, inst.primary_node))
2368
    finally:
2369
      _ShutdownInstanceDisks(inst, self.cfg)
2370

    
2371

    
2372
class LURenameInstance(LogicalUnit):
2373
  """Rename an instance.
2374

2375
  """
2376
  HPATH = "instance-rename"
2377
  HTYPE = constants.HTYPE_INSTANCE
2378
  _OP_REQP = ["instance_name", "new_name"]
2379

    
2380
  def BuildHooksEnv(self):
2381
    """Build hooks env.
2382

2383
    This runs on master, primary and secondary nodes of the instance.
2384

2385
    """
2386
    env = _BuildInstanceHookEnvByObject(self.instance)
2387
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2388
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2389
          list(self.instance.secondary_nodes))
2390
    return env, nl, nl
2391

    
2392
  def CheckPrereq(self):
2393
    """Check prerequisites.
2394

2395
    This checks that the instance is in the cluster and is not running.
2396

2397
    """
2398
    instance = self.cfg.GetInstanceInfo(
2399
      self.cfg.ExpandInstanceName(self.op.instance_name))
2400
    if instance is None:
2401
      raise errors.OpPrereqError("Instance '%s' not known" %
2402
                                 self.op.instance_name)
2403
    if instance.status != "down":
2404
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2405
                                 self.op.instance_name)
2406
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2407
    if remote_info:
2408
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2409
                                 (self.op.instance_name,
2410
                                  instance.primary_node))
2411
    self.instance = instance
2412

    
2413
    # new name verification
2414
    name_info = utils.HostInfo(self.op.new_name)
2415

    
2416
    self.op.new_name = new_name = name_info.name
2417
    instance_list = self.cfg.GetInstanceList()
2418
    if new_name in instance_list:
2419
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2420
                                 new_name)
2421

    
2422
    if not getattr(self.op, "ignore_ip", False):
2423
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2424
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2425
                                   (name_info.ip, new_name))
2426

    
2427

    
2428
  def Exec(self, feedback_fn):
2429
    """Reinstall the instance.
2430

2431
    """
2432
    inst = self.instance
2433
    old_name = inst.name
2434

    
2435
    if inst.disk_template == constants.DT_FILE:
2436
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2437

    
2438
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2439
    # Change the instance lock. This is definitely safe while we hold the BGL
2440
    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2441
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2442

    
2443
    # re-read the instance from the configuration after rename
2444
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2445

    
2446
    if inst.disk_template == constants.DT_FILE:
2447
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2448
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
2449
                                                old_file_storage_dir,
2450
                                                new_file_storage_dir)
2451

    
2452
      if not result:
2453
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2454
                                 " directory '%s' to '%s' (but the instance"
2455
                                 " has been renamed in Ganeti)" % (
2456
                                 inst.primary_node, old_file_storage_dir,
2457
                                 new_file_storage_dir))
2458

    
2459
      if not result[0]:
2460
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2461
                                 " (but the instance has been renamed in"
2462
                                 " Ganeti)" % (old_file_storage_dir,
2463
                                               new_file_storage_dir))
2464

    
2465
    _StartInstanceDisks(self.cfg, inst, None)
2466
    try:
2467
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2468
                                          "sda", "sdb"):
2469
        msg = ("Could not run OS rename script for instance %s on node %s"
2470
               " (but the instance has been renamed in Ganeti)" %
2471
               (inst.name, inst.primary_node))
2472
        logger.Error(msg)
2473
    finally:
2474
      _ShutdownInstanceDisks(inst, self.cfg)
2475

    
2476

    
2477
class LURemoveInstance(LogicalUnit):
2478
  """Remove an instance.
2479

2480
  """
2481
  HPATH = "instance-remove"
2482
  HTYPE = constants.HTYPE_INSTANCE
2483
  _OP_REQP = ["instance_name", "ignore_failures"]
2484

    
2485
  def BuildHooksEnv(self):
2486
    """Build hooks env.
2487

2488
    This runs on master, primary and secondary nodes of the instance.
2489

2490
    """
2491
    env = _BuildInstanceHookEnvByObject(self.instance)
2492
    nl = [self.sstore.GetMasterNode()]
2493
    return env, nl, nl
2494

    
2495
  def CheckPrereq(self):
2496
    """Check prerequisites.
2497

2498
    This checks that the instance is in the cluster.
2499

2500
    """
2501
    instance = self.cfg.GetInstanceInfo(
2502
      self.cfg.ExpandInstanceName(self.op.instance_name))
2503
    if instance is None:
2504
      raise errors.OpPrereqError("Instance '%s' not known" %
2505
                                 self.op.instance_name)
2506
    self.instance = instance
2507

    
2508
  def Exec(self, feedback_fn):
2509
    """Remove the instance.
2510

2511
    """
2512
    instance = self.instance
2513
    logger.Info("shutting down instance %s on node %s" %
2514
                (instance.name, instance.primary_node))
2515

    
2516
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2517
      if self.op.ignore_failures:
2518
        feedback_fn("Warning: can't shutdown instance")
2519
      else:
2520
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2521
                                 (instance.name, instance.primary_node))
2522

    
2523
    logger.Info("removing block devices for instance %s" % instance.name)
2524

    
2525
    if not _RemoveDisks(instance, self.cfg):
2526
      if self.op.ignore_failures:
2527
        feedback_fn("Warning: can't remove instance's disks")
2528
      else:
2529
        raise errors.OpExecError("Can't remove instance's disks")
2530

    
2531
    logger.Info("removing instance %s out of cluster config" % instance.name)
2532

    
2533
    self.cfg.RemoveInstance(instance.name)
2534
    # Remove the new instance from the Ganeti Lock Manager
2535
    self.context.glm.remove(locking.LEVEL_INSTANCE, instance.name)
2536

    
2537

    
2538
class LUQueryInstances(NoHooksLU):
2539
  """Logical unit for querying instances.
2540

2541
  """
2542
  _OP_REQP = ["output_fields", "names"]
2543
  REQ_BGL = False
2544

    
2545
  def ExpandNames(self):
2546
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2547
    self.static_fields = frozenset([
2548
      "name", "os", "pnode", "snodes",
2549
      "admin_state", "admin_ram",
2550
      "disk_template", "ip", "mac", "bridge",
2551
      "sda_size", "sdb_size", "vcpus", "tags",
2552
      "auto_balance",
2553
      "network_port", "kernel_path", "initrd_path",
2554
      "hvm_boot_order", "hvm_acpi", "hvm_pae",
2555
      "hvm_cdrom_image_path", "hvm_nic_type",
2556
      "hvm_disk_type", "vnc_bind_address",
2557
      ])
2558
    _CheckOutputFields(static=self.static_fields,
2559
                       dynamic=self.dynamic_fields,
2560
                       selected=self.op.output_fields)
2561

    
2562
    self.needed_locks = {}
2563
    self.share_locks[locking.LEVEL_INSTANCE] = 1
2564
    self.share_locks[locking.LEVEL_NODE] = 1
2565

    
2566
    if self.op.names:
2567
      self.wanted = _GetWantedInstances(self, self.op.names)
2568
    else:
2569
      self.wanted = locking.ALL_SET
2570

    
2571
    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
2572
    if self.do_locking:
2573
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2574
      self.needed_locks[locking.LEVEL_NODE] = []
2575
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2576

    
2577
  def DeclareLocks(self, level):
2578
    if level == locking.LEVEL_NODE and self.do_locking:
2579
      self._LockInstancesNodes()
2580

    
2581
  def CheckPrereq(self):
2582
    """Check prerequisites.
2583

2584
    """
2585
    pass
2586

    
2587
  def Exec(self, feedback_fn):
2588
    """Computes the list of nodes and their attributes.
2589

2590
    """
2591
    all_info = self.cfg.GetAllInstancesInfo()
2592
    if self.do_locking:
2593
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2594
    else:
2595
      instance_names = all_info.keys()
2596
    instance_list = [all_info[iname] for iname in instance_names]
2597

    
2598
    # begin data gathering
2599

    
2600
    nodes = frozenset([inst.primary_node for inst in instance_list])
2601

    
2602
    bad_nodes = []
2603
    if self.dynamic_fields.intersection(self.op.output_fields):
2604
      live_data = {}
2605
      node_data = rpc.call_all_instances_info(nodes)
2606
      for name in nodes:
2607
        result = node_data[name]
2608
        if result:
2609
          live_data.update(result)
2610
        elif result == False:
2611
          bad_nodes.append(name)
2612
        # else no instance is alive
2613
    else:
2614
      live_data = dict([(name, {}) for name in instance_names])
2615

    
2616
    # end data gathering
2617

    
2618
    output = []
2619
    for instance in instance_list:
2620
      iout = []
2621
      for field in self.op.output_fields:
2622
        if field == "name":
2623
          val = instance.name
2624
        elif field == "os":
2625
          val = instance.os
2626
        elif field == "pnode":
2627
          val = instance.primary_node
2628
        elif field == "snodes":
2629
          val = list(instance.secondary_nodes)
2630
        elif field == "admin_state":
2631
          val = (instance.status != "down")
2632
        elif field == "oper_state":
2633
          if instance.primary_node in bad_nodes:
2634
            val = None
2635
          else:
2636
            val = bool(live_data.get(instance.name))
2637
        elif field == "status":
2638
          if instance.primary_node in bad_nodes:
2639
            val = "ERROR_nodedown"
2640
          else:
2641
            running = bool(live_data.get(instance.name))
2642
            if running:
2643
              if instance.status != "down":
2644
                val = "running"
2645
              else:
2646
                val = "ERROR_up"
2647
            else:
2648
              if instance.status != "down":
2649
                val = "ERROR_down"
2650
              else:
2651
                val = "ADMIN_down"
2652
        elif field == "admin_ram":
2653
          val = instance.memory
2654
        elif field == "oper_ram":
2655
          if instance.primary_node in bad_nodes:
2656
            val = None
2657
          elif instance.name in live_data:
2658
            val = live_data[instance.name].get("memory", "?")
2659
          else:
2660
            val = "-"
2661
        elif field == "disk_template":
2662
          val = instance.disk_template
2663
        elif field == "ip":
2664
          val = instance.nics[0].ip
2665
        elif field == "bridge":
2666
          val = instance.nics[0].bridge
2667
        elif field == "mac":
2668
          val = instance.nics[0].mac
2669
        elif field == "sda_size" or field == "sdb_size":
2670
          disk = instance.FindDisk(field[:3])
2671
          if disk is None:
2672
            val = None
2673
          else:
2674
            val = disk.size
2675
        elif field == "vcpus":
2676
          val = instance.vcpus
2677
        elif field == "tags":
2678
          val = list(instance.GetTags())
2679
        elif field in ("network_port", "kernel_path", "initrd_path",
2680
                       "hvm_boot_order", "hvm_acpi", "hvm_pae",
2681
                       "hvm_cdrom_image_path", "hvm_nic_type",
2682
                       "hvm_disk_type", "vnc_bind_address"):
2683
          val = getattr(instance, field, None)
2684
          if val is not None:
2685
            pass
2686
          elif field in ("hvm_nic_type", "hvm_disk_type",
2687
                         "kernel_path", "initrd_path"):
2688
            val = "default"
2689
          else:
2690
            val = "-"
2691
        else:
2692
          raise errors.ParameterError(field)
2693
        iout.append(val)
2694
      output.append(iout)
2695

    
2696
    return output
2697

    
2698

    
2699
class LUFailoverInstance(LogicalUnit):
2700
  """Failover an instance.
2701

2702
  """
2703
  HPATH = "instance-failover"
2704
  HTYPE = constants.HTYPE_INSTANCE
2705
  _OP_REQP = ["instance_name", "ignore_consistency"]
2706
  REQ_BGL = False
2707

    
2708
  def ExpandNames(self):
2709
    self._ExpandAndLockInstance()
2710
    self.needed_locks[locking.LEVEL_NODE] = []
2711
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2712

    
2713
  def DeclareLocks(self, level):
2714
    if level == locking.LEVEL_NODE:
2715
      self._LockInstancesNodes()
2716

    
2717
  def BuildHooksEnv(self):
2718
    """Build hooks env.
2719

2720
    This runs on master, primary and secondary nodes of the instance.
2721

2722
    """
2723
    env = {
2724
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2725
      }
2726
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2727
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2728
    return env, nl, nl
2729

    
2730
  def CheckPrereq(self):
2731
    """Check prerequisites.
2732

2733
    This checks that the instance is in the cluster.
2734

2735
    """
2736
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2737
    assert self.instance is not None, \
2738
      "Cannot retrieve locked instance %s" % self.op.instance_name
2739

    
2740
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2741
      raise errors.OpPrereqError("Instance's disk layout is not"
2742
                                 " network mirrored, cannot failover.")
2743

    
2744
    secondary_nodes = instance.secondary_nodes
2745
    if not secondary_nodes:
2746
      raise errors.ProgrammerError("no secondary node but using "
2747
                                   "a mirrored disk template")
2748

    
2749
    target_node = secondary_nodes[0]
2750
    # check memory requirements on the secondary node
2751
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2752
                         instance.name, instance.memory)
2753

    
2754
    # check bridge existance
2755
    brlist = [nic.bridge for nic in instance.nics]
2756
    if not rpc.call_bridges_exist(target_node, brlist):
2757
      raise errors.OpPrereqError("One or more target bridges %s does not"
2758
                                 " exist on destination node '%s'" %
2759
                                 (brlist, target_node))
2760

    
2761
  def Exec(self, feedback_fn):
2762
    """Failover an instance.
2763

2764
    The failover is done by shutting it down on its present node and
2765
    starting it on the secondary.
2766

2767
    """
2768
    instance = self.instance
2769

    
2770
    source_node = instance.primary_node
2771
    target_node = instance.secondary_nodes[0]
2772

    
2773
    feedback_fn("* checking disk consistency between source and target")
2774
    for dev in instance.disks:
2775
      # for drbd, these are drbd over lvm
2776
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2777
        if instance.status == "up" and not self.op.ignore_consistency:
2778
          raise errors.OpExecError("Disk %s is degraded on target node,"
2779
                                   " aborting failover." % dev.iv_name)
2780

    
2781
    feedback_fn("* shutting down instance on source node")
2782
    logger.Info("Shutting down instance %s on node %s" %
2783
                (instance.name, source_node))
2784

    
2785
    if not rpc.call_instance_shutdown(source_node, instance):
2786
      if self.op.ignore_consistency:
2787
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2788
                     " anyway. Please make sure node %s is down"  %
2789
                     (instance.name, source_node, source_node))
2790
      else:
2791
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2792
                                 (instance.name, source_node))
2793

    
2794
    feedback_fn("* deactivating the instance's disks on source node")
2795
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2796
      raise errors.OpExecError("Can't shut down the instance's disks.")
2797

    
2798
    instance.primary_node = target_node
2799
    # distribute new instance config to the other nodes
2800
    self.cfg.Update(instance)
2801

    
2802
    # Only start the instance if it's marked as up
2803
    if instance.status == "up":
2804
      feedback_fn("* activating the instance's disks on target node")
2805
      logger.Info("Starting instance %s on node %s" %
2806
                  (instance.name, target_node))
2807

    
2808
      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2809
                                               ignore_secondaries=True)
2810
      if not disks_ok:
2811
        _ShutdownInstanceDisks(instance, self.cfg)
2812
        raise errors.OpExecError("Can't activate the instance's disks")
2813

    
2814
      feedback_fn("* starting the instance on the target node")
2815
      if not rpc.call_instance_start(target_node, instance, None):
2816
        _ShutdownInstanceDisks(instance, self.cfg)
2817
        raise errors.OpExecError("Could not start instance %s on node %s." %
2818
                                 (instance.name, target_node))
2819

    
2820

    
2821
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2822
  """Create a tree of block devices on the primary node.
2823

2824
  This always creates all devices.
2825

2826
  """
2827
  if device.children:
2828
    for child in device.children:
2829
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2830
        return False
2831

    
2832
  cfg.SetDiskID(device, node)
2833
  new_id = rpc.call_blockdev_create(node, device, device.size,
2834
                                    instance.name, True, info)
2835
  if not new_id:
2836
    return False
2837
  if device.physical_id is None:
2838
    device.physical_id = new_id
2839
  return True
2840

    
2841

    
2842
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2843
  """Create a tree of block devices on a secondary node.
2844

2845
  If this device type has to be created on secondaries, create it and
2846
  all its children.
2847

2848
  If not, just recurse to children keeping the same 'force' value.
2849

2850
  """
2851
  if device.CreateOnSecondary():
2852
    force = True
2853
  if device.children:
2854
    for child in device.children:
2855
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
2856
                                        child, force, info):
2857
        return False
2858

    
2859
  if not force:
2860
    return True
2861
  cfg.SetDiskID(device, node)
2862
  new_id = rpc.call_blockdev_create(node, device, device.size,
2863
                                    instance.name, False, info)
2864
  if not new_id:
2865
    return False
2866
  if device.physical_id is None:
2867
    device.physical_id = new_id
2868
  return True
2869

    
2870

    
2871
def _GenerateUniqueNames(cfg, exts):
2872
  """Generate a suitable LV name.
2873

2874
  This will generate a logical volume name for the given instance.
2875

2876
  """
2877
  results = []
2878
  for val in exts:
2879
    new_id = cfg.GenerateUniqueID()
2880
    results.append("%s%s" % (new_id, val))
2881
  return results
2882

    
2883

    
2884
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2885
  """Generate a drbd8 device complete with its children.
2886

2887
  """
2888
  port = cfg.AllocatePort()
2889
  vgname = cfg.GetVGName()
2890
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2891
                          logical_id=(vgname, names[0]))
2892
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2893
                          logical_id=(vgname, names[1]))
2894
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2895
                          logical_id = (primary, secondary, port),
2896
                          children = [dev_data, dev_meta],
2897
                          iv_name=iv_name)
2898
  return drbd_dev
2899

    
2900

    
2901
def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz,
                          file_storage_dir, file_driver):
  """Generate the entire disk layout for a given template type.

  """
  # TODO: compute space requirements

  vgname = cfg.GetVGName()
  if template_name == constants.DT_DISKLESS:
    disks = []
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name="sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name="sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                        disk_sz, names[0:2], "sda")
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                        swap_sz, names[2:4], "sdb")
    disks = [drbd_sda_dev, drbd_sdb_dev]
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
                                iv_name="sda", logical_id=(file_driver,
                                "%s/sda" % file_storage_dir))
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
                                iv_name="sdb", logical_id=(file_driver,
                                "%s/sdb" % file_storage_dir))
    disks = [file_sda_dev, file_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks


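# Added illustrative summary (not part of the original code) of the layouts
# produced by _GenerateDiskTemplate above, with hypothetical generated names
# <uuid1>/<uuid2>:
#   - diskless: []
#   - plain:    two LVs, (<vg>, "<uuid1>.sda") and (<vg>, "<uuid2>.sdb")
#   - drbd8:    two DRBD8 devices ("sda" and "sdb"), each mirroring a data LV
#               and a 128 MB metadata LV between primary and secondary node
#   - file:     two file-backed disks, "<file_storage_dir>/sda" and
#               "<file_storage_dir>/sdb", accessed via the given file_driver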
def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


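# Added example (not in the original code): for an instance named
# "instance1.example.com", _GetInstanceInfoText above returns the metadata
# string "originstname+instance1.example.com".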
def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = rpc.call_file_storage_dir_create(instance.primary_node,
                                              file_storage_dir)

    if not result:
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
      return False

    if not result[0]:
      logger.Error("failed to create directory '%s'" % file_storage_dir)
      return False

  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False

  return True


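# Added note (not part of the original code): _CreateDisks above creates each
# volume on the secondary nodes first and then on the primary node, aborting
# on the first failure; _RemoveDisks below is its counterpart and keeps going
# even when individual removals fail.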
def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
                                            file_storage_dir):
      logger.Error("could not remove directory '%s'" % file_storage_dir)
      result = False

  return result


def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group.

  This is currently hard-coded for the two-drive layout.

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: disk_size + swap_size,
    # 256 MB are added for drbd metadata, 128 MB for each drbd device
    constants.DT_DRBD8: disk_size + swap_size + 256,
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]


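# Added worked example (not in the original code): for a hypothetical
# 10240 MB data disk and 4096 MB swap disk, _ComputeDiskSize above yields
#   plain: 10240 + 4096       = 14336 MB of free VG space required
#   drbd8: 10240 + 4096 + 256 = 14592 MB (the extra 256 MB covers the two
#          128 MB DRBD metadata volumes)
# while the diskless and file templates need no VG space at all (None).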
class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
              "disk_template", "swap_size", "mode", "start", "vcpus",
              "wait_for_sync", "ip_check", "mac"]

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    disks = [{"size": self.op.disk_size, "mode": "w"},
             {"size": self.op.swap_size, "mode": "w"}]
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
             "bridge": self.op.bridge}]
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.op.vcpus,
                     mem_size=self.op.mem_size,
                     disks=disks,
                     nics=nics,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    logger.ToStdout("Selected nodes for the instance: %s" %
                    (", ".join(ial.nodes),))
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
                (self.op.instance_name, self.op.iallocator, ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]

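  # Added note (not part of the original code): after ial.Run() the method
  # above only relies on these IAllocator result attributes: ial.success
  # (bool), ial.info (error text), ial.required_nodes (2 when a secondary
  # node is needed, e.g. for drbd8) and ial.nodes (the selected node names,
  # primary first).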
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.op.mem_size,
      vcpus=self.op.vcpus,
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
    ))

    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl


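  # Added overview (not part of the original code): CheckPrereq below
  # normalizes optional opcode fields, validates the creation/import mode,
  # the disk template and the instance name/IP/MAC/bridge, optionally runs
  # the iallocator, and then checks the chosen nodes (existence, free VG
  # space, OS availability, bridge and free memory) plus the HVM/VNC
  # settings.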
  def CheckPrereq(self):
    """Check prerequisites.

    """
    # set optional parameters to None if they don't exist
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
                 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
                 "hvm_nic_type", "hvm_disk_type", "vnc_bind_address"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)
      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")
      src_node_full = self.cfg.ExpandNodeName(src_node)
      if src_node_full is None:
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
      self.op.src_node = src_node = src_node_full

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage
    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

    #### instance parameters check

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)

    self.op.instance_name = instance_name = hostname1.name
    instance_list = self.cfg.GetInstanceList()
    if instance_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    # ip validity checks
    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = self.op.ip = inst_ip

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (hostname1.ip, instance_name))

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    # boot order verification
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")
    # file storage checks
    if (self.op.file_driver and
        self.op.file_driver not in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory must be a relative"
                                 " path")
    #### allocator run

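    # Added clarification (not part of the original code): exactly one of
    # iallocator and pnode may be left unset, i.e. the caller must specify
    # either an allocator (e.g. a hypothetical iallocator="myalloc",
    # pnode=None) or an explicit primary node, but not both and not neither.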
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
    if pnode is None:
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                 self.op.pnode)
    self.op.pnode = pnode.name
    self.pnode = pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if getattr(self.op, "snode", None) is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")

      snode_name = self.cfg.ExpandNodeName(self.op.snode)
      if snode_name is None:
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
                                   self.op.snode)
      elif snode_name == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(snode_name)

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.op.disk_size, self.op.swap_size)

    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node" % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")


    # bridge check on primary node
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.op.mem_size)

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    # Xen HVM device type checks
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
        raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
                                   " hypervisor" % self.op.hvm_nic_type)
      if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
        raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
                                   " hypervisor" % self.op.hvm_disk_type)

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    if self.op.vnc_bind_address is None:
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.sstore.GetFileStorageDir(),
                                        string_file_storage_dir, instance))


    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size,
                                  file_storage_dir,
                                  self.op.file_driver)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            hvm_acpi=self.op.hvm_acpi,
                            hvm_pae=self.op.hvm_pae,
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
                            vnc_bind_address=self.op.vnc_bind_address,
                            hvm_nic_type=self.op.hvm_nic_type,
                            hvm_disk_type=self.op.hvm_disk_type,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Add the new instance to the Ganeti Lock Manager
    self.context.glm.add(locking.LEVEL_INSTANCE, instance)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      # Remove the new instance from the Ganeti Lock Manager
      self.context.glm.remove(locking.LEVEL_INSTANCE, iobj.name)
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                           src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")


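# Added note (not part of the original code): LUConnectConsole below does not
# open the console itself; its Exec() returns the ssh command line (built via
# ssh.BuildCmd) that, when run on the master node, attaches to the instance's
# console on its primary node. The actual console command comes from the
# hypervisor, e.g. something like "xm console <instance>" under Xen (example
# only; the real value is whatever GetShellCommandForConsole returns).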
class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Connect to the console of an instance.

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    elif self.op.remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.op.remote_node = remote_node
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    # If we're not already locking all nodes in the set we have to declare the
    # instance's primary/secondary nodes.
    if (level == locking.LEVEL_NODE and
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
      self._LockInstancesNodes()

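  # Added summary (not part of the original code) of the node locking set up
  # in ExpandNames/DeclareLocks above:
  #   - with an iallocator: lock ALL nodes, since the new secondary is not
  #     known yet;
  #   - with an explicit remote_node: lock that node and append the
  #     instance's own nodes (LOCKS_APPEND);
  #   - otherwise: lock just the instance's primary/secondary nodes
  #     (LOCKS_REPLACE).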
  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    logger.ToStdout("Selected new secondary for the instance: %s" %
                    self.op.remote_node)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = instance

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))

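  # Added summary (not part of the original code) of what CheckPrereq above
  # establishes for drbd8 instances:
  #   - REPLACE_DISK_PRI: tgt_node = primary,   oth_node = secondary
  #   - REPLACE_DISK_SEC: tgt_node = secondary, oth_node = primary, and
  #     new_node = the requested remote node (may be None)
  #   - REPLACE_DISK_ALL with a remote node is turned into REPLACE_DISK_SEC.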
  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s" % new_lv,
                    hint="manually cleanup unused logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue

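  # Added worked example (not part of the original code) for the rename step
  # in _ExecD8DiskOnly above, using hypothetical names: an old data LV with
  # physical_id ("xenvg", "<uuid>.sda_data") is first renamed to
  # ("xenvg", "<uuid>.sda_data_replaced-1199116800") (the suffix being
  # int(time.time())), the freshly created LV then takes over the old name,
  # and the "_replaced-*" volume is deleted once the DRBD device has resynced.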
  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([pri_node, new_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in pri_node, new_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s on %s" % (dev.iv_name, pri_node))
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        raise errors.OpExecError("Can't find device %s on node %s" %
                                 (dev.iv_name, pri_node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      size = dev.size
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in dev.children:
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.Op