Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib.py @ 7688d0d3

History | View | Annotate | Download (188.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33
import logging
34

    
35
from ganeti import rpc
36
from ganeti import ssh
37
from ganeti import logger
38
from ganeti import utils
39
from ganeti import errors
40
from ganeti import hypervisor
41
from ganeti import locking
42
from ganeti import constants
43
from ganeti import objects
44
from ganeti import opcodes
45
from ganeti import serializer
46

    
47

    
48
class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_MASTER: the LU needs to run on the master node
        REQ_WSSTORE: the LU needs a writable SimpleStore
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  # Hooks path/type: None means "this LU runs no hooks" (see BuildHooksEnv)
  HPATH = None
  HTYPE = None
  # Opcode attributes that must be present (checked in __init__)
  _OP_REQP = []
  REQ_MASTER = True
  REQ_WSSTORE = False
  REQ_BGL = True

  def __init__(self, processor, op, context, sstore):
    """Constructor for LogicalUnit.

    This needs to be overriden in derived classes in order to check op
    validity.

    @param processor: the mcpu processor driving this LU
    @param op: the opcode this LU executes
    @param context: object providing cluster context (supplies .cfg)
    @param sstore: the cluster SimpleStore

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.sstore = sstore
    self.context = context
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    # Lazily-built SshRunner (see __GetSSH below)
    self.__ssh = None

    # Fail early if any required opcode parameter is missing
    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    if not self.cfg.IsCluster():
      raise errors.OpPrereqError("Cluster not initialized yet,"
                                 " use 'gnt-cluster init' first.")
    if self.REQ_MASTER:
      master = sstore.GetMasterNode()
      # Only the master node may run master-required LUs
      if master != utils.HostInfo().name:
        raise errors.OpPrereqError("Commands must be run on the master"
                                   " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object

    Built lazily so LUs that never use SSH pay no setup cost.

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg)
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, ecc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:
      - Use an empty dict if you don't need any lock
      - If you don't need any lock at a particular level omit that level
      - Don't put anything for the BGL level
      - If you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples:
    # Acquire all nodes and one instance
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
    }
    # Acquire just two nodes
    self.needed_locks = {
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
    }
    # Acquire no locks
    self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    @param feedback_fn: callable used to report progress back to the caller

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-node tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: the previous result this LU had, or None in the PRE phase.

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      # Guard against double declaration: calling this after instance-level
      # locks were already set would silently overwrite them.
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    If should be called in DeclareLocks in a way similar to:

    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we're really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    # Consume the marker so a second call without re-arming fails the assert
    del self.recalculate_locks[locking.LEVEL_NODE]
312

    
313

    
314
class NoHooksLU(LogicalUnit):
  """Base class for Logical Units that run no hooks.

  Deriving from this class instead of LogicalUnit spares subclasses
  from re-declaring the empty hooks configuration themselves.

  """
  HTYPE = None
  HPATH = None
323

    
324

    
325
def _GetWantedNodes(lu, nodes):
  """Expand and validate a list of node names.

  @param lu: the calling LU (provides access to the cluster config)
  @param nodes: non-empty list of (possibly short) node names
  @return: the expanded node names, nicely sorted
  @raise errors.OpPrereqError: if the argument is not a list or a name
      cannot be expanded

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  expanded = []
  for short_name in nodes:
    full_name = lu.cfg.ExpandNodeName(short_name)
    if full_name is None:
      raise errors.OpPrereqError("No such node name '%s'" % short_name)
    expanded.append(full_name)

  return utils.NiceSort(expanded)
347

    
348

    
349
def _GetWantedInstances(lu, instances):
  """Expand and validate a list of instance names.

  @param lu: the calling LU (provides access to the cluster config)
  @param instances: list of (possibly short) instance names; an empty
      list selects every instance in the cluster
  @return: the expanded instance names, nicely sorted
  @raise errors.OpPrereqError: if the argument is not a list or a name
      cannot be expanded

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if not instances:
    # An empty selection means "all instances in the cluster"
    return utils.NiceSort(lu.cfg.GetInstanceList())

  expanded = []
  for short_name in instances:
    full_name = lu.cfg.ExpandInstanceName(short_name)
    if full_name is None:
      raise errors.OpPrereqError("No such instance name '%s'" % short_name)
    expanded.append(full_name)

  return utils.NiceSort(expanded)
371

    
372

    
373
def _CheckOutputFields(static, dynamic, selected):
374
  """Checks whether all selected fields are valid.
375

376
  Args:
377
    static: Static fields
378
    dynamic: Dynamic fields
379

380
  """
381
  static_fields = frozenset(static)
382
  dynamic_fields = frozenset(dynamic)
383

    
384
  all_fields = static_fields | dynamic_fields
385

    
386
  if not all_fields.issuperset(selected):
387
    raise errors.OpPrereqError("Unknown output fields selected: %s"
388
                               % ",".join(frozenset(selected).
389
                                          difference(all_fields)))
390

    
391

    
392
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
393
                          memory, vcpus, nics):
394
  """Builds instance related env variables for hooks from single variables.
395

396
  Args:
397
    secondary_nodes: List of secondary nodes as strings
398
  """
399
  env = {
400
    "OP_TARGET": name,
401
    "INSTANCE_NAME": name,
402
    "INSTANCE_PRIMARY": primary_node,
403
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
404
    "INSTANCE_OS_TYPE": os_type,
405
    "INSTANCE_STATUS": status,
406
    "INSTANCE_MEMORY": memory,
407
    "INSTANCE_VCPUS": vcpus,
408
  }
409

    
410
  if nics:
411
    nic_count = len(nics)
412
    for idx, (ip, bridge, mac) in enumerate(nics):
413
      if ip is None:
414
        ip = ""
415
      env["INSTANCE_NIC%d_IP" % idx] = ip
416
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
417
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
418
  else:
419
    nic_count = 0
420

    
421
  env["INSTANCE_NIC_COUNT"] = nic_count
422

    
423
  return env
424

    
425

    
426
def _BuildInstanceHookEnvByObject(instance, override=None):
427
  """Builds instance related env variables for hooks from an object.
428

429
  Args:
430
    instance: objects.Instance object of instance
431
    override: dict of values to override
432
  """
433
  args = {
434
    'name': instance.name,
435
    'primary_node': instance.primary_node,
436
    'secondary_nodes': instance.secondary_nodes,
437
    'os_type': instance.os,
438
    'status': instance.os,
439
    'memory': instance.memory,
440
    'vcpus': instance.vcpus,
441
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
442
  }
443
  if override:
444
    args.update(override)
445
  return _BuildInstanceHookEnv(**args)
446

    
447

    
448
def _CheckInstanceBridgesExist(instance):
  """Check that all bridges needed by an instance exist on its primary node.

  @param instance: the instance whose NIC bridges are checked
  @raise errors.OpPrereqError: if any bridge is missing on the primary node

  """
  bridges = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, bridges):
    raise errors.OpPrereqError("one or more target bridges %s does not"
                               " exist on destination node '%s'" %
                               (bridges, instance.primary_node))
458

    
459

    
460
class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    The cluster must contain no instances, and no nodes other than the
    master itself.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master_name = self.sstore.GetMasterNode()

    remaining_nodes = self.cfg.GetNodeList()
    if len(remaining_nodes) != 1 or remaining_nodes[0] != master_name:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(remaining_nodes) - 1))
    remaining_instances = self.cfg.GetInstanceList()
    if remaining_instances:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(remaining_instances))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    Demotes the master node and backs up the cluster SSH key pair.

    """
    master_name = self.sstore.GetMasterNode()
    if not rpc.call_node_stop_master(master_name, False):
      raise errors.OpExecError("Could not disable the master role")
    priv_key_file, pub_key_file, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key_file)
    utils.CreateBackup(pub_key_file)
    return master_name
496

    
497

    
498
class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

  def ExpandNames(self):
    # Verification is read-only, so acquire everything in shared mode
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    Returns True if the node is "bad" (some check failed).

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      # No reply at all: nothing else can be checked for this node
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        # NOTE(review): the loop variable shadows the "node" parameter from
        # here on; harmless as written but fragile if code is added below.
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    if 'node-net-test' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result['node-net-test']:
        bad = True
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result['node-net-test'][node]))

    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      # NOTE(review): a hypervisor failure is reported but does not set
      # "bad" — confirm whether this is intentional.
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node, and that the instance is running
    exactly on its primary node.

    Returns True if the instance is "bad" (some check failed).

    """
    bad = False

    node_current = instanceconfig.primary_node

    # Expected volumes, per node, according to the configuration
    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    # An instance not marked "down" must actually be running on its primary
    if not instanceconfig.status == 'down':
      if (node_current not in node_instance or
          not instance in node_instance[node_current]):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    # ...and must not be running anywhere else
    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    Returns True if any orphan volume was found.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    Returns True if any orphan instance was found.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    Returns True if any node lacks the memory for such a failover.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          needed_mem += instance_cfg[instance].memory
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are just run in the post phase and their failure
    makes the output be logged in the verify output and the verification to
    fail.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    Returns True if the whole verification succeeded, False otherwise.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    # Gather all remote data up front, then analyze node by node
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
                        for node in nodeinfo]
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        # A string reply is an LVM error message from the node
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      # NOTE(review): this rebinds "nodeinfo" (was the list of node objects
      # above); safe because the list is no longer used past this point.
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result =  self._VerifyInstance(instance, inst_config, node_volume,
                                     node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          # NOTE(review): unlike the primary-node branch above, "bad" is not
          # set here — possible oversight, confirm intended behavior.
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result, handle it, and send some
    nicely-formatted feedback back to the user.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: previous Exec result

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    # NOTE(review): for any other phase this method falls through and
    # returns None — confirm callers ignore the PRE-phase result.
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res is False or not isinstance(res, list):
            feedback_fn("    Communication failure")
            lu_result = 1
            continue
          for script, hkr, output in res:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result
920

    
921

    
922
class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    # read-only LU: all nodes and instances, but only shared locks
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    Returns:
      tuple of (unreachable_nodes, node_lvm_errors, degraded_instances,
      instance_missing_lvs); the four res_* locals below alias the
      members of the returned tuple and are filled in-place

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    # map each (node, volume) pair to its owning instance, considering
    # only running, network-mirrored instances
    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
        # FIX: a string result is an error message, not volume data;
        # without this continue we fell through to lvs.iteritems()
        # below and crashed with AttributeError
        continue
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result
1000

    
1001

    
1002
class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]
  REQ_WSSTORE = True

  def BuildHooksEnv(self):
    """Build hooks env.

    Runs only on the master node, for both phases.

    """
    mn = self.sstore.GetMasterNode()
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()

    # reject a no-op rename
    if old_name == new_name and old_ip == new_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")

    # the new master IP must not be in use already
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    ss = self.sstore
    master = ss.GetMasterNode()

    # take the master IP down while the ssconf data changes
    if not rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # update the local ssconf with the new name/IP
      ss.SetKey(ss.SS_MASTER_IP, self.ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, self.op.name)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in (ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP):
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      # always try to bring the master role back up
      if not rpc.call_node_start_master(master, False):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")
1079

    
1080

    
1081
def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether a LD_LV dev_type was found or not

  """
  # depth-first: any lvm-based descendant makes the whole tree lvm-based
  for chdisk in (disk.children or []):
    if _RecursiveCheckIfLVMBased(chdisk):
      return True
  return disk.dev_type == constants.LD_LV
1096

    
1097

    
1098
class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    Runs only on the master node, for both phases.

    """
    mn = self.sstore.GetMasterNode()
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    # FIXME: This only works because there is only one parameter that can be
    # changed or removed.
    if not self.op.vg_name:
      # disabling LVM: no instance may still use lvm-based storage
      for inst in self.cfg.GetAllInstancesInfo().values():
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")
    else:
      # switching VG: the given volume group must be valid on every
      # locked node
      node_list = self.acquired_locks[locking.LEVEL_NODE]
      vglist = rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name == self.cfg.GetVGName():
      feedback_fn("Cluster LVM configuration already in desired"
                  " state, not changing")
    else:
      self.cfg.SetVGName(self.op.vg_name)
1163

    
1164

    
1165
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  Polls the primary node for mirror status of all the instance's disks
  and loops until they report fully synced (or, with oneshot=True,
  reports once and returns immediately).

  Args:
    cfgw: config writer, used to set the disk IDs before the rpc calls
    instance: the instance object whose disks are checked
    proc: processor-like object providing LogInfo/LogWarning
    oneshot: if True, poll/report a single time instead of waiting
    unlock: unused in this function (kept for interface compatibility)

  Returns:
    True if no disk was degraded at the last poll, False otherwise.

  Raises:
    errors.RemoteError: if the primary node gives no data for 10
      consecutive polls

  """
  if not instance.disks:
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  # only the primary node is queried for mirror status
  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  # retries counts *consecutive* failed polls; it resets on success
  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    # rstats is positionally aligned with instance.disks
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      # degraded with no sync in progress counts as overall degradation
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        # a percentage means sync is still running
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    # sleep until the next poll, bounded by the estimate but at most 60s
    time.sleep(min(60, max_time))

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
1221

    
1222

    
1223
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  cfgw.SetDiskID(dev, node)
  # offset into the blockdev_find status tuple: 5 = is_degraded, 6 = ldisk
  if ldisk:
    idx = 6
  else:
    idx = 5

  healthy = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
      healthy = False
    else:
      healthy = healthy and (not rstats[idx])

  # NOTE(review): the recursive call uses the default ldisk=False for
  # children even when the top-level call had ldisk=True -- confirm
  # this asymmetry is intentional
  if dev.children:
    for child in dev.children:
      healthy = healthy and _CheckDiskConsistency(cfgw, child, node,
                                                  on_primary)

  return healthy
1250

    
1251

    
1252
class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
    _CheckOutputFields(static=[],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into an a per-os per-node dictionary

      Args:
        node_list: a list with the names of all nodes
        rlist: a map with node names as keys and OS objects as values

      Returns:
        map: a map with osnames as keys and as value another map, with
             nodes as
             keys and list of OS objects as values
             e.g. {"debian-etch": {"node1": [<object>,...],
                                   "node2": [<object>,]}
                  }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if not nr:
        continue
      for os_obj in nr:
        if os_obj.name not in all_os:
          # first sighting of this OS: seed an empty list for every
          # node in node_list
          all_os[os_obj.name] = dict([(nname, []) for nname in node_list])
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.acquired_locks[locking.LEVEL_NODE]
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)

    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          # valid only if every node returned at least one valid entry
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = dict([(node_name, [(v.status, v.path) for v in nos_list])
                      for node_name, nos_list in os_data.iteritems()])
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output
1336

    
1337

    
1338
class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      # FIX: use the regular call form of raise instead of the legacy
      # "raise Class, args" statement, consistent with the rest of the
      # module (and forward-compatible)
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    # the node may not carry any instance, neither as primary nor
    # as secondary
    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    # drop the node from the cluster configuration/context first...
    self.context.RemoveNode(node.name)

    # ...then tell the node itself to clean up
    rpc.call_node_leave_cluster(node.name)
1405

    
1406

    
1407
class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    # fields that require a live rpc call to the nodes
    self.dynamic_fields = frozenset([
      "dtotal", "dfree",
      "mtotal", "mnode", "mfree",
      "bootid",
      "ctotal",
      ])

    # fields answered purely from the configuration
    self.static_fields = frozenset([
      "name", "pinst_cnt", "sinst_cnt",
      "pinst_list", "sinst_list",
      "pip", "sip", "tags",
      "serial_no",
      ])

    _CheckOutputFields(static=self.static_fields,
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # The validation of the node list is done in the _GetWantedNodes,
    # if non empty, and if empty, there's no validation to do
    pass

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      nodenames = self.wanted
      missing = set(nodenames).difference(all_info.keys())
      if missing:
        # FIX: was "raise self.OpExecError(...)" -- the LU instance has
        # no such attribute, so this error path raised AttributeError
        # instead of the intended exception
        raise errors.OpExecError(
          "Some nodes were removed before retrieving their data: %s" % missing)
    else:
      nodenames = all_info.keys()
    nodelist = [all_info[name] for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      # at least one requested field needs live data
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    # only walk the instance list if an instance-related field was asked
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field == "serial_no":
          val = node.serial_no
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output
1542

    
1543

    
1544
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  REQ_BGL = False

  def ExpandNames(self):
    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

    # shared node locks: either all nodes or only the requested ones
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    # the node names were already validated while acquiring the locks
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    # per-node volume data via rpc; nodes that failed or returned
    # nothing are silently skipped below
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    # precompute each instance's node->LV-list map for the "instance"
    # field lookup
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      # sort a copy by physical device name for stable output
      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            # find the instance owning this LV on this node; the
            # for/else yields '-' when no instance matches
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output
1621

    
1622

    
1623
class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) matches the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    # resolve the node name to its canonical form and IP
    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    # missing secondary IP means a single-homed node
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    # presence in the config must match the readd flag
    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    # the new IPs must not collide with any existing node's IPs
    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        # a readded node must keep its previous IP configuration
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachablity
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    # node object used by Exec; not yet added to the config
    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # check connectivity
    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    # order matters: call_node_add below unpacks these positionally
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    utils.AddHostToEtcHosts(new_node.name)

    # dual-homed node: verify it can reach its own secondary IP
    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    # have the master verify ssh/hostname resolution of the new node
    node_verify_list = [self.sstore.GetMasterNode()]
    node_verify_param = {
      'nodelist': [node],
      # TODO: do a node-net-test as well?
    }

    result = rpc.call_node_verify(node_verify_list, node_verify_param)
    for verifier in node_verify_list:
      if not result[verifier]:
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                 " for remote verification" % verifier)
      if result[verifier]['nodelist']:
        for failed in result[verifier]['nodelist']:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, result[verifier]['nodelist'][failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList()
    if not self.op.readd:
      dist_nodes.append(node)
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    # copy the ssconf files (plus the VNC password for HVM clusters)
    # to the new node only
    to_copy = self.sstore.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      result = rpc.call_upload_file([node], fname)
      if not result[node]:
        logger.Error("could not copy file %s to node %s" % (fname, node))

    # finally register the node in the cluster context/config
    if self.op.readd:
      self.context.ReaddNode(new_node)
    else:
      self.context.AddNode(new_node)
1824

    
1825

    
1826
class LUQueryClusterInfo(NoHooksLU):
  """Query static cluster configuration data.

  """
  _OP_REQP = []
  REQ_MASTER = False
  REQ_BGL = False

  def ExpandNames(self):
    # purely read-only query: no locks are required
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return the cluster configuration as a dict.

    """
    cluster = {}
    cluster["name"] = self.sstore.GetClusterName()
    cluster["software_version"] = constants.RELEASE_VERSION
    cluster["protocol_version"] = constants.PROTOCOL_VERSION
    cluster["config_version"] = constants.CONFIG_VERSION
    cluster["os_api_version"] = constants.OS_API_VERSION
    cluster["export_version"] = constants.EXPORT_VERSION
    cluster["master"] = self.sstore.GetMasterNode()
    cluster["architecture"] = (platform.architecture()[0], platform.machine())
    cluster["hypervisor_type"] = self.sstore.GetHypervisorType()
    return cluster
1860

    
1861

    
1862
class LUQueryConfigValues(NoHooksLU):
  """Return selected configuration values.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    # no locks needed; only validate the requested field names
    self.needed_locks = {}

    _CheckOutputFields(static=["cluster_name", "master_node"],
                       dynamic=[],
                       selected=self.op.output_fields)

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Collect and return the requested configuration values.

    """
    # map each supported field to its accessor; unknown fields are a
    # programming error on the caller's side
    getters = {
      "cluster_name": self.cfg.GetClusterName,
      "master_node": self.cfg.GetMasterNode,
      }
    values = []
    for field in self.op.output_fields:
      if field not in getters:
        raise errors.ParameterError(field)
      values.append(getters[field]())
    return values
1896

    
1897

    
1898
class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # node locks are computed later, once the instance's nodes are known
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = instance

  def Exec(self, feedback_fn):
    """Activate the disks and return the device mapping.

    """
    ok, dev_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not ok:
      raise errors.OpExecError("Cannot activate block devices")
    return dev_info
1933

    
1934

    
1935
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes, in two passes: first
  every device is assembled in secondary (non-primary) mode on all
  nodes, then the devices are re-assembled in primary mode on the
  primary node only.

  Args:
    instance: a ganeti.objects.Instance object
    cfg: a ConfigWriter instance, used to set the physical disk IDs
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    a tuple (disks_ok, device_info): disks_ok is False if any
    (non-ignored) assemble call failed; device_info is a list of
    (primary_node, instance_visible_name, rpc_result) tuples, one per
    disk, mapping node devices to instance devices
  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occured, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        # secondary-node failures are only fatal when the caller cares
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
    # NOTE(review): "result" is whatever the last inner-loop iteration
    # left behind — presumably the primary node's assemble result;
    # confirm ComputeNodeTree always yields the primary node here
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info
1995

    
1996

    
1997
def _StartInstanceDisks(cfg, instance, force):
  """Assemble the disks of an instance, aborting on failure.

  Thin wrapper around _AssembleInstanceDisks: on failure it shuts the
  disks back down, optionally hints at '--force' and raises
  OpExecError.

  """
  ok, _ = _AssembleInstanceDisks(instance, cfg, ignore_secondaries=force)
  if ok:
    return
  # roll back whatever was assembled before reporting the failure
  _ShutdownInstanceDisks(instance, cfg)
  if force is not None and not force:
    logger.Error("If the message above refers to a secondary node,"
                 " you can retry the operation using '--force'.")
  raise errors.OpExecError("Disk consistency error")
2009

    
2010

    
2011
class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # node locks are computed later, once the instance's nodes are known
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    # refuses to act if the instance is still running on its primary
    _SafeShutdownInstanceDisks(self.instance, self.cfg)
2043

    
2044

    
2045
def _SafeShutdownInstanceDisks(instance, cfg):
  """Shutdown block devices of an instance.

  This function checks that the instance is not running (by querying
  its primary node) before calling _ShutdownInstanceDisks.

  Raises:
    OpExecError: if the primary node cannot be contacted or the
                 instance is still running on it

  """
  ins_l = rpc.call_instance_list([instance.primary_node])
  ins_l = ins_l[instance.primary_node]
  # a non-list answer means the RPC failed / node unreachable;
  # use isinstance instead of the non-idiomatic "type(x) is list"
  if not isinstance(ins_l, list):
    raise errors.OpExecError("Can't contact node '%s'" %
                             instance.primary_node)

  if instance.name in ins_l:
    raise errors.OpExecError("Instance is running, can't shutdown"
                             " block devices.")

  _ShutdownInstanceDisks(instance, cfg)
2063

    
2064

    
2065
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
2066
  """Shutdown block devices of an instance.
2067

2068
  This does the shutdown on all nodes of the instance.
2069

2070
  If the ignore_primary is false, errors on the primary node are
2071
  ignored.
2072

2073
  """
2074
  result = True
2075
  for disk in instance.disks:
2076
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2077
      cfg.SetDiskID(top_disk, node)
2078
      if not rpc.call_blockdev_shutdown(node, top_disk):
2079
        logger.Error("could not shutdown block device %s on node %s" %
2080
                     (disk.iv_name, node))
2081
        if not ignore_primary or node != instance.primary_node:
2082
          result = False
2083
  return result
2084

    
2085

    
2086
def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  # also guard against the node key missing from the answer, which
  # would otherwise turn into an unhandled KeyError below
  if not nodeinfo or not isinstance(nodeinfo, dict) or node not in nodeinfo:
    raise errors.OpPrereqError("Could not contact node %s for resource"
                             " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                             " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                             " needed %s MiB, available %s MiB" %
                             (node, reason, requested, free_mem))
2114

    
2115

    
2116
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # node locks are computed later, once the instance's nodes are known
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {"FORCE": self.op.force}
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node]
    nl.extend(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = instance

    # check bridges existance
    _CheckInstanceBridgesExist(instance)

    # make sure the primary can actually host the instance's memory
    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
                         "starting instance %s" % instance.name,
                         instance.memory)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    extra_args = getattr(self.op, "extra_args", "")

    # record the new administrative state before the actual start
    self.cfg.MarkInstanceUp(instance.name)

    _StartInstanceDisks(self.cfg, instance, self.op.force)

    if not rpc.call_instance_start(instance.primary_node, instance,
                                   extra_args):
      # clean the disks back up on failure
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")
2182

    
2183

    
2184
class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Soft/hard reboots only touch the primary node; a full reboot
      # (shutdown + disk restart + start) needs all nodes since the
      # disks are re-assembled everywhere.  The previous code computed
      # "not constants.INSTANCE_REBOOT_FULL", a constant expression
      # that is always False (i.e. it always locked all nodes); the
      # requested reboot type must be compared instead.
      primary_only = self.op.reboot_type != constants.INSTANCE_REBOOT_FULL
      self._LockInstancesNodes(primary_only=primary_only)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # check bridges existance
    _CheckInstanceBridgesExist(instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      # soft/hard reboots are delegated entirely to the node daemon
      if not rpc.call_instance_reboot(node_current, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      # full reboot: stop the instance, cycle its disks, start again
      if not rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
      if not rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)
2263

    
2264

    
2265
class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # node locks are computed later, once the instance's nodes are known
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node]
    nl.extend(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = instance

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    # record the new administrative state before the actual shutdown;
    # a failed shutdown is only logged, the disks are stopped anyway
    self.cfg.MarkInstanceDown(instance.name)
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      logger.Error("could not shutdown instance")

    _ShutdownInstanceDisks(instance, self.cfg)
2315

    
2316

    
2317
class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        # this opcode has no "pnode" field; the previous code used
        # self.op.pnode in the message, which would have raised
        # AttributeError instead of the intended OpPrereqError
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   instance.primary_node)
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node"  % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.Update(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      # the disks must be shut down again even if the OS install failed
      _ShutdownInstanceDisks(inst, self.cfg)
2403

    
2404

    
2405
class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running,
    and that the new name resolves and is not already taken (by another
    instance name or by a live IP).

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    # double-check against the primary node that the instance really
    # isn't running, regardless of its configured state
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    # normalize the new name to its resolved form
    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))

  def Exec(self, feedback_fn):
    """Rename the instance.

    Renames the instance in the configuration, swaps the instance-level
    lock, renames the file storage directory (for file-based disks) and
    runs the OS rename script on the primary node.

    """
    inst = self.instance
    old_name = inst.name

    # remember the old storage dir before the config rename changes it
    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
                                                old_file_storage_dir,
                                                new_file_storage_dir)

      if not result:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      # the RPC returned but the remote rename itself failed
      if not result[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      # a failed OS rename script is only logged, not fatal: the
      # configuration rename has already happened
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
                                          "sda", "sdb"):
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)
2508

    
2509

    
2510
class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # node locks are computed later, once the instance's nodes are known
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    Removal hooks are run on the master node only.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.sstore.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = instance

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      if not self.op.ignore_failures:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))
      feedback_fn("Warning: can't shutdown instance")

    logger.Info("removing block devices for instance %s" % instance.name)

    if not _RemoveDisks(instance, self.cfg):
      if not self.op.ignore_failures:
        raise errors.OpExecError("Can't remove instance's disks")
      feedback_fn("Warning: can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)
    # release the instance-level lock as part of this LU's cleanup
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2575

    
2576

    
2577
class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    # fields needing a live query to the nodes vs. config-only fields
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
    self.static_fields = frozenset([
      "name", "os", "pnode", "snodes",
      "admin_state", "admin_ram",
      "disk_template", "ip", "mac", "bridge",
      "sda_size", "sdb_size", "vcpus", "tags",
      "network_port", "kernel_path", "initrd_path",
      "hvm_boot_order", "hvm_acpi", "hvm_pae",
      "hvm_cdrom_image_path", "hvm_nic_type",
      "hvm_disk_type", "vnc_bind_address",
      "serial_no",
      ])
    _CheckOutputFields(static=self.static_fields,
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedInstances(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    # locks are only needed when we must contact the nodes (dynamic fields)
    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
    if self.do_locking:
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.do_locking:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    all_info = self.cfg.GetAllInstancesInfo()
    if self.do_locking:
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
    elif self.wanted != locking.ALL_SET:
      instance_names = self.wanted
      missing = set(instance_names).difference(all_info.keys())
      if missing:
        # FIX: the previous code raised self.OpExecError, which is not
        # an attribute of the LU and would itself fail with
        # AttributeError; raise the proper errors.OpExecError instead
        raise errors.OpExecError(
          "Some instances were removed before retrieving their data: %s"
          % missing)
    else:
      instance_names = all_info.keys()
    instance_list = [all_info[iname] for iname in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_all_instances_info(nodes)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = []
    for instance in instance_list:
      iout = []
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.status != "down":
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.status != "down":
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          disk = instance.FindDisk(field[:3])
          if disk is None:
            val = None
          else:
            val = disk.size
        elif field == "vcpus":
          val = instance.vcpus
        elif field == "tags":
          val = list(instance.GetTags())
        elif field == "serial_no":
          val = instance.serial_no
        elif field in ("network_port", "kernel_path", "initrd_path",
                       "hvm_boot_order", "hvm_acpi", "hvm_pae",
                       "hvm_cdrom_image_path", "hvm_nic_type",
                       "hvm_disk_type", "vnc_bind_address"):
          val = getattr(instance, field, None)
          if val is not None:
            pass
          elif field in ("hvm_nic_type", "hvm_disk_type",
                         "kernel_path", "initrd_path"):
            val = "default"
          else:
            val = "-"
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output
2745

    
2746

    
2747
class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  The failover shuts the instance down on its primary node and starts
  it on its (single) secondary node; it therefore only works for
  instances using a network-mirrored disk template (checked in
  CheckPrereq).

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand and lock the instance; node locks are computed later.

    The node lock list is left empty here and filled in DeclareLocks,
    with LOCKS_REPLACE so the lock manager recalculates it.

    """
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    """Acquire locks on the instance's nodes at the node level."""
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    # hooks run on the master node and on all secondary nodes
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster, that its disk
    template is network-mirrored, that the secondary node has enough
    free memory, and that the required bridges exist on the target.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      # a mirrored template without a secondary is a config corruption
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
                         instance.name, instance.memory)

    # check bridge existence on the target node
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        # degraded disks abort the failover unless explicitly overridden
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        # best-effort mode: log and continue even if the shutdown failed
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down"  %
                     (instance.name, source_node, source_node))
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logger.Info("Starting instance %s on node %s" %
                  (instance.name, target_node))

      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                               ignore_secondaries=True)
      if not disks_ok:
        # roll back disk activation before failing
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))

    
2868

    
2869
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices: children are created first
  (recursively), then the device itself.

  Args:
    cfg: the configuration object (used for SetDiskID)
    node: the primary node name
    instance: the instance object the device belongs to
    device: the (possibly nested) disk object to create
    info: metadata text to attach to the device

  Returns:
    True on success, False if any device in the tree failed to create

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
        return False

  cfg.SetDiskID(device, node)
  # on_primary=True: the device must always exist on the primary node
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, True, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True
2888

    
2889

    
2890
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  Args:
    cfg: the configuration object (used for SetDiskID)
    node: the secondary node name
    instance: the instance object the device belongs to
    device: the (possibly nested) disk object to create
    force: whether creation is already mandated by a parent device
    info: metadata text to attach to the device

  Returns:
    True on success (or when nothing needed creating), False otherwise

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
                                        child, force, info):
        return False

  if not force:
    # neither this device nor a parent requires secondary creation
    return True
  cfg.SetDiskID(device, node)
  # on_primary=False: this is the secondary-node creation path
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, False, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True
2917

    
2918

    
2919
def _GenerateUniqueNames(cfg, exts):
2920
  """Generate a suitable LV name.
2921

2922
  This will generate a logical volume name for the given instance.
2923

2924
  """
2925
  results = []
2926
  for val in exts:
2927
    new_id = cfg.GenerateUniqueID()
2928
    results.append("%s%s" % (new_id, val))
2929
  return results
2930

    
2931

    
2932
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name,
                         p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  Builds a DRBD8 disk backed by two LVs: a data volume of the requested
  size and a fixed 128MB metadata volume.

  Args:
    cfg: configuration object (port/VG/secret allocation)
    primary: primary node name
    secondary: secondary node name
    size: size of the data volume, in MB
    names: two LV names, [data_name, meta_name]
    iv_name: the instance-visible name of the device (e.g. "sda")
    p_minor: DRBD minor to use on the primary node
    s_minor: DRBD minor to use on the secondary node

  Returns:
    The assembled objects.Disk for the DRBD8 device.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  shared_secret = cfg.GenerateDRBDSecret()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  # 128MB is the fixed size for the DRBD metadata volume
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev
2951

    
2952

    
2953
def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz,
                          file_storage_dir, file_driver):
  """Generate the entire disk layout for a given template type.

  This builds the two-disk (sda/sdb) layout used by all templates:
  nothing for diskless, plain LVs, DRBD8 devices (with four allocated
  minors) or file-backed disks.

  Args:
    cfg: configuration object (names/ports/minor allocation)
    template_name: one of the constants.DT_* values
    instance_name: the instance name (used for DRBD minor allocation)
    primary_node: primary node name
    secondary_nodes: list of secondary node names (must match template)
    disk_sz: size of the data disk ("sda"), in MB
    swap_sz: size of the swap disk ("sdb"), in MB
    file_storage_dir: directory for file-backed disks
    file_driver: driver for file-backed disks

  Returns:
    A list of objects.Disk (empty for the diskless template).

  Raises:
    errors.ProgrammerError: if the secondary node count does not match
      the template, or the template name is unknown

  """
  #TODO: compute space requirements

  vgname = cfg.GetVGName()
  if template_name == constants.DT_DISKLESS:
    disks = []
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name = "sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name = "sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    # four minors: data and swap devices, on both primary and secondary
    (minor_pa, minor_pb,
     minor_sa, minor_sb) = cfg.AllocateDRBDMinor(
      [primary_node, primary_node, remote_node, remote_node], instance_name)

    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                        disk_sz, names[0:2], "sda",
                                        minor_pa, minor_sa)
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                        swap_sz, names[2:4], "sdb",
                                        minor_pb, minor_sb)
    disks = [drbd_sda_dev, drbd_sdb_dev]
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
                                iv_name="sda", logical_id=(file_driver,
                                "%s/sda" % file_storage_dir))
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
                                iv_name="sdb", logical_id=(file_driver,
                                "%s/sdb" % file_storage_dir))
    disks = [file_sda_dev, file_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks
3008

    
3009

    
3010
def _GetInstanceInfoText(instance):
3011
  """Compute that text that should be added to the disk's metadata.
3012

3013
  """
3014
  return "originstname+%s" % instance.name
3015

    
3016

    
3017
def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.  For file-based
  templates the storage directory is created first; then every disk is
  created on the secondaries before the primary, so the primary can
  attach to them.

  Args:
    cfg: the configuration object
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    # all file disks live in the directory of the first disk's path
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = rpc.call_file_storage_dir_create(instance.primary_node,
                                              file_storage_dir)

    if not result:
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
      return False

    if not result[0]:
      logger.Error("failed to create directory '%s'" % file_storage_dir)
      return False

  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False

  return True
3062

    
3063

    
3064
def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object
    cfg: the configuration object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    # walk the whole device tree across all involved nodes
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        # best-effort removal: log the failure but keep going
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
                                            file_storage_dir):
      logger.Error("could not remove directory '%s'" % file_storage_dir)
      result = False

  return result
3099

    
3100

    
3101
def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group

  This is currently hard-coded for the two-drive layout.

  Args:
    disk_template: one of the constants.DT_* values
    disk_size: size of the data disk, in MB
    swap_size: size of the swap disk, in MB

  Returns:
    The required free space in the volume group in MB, or None for
    templates that do not consume VG space (diskless, file).

  Raises:
    errors.ProgrammerError: if the template is unknown

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: disk_size + swap_size,
    # 256 MB are added for drbd metadata, 128MB for each drbd device
    constants.DT_DRBD8: disk_size + swap_size + 256,
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" %  disk_template)

  return req_size_dict[disk_template]
3121

    
3122

    
3123
class LUCreateInstance(LogicalUnit):
3124
  """Create an instance.
3125

3126
  """
3127
  HPATH = "instance-add"
3128
  HTYPE = constants.HTYPE_INSTANCE
3129
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
3130
              "disk_template", "swap_size", "mode", "start", "vcpus",
3131
              "wait_for_sync", "ip_check", "mac"]
3132
  REQ_BGL = False
3133

    
3134
  def _ExpandNode(self, node):
3135
    """Expands and checks one node name.
3136

3137
    """
3138
    node_full = self.cfg.ExpandNodeName(node)
3139
    if node_full is None:
3140
      raise errors.OpPrereqError("Unknown node %s" % node)
3141
    return node_full
3142

    
3143
  def ExpandNames(self):
3144
    """ExpandNames for CreateInstance.
3145

3146
    Figure out the right locks for instance creation.
3147

3148
    """
3149
    self.needed_locks = {}
3150

    
3151
    # set optional parameters to none if they don't exist
3152
    for attr in ["kernel_path", "initrd_path", "pnode", "snode",
3153
                 "iallocator", "hvm_boot_order", "hvm_acpi", "hvm_pae",
3154
                 "hvm_cdrom_image_path", "hvm_nic_type", "hvm_disk_type",
3155
                 "vnc_bind_address"]:
3156
      if not hasattr(self.op, attr):
3157
        setattr(self.op, attr, None)
3158

    
3159
    # verify creation mode
3160
    if self.op.mode not in (constants.INSTANCE_CREATE,
3161
                            constants.INSTANCE_IMPORT):
3162
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3163
                                 self.op.mode)
3164
    # disk template and mirror node verification
3165
    if self.op.disk_template not in constants.DISK_TEMPLATES:
3166
      raise errors.OpPrereqError("Invalid disk template name")
3167

    
3168
    #### instance parameters check
3169

    
3170
    # instance name verification
3171
    hostname1 = utils.HostInfo(self.op.instance_name)
3172
    self.op.instance_name = instance_name = hostname1.name
3173

    
3174
    # this is just a preventive check, but someone might still add this
3175
    # instance in the meantime, and creation will fail at lock-add time
3176
    if instance_name in self.cfg.GetInstanceList():
3177
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3178
                                 instance_name)
3179

    
3180
    self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3181

    
3182
    # ip validity checks
3183
    ip = getattr(self.op, "ip", None)
3184
    if ip is None or ip.lower() == "none":
3185
      inst_ip = None
3186
    elif ip.lower() == "auto":
3187
      inst_ip = hostname1.ip
3188
    else:
3189
      if not utils.IsValidIP(ip):
3190
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
3191
                                   " like a valid IP" % ip)
3192
      inst_ip = ip
3193
    self.inst_ip = self.op.ip = inst_ip
3194
    # used in CheckPrereq for ip ping check
3195
    self.check_ip = hostname1.ip
3196

    
3197
    # MAC address verification
3198
    if self.op.mac != "auto":
3199
      if not utils.IsValidMac(self.op.mac.lower()):
3200
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
3201
                                   self.op.mac)
3202

    
3203
    # boot order verification
3204
    if self.op.hvm_boot_order is not None:
3205
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3206
        raise errors.OpPrereqError("invalid boot order specified,"
3207
                                   " must be one or more of [acdn]")
3208
    # file storage checks
3209
    if (self.op.file_driver and
3210
        not self.op.file_driver in constants.FILE_DRIVER):
3211
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
3212
                                 self.op.file_driver)
3213

    
3214
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3215
      raise errors.OpPrereqError("File storage directory path not absolute")
3216

    
3217
    ### Node/iallocator related checks
3218
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
3219
      raise errors.OpPrereqError("One and only one of iallocator and primary"
3220
                                 " node must be given")
3221

    
3222
    if self.op.iallocator:
3223
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3224
    else:
3225
      self.op.pnode = self._ExpandNode(self.op.pnode)
3226
      nodelist = [self.op.pnode]
3227
      if self.op.snode is not None:
3228
        self.op.snode = self._ExpandNode(self.op.snode)
3229
        nodelist.append(self.op.snode)
3230
      self.needed_locks[locking.LEVEL_NODE] = nodelist
3231

    
3232
    # in case of import lock the source node too
3233
    if self.op.mode == constants.INSTANCE_IMPORT:
3234
      src_node = getattr(self.op, "src_node", None)
3235
      src_path = getattr(self.op, "src_path", None)
3236

    
3237
      if src_node is None or src_path is None:
3238
        raise errors.OpPrereqError("Importing an instance requires source"
3239
                                   " node and path options")
3240

    
3241
      if not os.path.isabs(src_path):
3242
        raise errors.OpPrereqError("The source path must be absolute")
3243

    
3244
      self.op.src_node = src_node = self._ExpandNode(src_node)
3245
      if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3246
        self.needed_locks[locking.LEVEL_NODE].append(src_node)
3247

    
3248
    else: # INSTANCE_CREATE
3249
      if getattr(self.op, "os_type", None) is None:
3250
        raise errors.OpPrereqError("No guest OS specified")
3251

    
3252
  def _RunAllocator(self):
3253
    """Run the allocator based on input opcode.
3254

3255
    """
3256
    disks = [{"size": self.op.disk_size, "mode": "w"},
3257
             {"size": self.op.swap_size, "mode": "w"}]
3258
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3259
             "bridge": self.op.bridge}]
3260
    ial = IAllocator(self.cfg, self.sstore,
3261
                     mode=constants.IALLOCATOR_MODE_ALLOC,
3262
                     name=self.op.instance_name,
3263
                     disk_template=self.op.disk_template,
3264
                     tags=[],
3265
                     os=self.op.os_type,
3266
                     vcpus=self.op.vcpus,
3267
                     mem_size=self.op.mem_size,
3268
                     disks=disks,
3269
                     nics=nics,
3270
                     )
3271

    
3272
    ial.Run(self.op.iallocator)
3273

    
3274
    if not ial.success:
3275
      raise errors.OpPrereqError("Can't compute nodes using"
3276
                                 " iallocator '%s': %s" % (self.op.iallocator,
3277
                                                           ial.info))
3278
    if len(ial.nodes) != ial.required_nodes:
3279
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3280
                                 " of nodes (%s), required %s" %
3281
                                 (self.op.iallocator, len(ial.nodes),
3282
                                  ial.required_nodes))
3283
    self.op.pnode = ial.nodes[0]
3284
    logger.ToStdout("Selected nodes for the instance: %s" %
3285
                    (", ".join(ial.nodes),))
3286
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3287
                (self.op.instance_name, self.op.iallocator, ial.nodes))
3288
    if ial.required_nodes == 2:
3289
      self.op.snode = ial.nodes[1]
3290

    
3291
  def BuildHooksEnv(self):
3292
    """Build hooks env.
3293

3294
    This runs on master, primary and secondary nodes of the instance.
3295

3296
    """
3297
    env = {
3298
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3299
      "INSTANCE_DISK_SIZE": self.op.disk_size,
3300
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
3301
      "INSTANCE_ADD_MODE": self.op.mode,
3302
      }
3303
    if self.op.mode == constants.INSTANCE_IMPORT:
3304
      env["INSTANCE_SRC_NODE"] = self.op.src_node
3305
      env["INSTANCE_SRC_PATH"] = self.op.src_path
3306
      env["INSTANCE_SRC_IMAGE"] = self.src_image
3307

    
3308
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3309
      primary_node=self.op.pnode,
3310
      secondary_nodes=self.secondaries,
3311
      status=self.instance_status,
3312
      os_type=self.op.os_type,
3313
      memory=self.op.mem_size,
3314
      vcpus=self.op.vcpus,
3315
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3316
    ))
3317

    
3318
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3319
          self.secondaries)
3320
    return env, nl, nl
3321

    
3322

    
3323
  def CheckPrereq(self):
3324
    """Check prerequisites.
3325

3326
    """
3327
    if (not self.cfg.GetVGName() and
3328
        self.op.disk_template not in constants.DTS_NOT_LVM):
3329
      raise errors.OpPrereqError("Cluster does not support lvm-based"
3330
                                 " instances")
3331

    
3332
    if self.op.mode == constants.INSTANCE_IMPORT:
3333
      src_node = self.op.src_node
3334
      src_path = self.op.src_path
3335

    
3336
      export_info = rpc.call_export_info(src_node, src_path)
3337

    
3338
      if not export_info:
3339
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
3340

    
3341
      if not export_info.has_section(constants.INISECT_EXP):
3342
        raise errors.ProgrammerError("Corrupted export config")
3343

    
3344
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
3345
      if (int(ei_version) != constants.EXPORT_VERSION):
3346
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3347
                                   (ei_version, constants.EXPORT_VERSION))
3348

    
3349
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3350
        raise errors.OpPrereqError("Can't import instance with more than"
3351
                                   " one data disk")
3352

    
3353
      # FIXME: are the old os-es, disk sizes, etc. useful?
3354
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3355
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3356
                                                         'disk0_dump'))
3357
      self.src_image = diskimage
3358

    
3359
    # ip ping checks (we use the same ip that was resolved in ExpandNames)
3360

    
3361
    if self.op.start and not self.op.ip_check:
3362
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3363
                                 " adding an instance in start mode")
3364

    
3365
    if self.op.ip_check:
3366
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3367
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3368
                                   (self.check_ip, instance_name))
3369

    
3370
    # bridge verification
3371
    bridge = getattr(self.op, "bridge", None)
3372
    if bridge is None:
3373
      self.op.bridge = self.cfg.GetDefBridge()
3374
    else:
3375
      self.op.bridge = bridge
3376

    
3377
    #### allocator run
3378

    
3379
    if self.op.iallocator is not None:
3380
      self._RunAllocator()
3381

    
3382
    #### node related checks
3383

    
3384
    # check primary node
3385
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3386
    assert self.pnode is not None, \
3387
      "Cannot retrieve locked node %s" % self.op.pnode
3388
    self.secondaries = []
3389

    
3390
    # mirror node verification
3391
    if self.op.disk_template in constants.DTS_NET_MIRROR:
3392
      if self.op.snode is None:
3393
        raise errors.OpPrereqError("The networked disk templates need"
3394
                                   " a mirror node")
3395
      if self.op.snode == pnode.name:
3396
        raise errors.OpPrereqError("The secondary node cannot be"
3397
                                   " the primary node.")
3398
      self.secondaries.append(self.op.snode)
3399

    
3400
    req_size = _ComputeDiskSize(self.op.disk_template,
3401
                                self.op.disk_size, self.op.swap_size)
3402

    
3403
    # Check lv size requirements
3404
    if req_size is not None:
3405
      nodenames = [pnode.name] + self.secondaries
3406
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3407
      for node in nodenames:
3408
        info = nodeinfo.get(node, None)
3409
        if not info:
3410
          raise errors.OpPrereqError("Cannot get current information"
3411
                                     " from node '%s'" % node)
3412
        vg_free = info.get('vg_free', None)
3413
        if not isinstance(vg_free, int):
3414
          raise errors.OpPrereqError("Can't compute free disk space on"
3415
                                     " node %s" % node)
3416
        if req_size > info['vg_free']:
3417
          raise errors.OpPrereqError("Not enough disk space on target node %s."
3418
                                     " %d MB available, %d MB required" %
3419
                                     (node, info['vg_free'], req_size))
3420

    
3421
    # os verification
3422
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3423
    if not os_obj:
3424
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
3425
                                 " primary node"  % self.op.os_type)
3426

    
3427
    if self.op.kernel_path == constants.VALUE_NONE:
3428
      raise errors.OpPrereqError("Can't set instance kernel to none")
3429

    
3430
    # bridge check on primary node
3431
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3432
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
3433
                                 " destination node '%s'" %
3434
                                 (self.op.bridge, pnode.name))
3435

    
3436
    # memory check on primary node
3437
    if self.op.start:
3438
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3439
                           "creating instance %s" % self.op.instance_name,
3440
                           self.op.mem_size)
3441

    
3442
    # hvm_cdrom_image_path verification
3443
    if self.op.hvm_cdrom_image_path is not None:
3444
      # FIXME (als): shouldn't these checks happen on the destination node?
3445
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
3446
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
3447
                                   " be an absolute path or None, not %s" %
3448
                                   self.op.hvm_cdrom_image_path)
3449
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
3450
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
3451
                                   " regular file or a symlink pointing to"
3452
                                   " an existing regular file, not %s" %
3453
                                   self.op.hvm_cdrom_image_path)
3454

    
3455
    # vnc_bind_address verification
3456
    if self.op.vnc_bind_address is not None:
3457
      if not utils.IsValidIP(self.op.vnc_bind_address):
3458
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3459
                                   " like a valid IP address" %
3460
                                   self.op.vnc_bind_address)
3461

    
3462
    # Xen HVM device type checks
3463
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
3464
      if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
3465
        raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
3466
                                   " hypervisor" % self.op.hvm_nic_type)
3467
      if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
3468
        raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
3469
                                   " hypervisor" % self.op.hvm_disk_type)
3470

    
3471
    if self.op.start:
3472
      self.instance_status = 'up'
3473
    else:
3474
      self.instance_status = 'down'
3475

    
3476
  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    Builds the NIC and disk objects, allocates a network port when the
    hypervisor requires one, creates the disks on the nodes, registers
    the instance in the cluster configuration, optionally waits for the
    disks to sync, runs the OS creation/import scripts and finally
    starts the instance if requested.

    Args:
      feedback_fn: callable used to report progress messages to the
        caller (takes a single string argument)

    Raises:
      errors.OpExecError: if disk creation, disk sync, OS installation
        or the instance start fails
      errors.ProgrammerError: if self.op.mode is an unknown
        instance-creation mode (also validated in CheckPrereq)

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    # either generate a fresh MAC address or use the one requested
    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    # hypervisors in HTS_REQ_PORT need a cluster-unique network port
    # (e.g. for the VNC console); others get no port at all
    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    if self.op.vnc_bind_address is None:
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.sstore.GetFileStorageDir(),
                                        string_file_storage_dir, instance))

    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size,
                                  file_storage_dir,
                                  self.op.file_driver)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            hvm_acpi=self.op.hvm_acpi,
                            hvm_pae=self.op.hvm_pae,
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
                            vnc_bind_address=self.op.vnc_bind_address,
                            hvm_nic_type=self.op.hvm_nic_type,
                            hvm_disk_type=self.op.hvm_disk_type,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      # roll back any partially-created devices and release the DRBD
      # minors reserved for this instance before aborting
      _RemoveDisks(iobj, self.cfg)
      self.cfg.ReleaseDRBDMinors(instance)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Declare that we don't want to remove the instance lock anymore, as we've
    # added the instance to the config
    del self.remove_locks[locking.LEVEL_INSTANCE]
    # Remove the temp. assignements for the instance's drbds
    self.cfg.ReleaseDRBDMinors(instance)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      # degraded disks: undo the instance registration done above
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      # Make sure the instance lock gets removed
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                                src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")
3605
class LUConnectConsole(NoHooksLU):
  """Compute the command needed to attach to an instance's console.

  This is somewhat special in that it does not open the console itself:
  it returns the command line that must be run on the master node in
  order to connect to the console of the instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    inst = self.instance
    primary = inst.primary_node

    # verify the instance is actually running on its primary node
    running = rpc.call_instance_list([primary])[primary]
    if running is False:
      raise errors.OpExecError("Can't connect to node %s." % primary)
    if inst.name not in running:
      raise errors.OpExecError("Instance %s is not running." % inst.name)

    logger.Debug("connecting to console of %s on %s" % (inst.name, primary))

    # ask the hypervisor for the console command, then wrap it in ssh
    hyper = hypervisor.GetHypervisor(self.cfg)
    console_cmd = hyper.GetShellCommandForConsole(inst)

    # build ssh cmdline
    return self.ssh.BuildCmd(primary, "root", console_cmd,
                             batch=True, tty=True)
3652
class LUReplaceDisks(LogicalUnit):
3653
  """Replace the disks of an instance.
3654

3655
  """
3656
  HPATH = "mirrors-replace"
3657
  HTYPE = constants.HTYPE_INSTANCE
3658
  _OP_REQP = ["instance_name", "mode", "disks"]
3659
  REQ_BGL = False
3660

    
3661
  def ExpandNames(self):
    """Expand the instance name and set up the node locking strategy.

    Three mutually exclusive cases are handled: an iallocator will pick
    the new secondary (so all nodes must be locked), the user named the
    new secondary explicitly (lock it plus the instance's own nodes),
    or no new secondary is given (lock only the instance's nodes).

    """
    self._ExpandAndLockInstance()

    self.op.remote_node = getattr(self.op, "remote_node", None)

    if getattr(self.op, "iallocator", None) is not None:
      # the allocator chooses the node, so it cannot be known in advance
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    elif self.op.remote_node is not None:
      expanded = self.cfg.ExpandNodeName(self.op.remote_node)
      if expanded is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.op.remote_node = expanded
      self.needed_locks[locking.LEVEL_NODE] = [expanded]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3685
  def DeclareLocks(self, level):
    """Declare the node locks for this LU.

    If we are not already locking the whole node set, the instance's
    primary and secondary nodes have to be declared at the node level.

    """
    locking_all = (self.needed_locks[locking.LEVEL_NODE] is locking.ALL_SET)
    if level == locking.LEVEL_NODE and not locking_all:
      self._LockInstancesNodes()
3692
  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    Runs the configured instance allocator in relocation mode and, on
    success, stores the selected node in self.op.remote_node.

    Raises:
      errors.OpPrereqError: if the allocator run fails or it returns a
        number of nodes different from the required one

    """
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      # FIX: the format string has three placeholders but the argument
      # tuple was missing the allocator name, which made this raise a
      # TypeError instead of the intended OpPrereqError
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    logger.ToStdout("Selected new secondary for the instance: %s" %
                    self.op.remote_node)
3715
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    # note: the instance environment is merged in last on purpose, so
    # it keeps the same precedence as before
    env = dict(MODE=self.op.mode,
               NEW_SECONDARY=self.op.remote_node,
               OLD_SECONDARY=self.instance.secondary_nodes[0])
    env.update(_BuildInstanceHookEnvByObject(self.instance))

    nl = [self.sstore.GetMasterNode(), self.instance.primary_node]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl
3735
  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster, that its disk
    layout supports replacement, validates the requested new secondary
    (if any) and normalizes self.op.mode, computing the target and
    other node (self.tgt_node / self.oth_node) for the execution phase.

    Raises:
      errors.OpPrereqError: if the instance layout or the requested
        mode/node combination is invalid
      errors.ProgrammerError: if self.op.mode is not a known replace
        mode for the drbd8 template

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = instance

    # disk replacement only makes sense for network-mirrored templates
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    # the allocator (if requested) fills in self.op.remote_node
    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        # replace on the primary: the secondary is the healthy peer
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    # finally, verify every requested disk actually belongs to the instance
    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))
3807
  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for dbrd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaces.<time_t>)

    Failures are not very well handled.

    Args:
      feedback_fn: progress-reporting callable (unused here; progress is
        reported through self.proc.LogStep/LogInfo/LogWarning instead)

    Raises:
      errors.OpExecError: on any failure of the device checks or of the
        detach/rename/attach sequence

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    # iv_names maps a disk's iv_name -> (drbd device, old LVs, new LVs)
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      # one data LV (same size as the disk) and one 128MB metadata LV
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      # update the in-memory config objects to match the on-node renames
      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        # attach failed: try to remove the freshly-created LVs, best-effort
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      # index 5 of the blockdev_find result is the degraded flag
      # NOTE(review): presumed from usage here — confirm against the rpc layer
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        # removal failures are non-fatal; the admin can clean up manually
        if not rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue
3977
  def _ExecD8Secondary(self, feedback_fn):
3978
    """Replace the secondary node for drbd8.
3979

3980
    The algorithm for replace is quite complicated:
3981
      - for all disks of the instance:
3982
        - create new LVs on the new node with same names
3983