root / lib / cmdlib.py @ 38d7239a


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33
import logging
34

    
35
from ganeti import rpc
36
from ganeti import ssh
37
from ganeti import logger
38
from ganeti import utils
39
from ganeti import errors
40
from ganeti import hypervisor
41
from ganeti import locking
42
from ganeti import constants
43
from ganeti import objects
44
from ganeti import opcodes
45
from ganeti import serializer
46

    
47

    
48
class LogicalUnit(object):
49
  """Logical Unit base class.
50

51
  Subclasses must follow these rules:
52
    - implement ExpandNames
53
    - implement CheckPrereq
54
    - implement Exec
55
    - implement BuildHooksEnv
56
    - redefine HPATH and HTYPE
57
    - optionally redefine their run requirements:
58
        REQ_MASTER: the LU needs to run on the master node
59
        REQ_WSSTORE: the LU needs a writable SimpleStore
60
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
61

62
  Note that all commands require root permissions.
63

64
  """
65
  HPATH = None
66
  HTYPE = None
67
  _OP_REQP = []
68
  REQ_MASTER = True
69
  REQ_WSSTORE = False
70
  REQ_BGL = True
71

    
72
  def __init__(self, processor, op, context, sstore):
73
    """Constructor for LogicalUnit.
74

75
    This needs to be overridden in derived classes in order to check op
76
    validity.
77

78
    """
79
    self.proc = processor
80
    self.op = op
81
    self.cfg = context.cfg
82
    self.sstore = sstore
83
    self.context = context
84
    # Dicts used to declare locking needs to mcpu
85
    self.needed_locks = None
86
    self.acquired_locks = {}
87
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
88
    self.add_locks = {}
89
    self.remove_locks = {}
90
    # Used to force good behavior when calling helper functions
91
    self.recalculate_locks = {}
92
    self.__ssh = None
93

    
94
    for attr_name in self._OP_REQP:
95
      attr_val = getattr(op, attr_name, None)
96
      if attr_val is None:
97
        raise errors.OpPrereqError("Required parameter '%s' missing" %
98
                                   attr_name)
99

    
100
    if not self.cfg.IsCluster():
101
      raise errors.OpPrereqError("Cluster not initialized yet,"
102
                                 " use 'gnt-cluster init' first.")
103
    if self.REQ_MASTER:
104
      master = sstore.GetMasterNode()
105
      if master != utils.HostInfo().name:
106
        raise errors.OpPrereqError("Commands must be run on the master"
107
                                   " node %s" % master)
108

    
109
  def __GetSSH(self):
110
    """Returns the SshRunner object
111

112
    """
113
    if not self.__ssh:
114
      self.__ssh = ssh.SshRunner(self.sstore)
115
    return self.__ssh
116

    
117
  ssh = property(fget=__GetSSH)
118

    
119
  def ExpandNames(self):
120
    """Expand names for this LU.
121

122
    This method is called before starting to execute the opcode, and it should
123
    update all the parameters of the opcode to their canonical form (e.g. a
124
    short node name must be fully expanded after this method has successfully
125
    completed). This way locking, hooks, logging, etc. can work correctly.
126

127
    LUs which implement this method must also populate the self.needed_locks
128
    member, as a dict with lock levels as keys, and a list of needed lock names
129
    as values. Rules:
130
      - Use an empty dict if you don't need any lock
131
      - If you don't need any lock at a particular level omit that level
132
      - Don't put anything for the BGL level
133
      - If you want all locks at a level use locking.ALL_SET as a value
134

135
    If you need to share locks (rather than acquire them exclusively) at one
136
    level you can modify self.share_locks, setting a true value (usually 1) for
137
    that level. By default locks are not shared.
138

139
    Examples:
140
    # Acquire all nodes and one instance
141
    self.needed_locks = {
142
      locking.LEVEL_NODE: locking.ALL_SET,
143
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
144
    }
145
    # Acquire just two nodes
146
    self.needed_locks = {
147
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
148
    }
149
    # Acquire no locks
150
    self.needed_locks = {} # No, you can't leave it to the default value None
151

152
    """
153
    # The implementation of this method is mandatory only if the new LU is
154
    # concurrent, so that old LUs don't need to be changed all at the same
155
    # time.
156
    if self.REQ_BGL:
157
      self.needed_locks = {} # Exclusive LUs don't need locks.
158
    else:
159
      raise NotImplementedError
160

    
161
  def DeclareLocks(self, level):
162
    """Declare LU locking needs for a level
163

164
    While most LUs can just declare their locking needs at ExpandNames time,
165
    sometimes there's the need to calculate some locks after having acquired
166
    the ones before. This function is called just before acquiring locks at a
167
    particular level, but after acquiring the ones at lower levels, and permits
168
    such calculations. It can be used to modify self.needed_locks, and by
169
    default it does nothing.
170

171
    This function is only called if you have something already set in
172
    self.needed_locks for the level.
173

174
    @param level: Locking level which is going to be locked
175
    @type level: member of ganeti.locking.LEVELS
176

177
    """
178

    
179
  def CheckPrereq(self):
180
    """Check prerequisites for this LU.
181

182
    This method should check that the prerequisites for the execution
183
    of this LU are fulfilled. It can do internode communication, but
184
    it should be idempotent - no cluster or system changes are
185
    allowed.
186

187
    The method should raise errors.OpPrereqError in case something is
188
    not fulfilled. Its return value is ignored.
189

190
    This method should also update all the parameters of the opcode to
191
    their canonical form if it hasn't been done by ExpandNames before.
192

193
    """
194
    raise NotImplementedError
195

    
196
  def Exec(self, feedback_fn):
197
    """Execute the LU.
198

199
    This method should implement the actual work. It should raise
200
    errors.OpExecError for failures that are somewhat dealt with in
201
    code, or expected.
202

203
    """
204
    raise NotImplementedError
205

    
206
  def BuildHooksEnv(self):
207
    """Build hooks environment for this LU.
208

209
    This method should return a three-element tuple consisting of: a dict
210
    containing the environment that will be used for running the
211
    specific hook for this LU, a list of node names on which the hook
212
    should run before the execution, and a list of node names on which
213
    the hook should run after the execution.
214

215
    The keys of the dict must not have 'GANETI_' prefixed as this will
216
    be handled in the hooks runner. Also note additional keys will be
217
    added by the hooks runner. If the LU doesn't define any
218
    environment, an empty dict (and not None) should be returned.
219

220
    If there are no nodes to return, use an empty list (and not None).
221

222
    Note that if the HPATH for a LU class is None, this function will
223
    not be called.
224

225
    """
226
    raise NotImplementedError
227

    
228
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
229
    """Notify the LU about the results of its hooks.
230

231
    This method is called every time a hooks phase is executed, and notifies
232
    the Logical Unit about the hooks' result. The LU can then use it to alter
233
    its result based on the hooks.  By default the method does nothing and the
234
    previous result is passed back unchanged, but any LU can override it if it
235
    wants to use the local cluster hook-scripts somehow.
236

237
    Args:
238
      phase: the hooks phase that has just been run
239
      hook_results: the results of the multi-node hooks rpc call
240
      feedback_fn: function to send feedback back to the caller
241
      lu_result: the previous result this LU had, or None in the PRE phase.
242

243
    """
244
    return lu_result
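    # For a concrete override of this callback see the HooksCallBack
    # method of LUVerifyCluster further down in this module.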
245

    
246
  def _ExpandAndLockInstance(self):
247
    """Helper function to expand and lock an instance.
248

249
    Many LUs that work on an instance take its name in self.op.instance_name
250
    and need to expand it and then declare the expanded name for locking. This
251
    function does it, and then updates self.op.instance_name to the expanded
252
    name. It also initializes needed_locks as a dict, if this hasn't been done
253
    before.
254

255
    """
256
    if self.needed_locks is None:
257
      self.needed_locks = {}
258
    else:
259
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
260
        "_ExpandAndLockInstance called with instance-level locks set"
261
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
262
    if expanded_name is None:
263
      raise errors.OpPrereqError("Instance '%s' not known" %
264
                                  self.op.instance_name)
265
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
266
    self.op.instance_name = expanded_name
267

    
268
  def _LockInstancesNodes(self, primary_only=False):
269
    """Helper function to declare instances' nodes for locking.
270

271
    This function should be called after locking one or more instances to lock
272
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
273
    with all primary or secondary nodes for instances already locked and
274
    present in self.needed_locks[locking.LEVEL_INSTANCE].
275

276
    It should be called from DeclareLocks, and for safety only works if
277
    self.recalculate_locks[locking.LEVEL_NODE] is set.
278

279
    In the future it may grow parameters to just lock some instance's nodes, or
280
    to just lock primaries or secondary nodes, if needed.
281

282
    It should be called in DeclareLocks in a way similar to:
283

284
    if level == locking.LEVEL_NODE:
285
      self._LockInstancesNodes()
286

287
    @type primary_only: boolean
288
    @param primary_only: only lock primary nodes of locked instances
289

290
    """
291
    assert locking.LEVEL_NODE in self.recalculate_locks, \
292
      "_LockInstancesNodes helper function called with no nodes to recalculate"
293

    
294
    # TODO: check if we've really been called with the instance locks held
295

    
296
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
297
    # future we might want to have different behaviors depending on the value
298
    # of self.recalculate_locks[locking.LEVEL_NODE]
299
    wanted_nodes = []
300
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
301
      instance = self.context.cfg.GetInstanceInfo(instance_name)
302
      wanted_nodes.append(instance.primary_node)
303
      if not primary_only:
304
        wanted_nodes.extend(instance.secondary_nodes)
305

    
306
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
307
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
308
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
309
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
310

    
311
    del self.recalculate_locks[locking.LEVEL_NODE]
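  # A hypothetical concurrent LU would typically wire this helper up roughly
  # as follows (illustrative sketch only, not an LU defined in this file):
  #
  #   def ExpandNames(self):
  #     self._ExpandAndLockInstance()
  #     self.needed_locks[locking.LEVEL_NODE] = []
  #     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
  #
  #   def DeclareLocks(self, level):
  #     if level == locking.LEVEL_NODE:
  #       self._LockInstancesNodes()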
312

    
313

    
314
class NoHooksLU(LogicalUnit):
315
  """Simple LU which runs no hooks.
316

317
  This LU is intended as a parent for other LogicalUnits which will
318
  run no hooks, in order to reduce duplicate code.
319

320
  """
321
  HPATH = None
322
  HTYPE = None
323

    
324

    
325
def _GetWantedNodes(lu, nodes):
326
  """Returns list of checked and expanded node names.
327

328
  Args:
329
    nodes: List of nodes (strings) or None for all
330

331
  """
332
  if not isinstance(nodes, list):
333
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
334

    
335
  if not nodes:
336
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
337
      " non-empty list of nodes whose name is to be expanded.")
338

    
339
  wanted = []
340
  for name in nodes:
341
    node = lu.cfg.ExpandNodeName(name)
342
    if node is None:
343
      raise errors.OpPrereqError("No such node name '%s'" % name)
344
    wanted.append(node)
345

    
346
  return utils.NiceSort(wanted)
347

    
348

    
349
def _GetWantedInstances(lu, instances):
350
  """Returns list of checked and expanded instance names.
351

352
  Args:
353
    instances: List of instances (strings) or None for all
354

355
  """
356
  if not isinstance(instances, list):
357
    raise errors.OpPrereqError("Invalid argument type 'instances'")
358

    
359
  if instances:
360
    wanted = []
361

    
362
    for name in instances:
363
      instance = lu.cfg.ExpandInstanceName(name)
364
      if instance is None:
365
        raise errors.OpPrereqError("No such instance name '%s'" % name)
366
      wanted.append(instance)
367

    
368
  else:
369
    wanted = lu.cfg.GetInstanceList()
370
  return utils.NiceSort(wanted)
371

    
372

    
373
def _CheckOutputFields(static, dynamic, selected):
374
  """Checks whether all selected fields are valid.
375

376
  Args:
377
    static: Static fields
378
    dynamic: Dynamic fields
379

380
  """
381
  static_fields = frozenset(static)
382
  dynamic_fields = frozenset(dynamic)
383

    
384
  all_fields = static_fields | dynamic_fields
385

    
386
  if not all_fields.issuperset(selected):
387
    raise errors.OpPrereqError("Unknown output fields selected: %s"
388
                               % ",".join(frozenset(selected).
389
                                          difference(all_fields)))
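# Example (hypothetical call): _CheckOutputFields(static=["name"],
# dynamic=["mfree"], selected=["name", "foo"]) raises OpPrereqError,
# complaining about the unknown field "foo".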
390

    
391

    
392
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
393
                          memory, vcpus, nics):
394
  """Builds instance related env variables for hooks from single variables.
395

396
  Args:
397
    secondary_nodes: List of secondary nodes as strings
398
  """
399
  env = {
400
    "OP_TARGET": name,
401
    "INSTANCE_NAME": name,
402
    "INSTANCE_PRIMARY": primary_node,
403
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
404
    "INSTANCE_OS_TYPE": os_type,
405
    "INSTANCE_STATUS": status,
406
    "INSTANCE_MEMORY": memory,
407
    "INSTANCE_VCPUS": vcpus,
408
  }
409

    
410
  if nics:
411
    nic_count = len(nics)
412
    for idx, (ip, bridge, mac) in enumerate(nics):
413
      if ip is None:
414
        ip = ""
415
      env["INSTANCE_NIC%d_IP" % idx] = ip
416
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
417
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
418
  else:
419
    nic_count = 0
420

    
421
  env["INSTANCE_NIC_COUNT"] = nic_count
422

    
423
  return env
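# Example (hypothetical values): _BuildInstanceHookEnv("inst1", "node1",
# ["node2"], "debian-etch", "up", 128, 1, [("198.51.100.10", "xen-br0",
# "aa:00:00:00:00:01")]) yields, among others, INSTANCE_PRIMARY=node1,
# INSTANCE_NIC_COUNT=1 and INSTANCE_NIC0_IP=198.51.100.10.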
424

    
425

    
426
def _BuildInstanceHookEnvByObject(instance, override=None):
427
  """Builds instance related env variables for hooks from an object.
428

429
  Args:
430
    instance: objects.Instance object of instance
431
    override: dict of values to override
432
  """
433
  args = {
434
    'name': instance.name,
435
    'primary_node': instance.primary_node,
436
    'secondary_nodes': instance.secondary_nodes,
437
    'os_type': instance.os,
438
    'status': instance.status,
439
    'memory': instance.memory,
440
    'vcpus': instance.vcpus,
441
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
442
  }
443
  if override:
444
    args.update(override)
445
  return _BuildInstanceHookEnv(**args)
446

    
447

    
448
def _CheckInstanceBridgesExist(instance):
449
  """Check that the brigdes needed by an instance exist.
450

451
  """
452
  # check bridges existence
453
  brlist = [nic.bridge for nic in instance.nics]
454
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
455
    raise errors.OpPrereqError("one or more target bridges %s does not"
456
                               " exist on destination node '%s'" %
457
                               (brlist, instance.primary_node))
458

    
459

    
460
class LUDestroyCluster(NoHooksLU):
461
  """Logical unit for destroying the cluster.
462

463
  """
464
  _OP_REQP = []
465

    
466
  def CheckPrereq(self):
467
    """Check prerequisites.
468

469
    This checks whether the cluster is empty.
470

471
    Any errors are signalled by raising errors.OpPrereqError.
472

473
    """
474
    master = self.sstore.GetMasterNode()
475

    
476
    nodelist = self.cfg.GetNodeList()
477
    if len(nodelist) != 1 or nodelist[0] != master:
478
      raise errors.OpPrereqError("There are still %d node(s) in"
479
                                 " this cluster." % (len(nodelist) - 1))
480
    instancelist = self.cfg.GetInstanceList()
481
    if instancelist:
482
      raise errors.OpPrereqError("There are still %d instance(s) in"
483
                                 " this cluster." % len(instancelist))
484

    
485
  def Exec(self, feedback_fn):
486
    """Destroys the cluster.
487

488
    """
489
    master = self.sstore.GetMasterNode()
490
    if not rpc.call_node_stop_master(master, False):
491
      raise errors.OpExecError("Could not disable the master role")
492
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
493
    utils.CreateBackup(priv_key)
494
    utils.CreateBackup(pub_key)
495
    return master
496

    
497

    
498
class LUVerifyCluster(LogicalUnit):
499
  """Verifies the cluster status.
500

501
  """
502
  HPATH = "cluster-verify"
503
  HTYPE = constants.HTYPE_CLUSTER
504
  _OP_REQP = ["skip_checks"]
505
  REQ_BGL = False
506

    
507
  def ExpandNames(self):
508
    self.needed_locks = {
509
      locking.LEVEL_NODE: locking.ALL_SET,
510
      locking.LEVEL_INSTANCE: locking.ALL_SET,
511
    }
512
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
513

    
514
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
515
                  remote_version, feedback_fn):
516
    """Run multiple tests against a node.
517

518
    Test list:
519
      - compares ganeti version
520
      - checks vg existence and size > 20G
521
      - checks config file checksum
522
      - checks ssh to other nodes
523

524
    Args:
525
      node: name of the node to check
526
      file_list: required list of files
527
      local_cksum: dictionary of local files and their checksums
528

529
    """
530
    # compares ganeti version
531
    local_version = constants.PROTOCOL_VERSION
532
    if not remote_version:
533
      feedback_fn("  - ERROR: connection to %s failed" % (node))
534
      return True
535

    
536
    if local_version != remote_version:
537
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
538
                      (local_version, node, remote_version))
539
      return True
540

    
541
    # checks vg existance and size > 20G
542

    
543
    bad = False
544
    if not vglist:
545
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
546
                      (node,))
547
      bad = True
548
    else:
549
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
550
                                            constants.MIN_VG_SIZE)
551
      if vgstatus:
552
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
553
        bad = True
554

    
555
    # checks config file checksum
556
    # checks ssh to any
557

    
558
    if 'filelist' not in node_result:
559
      bad = True
560
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
561
    else:
562
      remote_cksum = node_result['filelist']
563
      for file_name in file_list:
564
        if file_name not in remote_cksum:
565
          bad = True
566
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
567
        elif remote_cksum[file_name] != local_cksum[file_name]:
568
          bad = True
569
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
570

    
571
    if 'nodelist' not in node_result:
572
      bad = True
573
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
574
    else:
575
      if node_result['nodelist']:
576
        bad = True
577
        for node in node_result['nodelist']:
578
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
579
                          (node, node_result['nodelist'][node]))
580
    if 'node-net-test' not in node_result:
581
      bad = True
582
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
583
    else:
584
      if node_result['node-net-test']:
585
        bad = True
586
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
587
        for node in nlist:
588
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
589
                          (node, node_result['node-net-test'][node]))
590

    
591
    hyp_result = node_result.get('hypervisor', None)
592
    if hyp_result is not None:
593
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
594
    return bad
595

    
596
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
597
                      node_instance, feedback_fn):
598
    """Verify an instance.
599

600
    This function checks to see if the required block devices are
601
    available on the instance's node.
602

603
    """
604
    bad = False
605

    
606
    node_current = instanceconfig.primary_node
607

    
608
    node_vol_should = {}
609
    instanceconfig.MapLVsByNode(node_vol_should)
610

    
611
    for node in node_vol_should:
612
      for volume in node_vol_should[node]:
613
        if node not in node_vol_is or volume not in node_vol_is[node]:
614
          feedback_fn("  - ERROR: volume %s missing on node %s" %
615
                          (volume, node))
616
          bad = True
617

    
618
    if instanceconfig.status != 'down':
619
      if (node_current not in node_instance or
620
          instance not in node_instance[node_current]):
621
        feedback_fn("  - ERROR: instance %s not running on node %s" %
622
                        (instance, node_current))
623
        bad = True
624

    
625
    for node in node_instance:
626
      if node != node_current:
627
        if instance in node_instance[node]:
628
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
629
                          (instance, node))
630
          bad = True
631

    
632
    return bad
633

    
634
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
635
    """Verify if there are any unknown volumes in the cluster.
636

637
    The .os, .swap and backup volumes are ignored. All other volumes are
638
    reported as unknown.
639

640
    """
641
    bad = False
642

    
643
    for node in node_vol_is:
644
      for volume in node_vol_is[node]:
645
        if node not in node_vol_should or volume not in node_vol_should[node]:
646
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
647
                      (volume, node))
648
          bad = True
649
    return bad
650

    
651
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
652
    """Verify the list of running instances.
653

654
    This checks what instances are running but unknown to the cluster.
655

656
    """
657
    bad = False
658
    for node in node_instance:
659
      for runninginstance in node_instance[node]:
660
        if runninginstance not in instancelist:
661
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
662
                          (runninginstance, node))
663
          bad = True
664
    return bad
665

    
666
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
667
    """Verify N+1 Memory Resilience.
668

669
    Check that if one single node dies we can still start all the instances it
670
    was primary for.
671

672
    """
673
    bad = False
674

    
675
    for node, nodeinfo in node_info.iteritems():
676
      # This code checks that every node which is now listed as secondary has
677
      # enough memory to host all instances it is supposed to, should a single
678
      # other node in the cluster fail.
679
      # FIXME: not ready for failover to an arbitrary node
680
      # FIXME: does not support file-backed instances
681
      # WARNING: we currently take into account down instances as well as up
682
      # ones, considering that even if they're down someone might want to start
683
      # them even in the event of a node failure.
684
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
685
        needed_mem = 0
686
        for instance in instances:
687
          needed_mem += instance_cfg[instance].memory
688
        if nodeinfo['mfree'] < needed_mem:
689
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
690
                      " failovers should node %s fail" % (node, prinode))
691
          bad = True
692
    return bad
693

    
694
  def CheckPrereq(self):
695
    """Check prerequisites.
696

697
    Transform the list of checks we're going to skip into a set and check that
698
    all its members are valid.
699

700
    """
701
    self.skip_set = frozenset(self.op.skip_checks)
702
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
703
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
704

    
705
  def BuildHooksEnv(self):
706
    """Build hooks env.
707

708
    Cluster-Verify hooks are only run in the post phase, and their failure
709
    makes the output be logged in the verify output and the verification fail.
710

711
    """
712
    all_nodes = self.cfg.GetNodeList()
713
    # TODO: populate the environment with useful information for verify hooks
714
    env = {}
715
    return env, [], all_nodes
716

    
717
  def Exec(self, feedback_fn):
718
    """Verify integrity of cluster, performing various test on nodes.
719

720
    """
721
    bad = False
722
    feedback_fn("* Verifying global settings")
723
    for msg in self.cfg.VerifyConfig():
724
      feedback_fn("  - ERROR: %s" % msg)
725

    
726
    vg_name = self.cfg.GetVGName()
727
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
728
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
729
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
730
    i_non_redundant = [] # Non redundant instances
731
    node_volume = {}
732
    node_instance = {}
733
    node_info = {}
734
    instance_cfg = {}
735

    
736
    # FIXME: verify OS list
737
    # do local checksums
738
    file_names = list(self.sstore.GetFileList())
739
    file_names.append(constants.SSL_CERT_FILE)
740
    file_names.append(constants.CLUSTER_CONF_FILE)
741
    local_checksums = utils.FingerprintFiles(file_names)
742

    
743
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
744
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
745
    all_instanceinfo = rpc.call_instance_list(nodelist)
746
    all_vglist = rpc.call_vg_list(nodelist)
747
    node_verify_param = {
748
      'filelist': file_names,
749
      'nodelist': nodelist,
750
      'hypervisor': None,
751
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
752
                        for node in nodeinfo]
753
      }
754
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
755
    all_rversion = rpc.call_version(nodelist)
756
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
757

    
758
    for node in nodelist:
759
      feedback_fn("* Verifying node %s" % node)
760
      result = self._VerifyNode(node, file_names, local_checksums,
761
                                all_vglist[node], all_nvinfo[node],
762
                                all_rversion[node], feedback_fn)
763
      bad = bad or result
764

    
765
      # node_volume
766
      volumeinfo = all_volumeinfo[node]
767

    
768
      if isinstance(volumeinfo, basestring):
769
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
770
                    (node, volumeinfo[-400:].encode('string_escape')))
771
        bad = True
772
        node_volume[node] = {}
773
      elif not isinstance(volumeinfo, dict):
774
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
775
        bad = True
776
        continue
777
      else:
778
        node_volume[node] = volumeinfo
779

    
780
      # node_instance
781
      nodeinstance = all_instanceinfo[node]
782
      if not isinstance(nodeinstance, list):
783
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
784
        bad = True
785
        continue
786

    
787
      node_instance[node] = nodeinstance
788

    
789
      # node_info
790
      nodeinfo = all_ninfo[node]
791
      if not isinstance(nodeinfo, dict):
792
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
793
        bad = True
794
        continue
795

    
796
      try:
797
        node_info[node] = {
798
          "mfree": int(nodeinfo['memory_free']),
799
          "dfree": int(nodeinfo['vg_free']),
800
          "pinst": [],
801
          "sinst": [],
802
          # dictionary holding all instances this node is secondary for,
803
          # grouped by their primary node. Each key is a cluster node, and each
804
          # value is a list of instances which have the key as primary and the
805
          # current node as secondary.  this is handy to calculate N+1 memory
806
          # availability if you can only failover from a primary to its
807
          # secondary.
808
          "sinst-by-pnode": {},
809
        }
810
      except ValueError:
811
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
812
        bad = True
813
        continue
814

    
815
    node_vol_should = {}
816

    
817
    for instance in instancelist:
818
      feedback_fn("* Verifying instance %s" % instance)
819
      inst_config = self.cfg.GetInstanceInfo(instance)
820
      result =  self._VerifyInstance(instance, inst_config, node_volume,
821
                                     node_instance, feedback_fn)
822
      bad = bad or result
823

    
824
      inst_config.MapLVsByNode(node_vol_should)
825

    
826
      instance_cfg[instance] = inst_config
827

    
828
      pnode = inst_config.primary_node
829
      if pnode in node_info:
830
        node_info[pnode]['pinst'].append(instance)
831
      else:
832
        feedback_fn("  - ERROR: instance %s, connection to primary node"
833
                    " %s failed" % (instance, pnode))
834
        bad = True
835

    
836
      # If the instance is non-redundant we cannot survive losing its primary
837
      # node, so we are not N+1 compliant. On the other hand we have no disk
838
      # templates with more than one secondary so that situation is not well
839
      # supported either.
840
      # FIXME: does not support file-backed instances
841
      if len(inst_config.secondary_nodes) == 0:
842
        i_non_redundant.append(instance)
843
      elif len(inst_config.secondary_nodes) > 1:
844
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
845
                    % instance)
846

    
847
      for snode in inst_config.secondary_nodes:
848
        if snode in node_info:
849
          node_info[snode]['sinst'].append(instance)
850
          if pnode not in node_info[snode]['sinst-by-pnode']:
851
            node_info[snode]['sinst-by-pnode'][pnode] = []
852
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
853
        else:
854
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
855
                      " %s failed" % (instance, snode))
856

    
857
    feedback_fn("* Verifying orphan volumes")
858
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
859
                                       feedback_fn)
860
    bad = bad or result
861

    
862
    feedback_fn("* Verifying remaining instances")
863
    result = self._VerifyOrphanInstances(instancelist, node_instance,
864
                                         feedback_fn)
865
    bad = bad or result
866

    
867
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
868
      feedback_fn("* Verifying N+1 Memory redundancy")
869
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
870
      bad = bad or result
871

    
872
    feedback_fn("* Other Notes")
873
    if i_non_redundant:
874
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
875
                  % len(i_non_redundant))
876

    
877
    return not bad
878

    
879
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
880
    """Analize the post-hooks' result, handle it, and send some
881
    nicely-formatted feedback back to the user.
882

883
    Args:
884
      phase: the hooks phase that has just been run
885
      hooks_results: the results of the multi-node hooks rpc call
886
      feedback_fn: function to send feedback back to the caller
887
      lu_result: previous Exec result
888

889
    """
890
    # We only really run POST phase hooks, and are only interested in
891
    # their results
892
    if phase == constants.HOOKS_PHASE_POST:
893
      # Used to change hooks' output to proper indentation
894
      indent_re = re.compile('^', re.M)
895
      feedback_fn("* Hooks Results")
896
      if not hooks_results:
897
        feedback_fn("  - ERROR: general communication failure")
898
        lu_result = 1
899
      else:
900
        for node_name in hooks_results:
901
          show_node_header = True
902
          res = hooks_results[node_name]
903
          if res is False or not isinstance(res, list):
904
            feedback_fn("    Communication failure")
905
            lu_result = 1
906
            continue
907
          for script, hkr, output in res:
908
            if hkr == constants.HKR_FAIL:
909
              # The node header is only shown once, if there are
910
              # failing hooks on that node
911
              if show_node_header:
912
                feedback_fn("  Node %s:" % node_name)
913
                show_node_header = False
914
              feedback_fn("    ERROR: Script %s failed, output:" % script)
915
              output = indent_re.sub('      ', output)
916
              feedback_fn("%s" % output)
917
              lu_result = 1
918

    
919
      return lu_result
920

    
921

    
922
class LUVerifyDisks(NoHooksLU):
923
  """Verifies the cluster disks status.
924

925
  """
926
  _OP_REQP = []
927
  REQ_BGL = False
928

    
929
  def ExpandNames(self):
930
    self.needed_locks = {
931
      locking.LEVEL_NODE: locking.ALL_SET,
932
      locking.LEVEL_INSTANCE: locking.ALL_SET,
933
    }
934
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
935

    
936
  def CheckPrereq(self):
937
    """Check prerequisites.
938

939
    This has no prerequisites.
940

941
    """
942
    pass
943

    
944
  def Exec(self, feedback_fn):
945
    """Verify integrity of cluster disks.
946

947
    """
948
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
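    # Note: "result" is a tuple holding references to the four containers
    # above, so filling them in below also fills in the value that is
    # eventually returned.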
949

    
950
    vg_name = self.cfg.GetVGName()
951
    nodes = utils.NiceSort(self.cfg.GetNodeList())
952
    instances = [self.cfg.GetInstanceInfo(name)
953
                 for name in self.cfg.GetInstanceList()]
954

    
955
    nv_dict = {}
956
    for inst in instances:
957
      inst_lvs = {}
958
      if (inst.status != "up" or
959
          inst.disk_template not in constants.DTS_NET_MIRROR):
960
        continue
961
      inst.MapLVsByNode(inst_lvs)
962
      # transform {iname: {node: [vol, ...], ...}, ...} to {(node, vol): inst}
963
      for node, vol_list in inst_lvs.iteritems():
964
        for vol in vol_list:
965
          nv_dict[(node, vol)] = inst
966

    
967
    if not nv_dict:
968
      return result
969

    
970
    node_lvs = rpc.call_volume_list(nodes, vg_name)
971

    
972
    to_act = set()
973
    for node in nodes:
974
      # node_volume
975
      lvs = node_lvs[node]
976

    
977
      if isinstance(lvs, basestring):
978
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
979
        res_nlvm[node] = lvs
980
      elif not isinstance(lvs, dict):
981
        logger.Info("connection to node %s failed or invalid data returned" %
982
                    (node,))
983
        res_nodes.append(node)
984
        continue
985

    
986
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
987
        inst = nv_dict.pop((node, lv_name), None)
988
        if (not lv_online and inst is not None
989
            and inst.name not in res_instances):
990
          res_instances.append(inst.name)
991

    
992
    # any leftover items in nv_dict are missing LVs, let's arrange the
993
    # data better
994
    for key, inst in nv_dict.iteritems():
995
      if inst.name not in res_missing:
996
        res_missing[inst.name] = []
997
      res_missing[inst.name].append(key)
998

    
999
    return result
1000

    
1001

    
1002
class LURenameCluster(LogicalUnit):
1003
  """Rename the cluster.
1004

1005
  """
1006
  HPATH = "cluster-rename"
1007
  HTYPE = constants.HTYPE_CLUSTER
1008
  _OP_REQP = ["name"]
1009
  REQ_WSSTORE = True
1010

    
1011
  def BuildHooksEnv(self):
1012
    """Build hooks env.
1013

1014
    """
1015
    env = {
1016
      "OP_TARGET": self.sstore.GetClusterName(),
1017
      "NEW_NAME": self.op.name,
1018
      }
1019
    mn = self.sstore.GetMasterNode()
1020
    return env, [mn], [mn]
1021

    
1022
  def CheckPrereq(self):
1023
    """Verify that the passed name is a valid one.
1024

1025
    """
1026
    hostname = utils.HostInfo(self.op.name)
1027

    
1028
    new_name = hostname.name
1029
    self.ip = new_ip = hostname.ip
1030
    old_name = self.sstore.GetClusterName()
1031
    old_ip = self.sstore.GetMasterIP()
1032
    if new_name == old_name and new_ip == old_ip:
1033
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1034
                                 " cluster has changed")
1035
    if new_ip != old_ip:
1036
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1037
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1038
                                   " reachable on the network. Aborting." %
1039
                                   new_ip)
1040

    
1041
    self.op.name = new_name
1042

    
1043
  def Exec(self, feedback_fn):
1044
    """Rename the cluster.
1045

1046
    """
1047
    clustername = self.op.name
1048
    ip = self.ip
1049
    ss = self.sstore
1050

    
1051
    # shutdown the master IP
1052
    master = ss.GetMasterNode()
1053
    if not rpc.call_node_stop_master(master, False):
1054
      raise errors.OpExecError("Could not disable the master role")
1055

    
1056
    try:
1057
      # modify the sstore
1058
      ss.SetKey(ss.SS_MASTER_IP, ip)
1059
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1060

    
1061
      # Distribute updated ss config to all nodes
1062
      myself = self.cfg.GetNodeInfo(master)
1063
      dist_nodes = self.cfg.GetNodeList()
1064
      if myself.name in dist_nodes:
1065
        dist_nodes.remove(myself.name)
1066

    
1067
      logger.Debug("Copying updated ssconf data to all nodes")
1068
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1069
        fname = ss.KeyToFilename(keyname)
1070
        result = rpc.call_upload_file(dist_nodes, fname)
1071
        for to_node in dist_nodes:
1072
          if not result[to_node]:
1073
            logger.Error("copy of file %s to node %s failed" %
1074
                         (fname, to_node))
1075
    finally:
1076
      if not rpc.call_node_start_master(master, False):
1077
        logger.Error("Could not re-enable the master role on the master,"
1078
                     " please restart manually.")
1079

    
1080

    
1081
def _RecursiveCheckIfLVMBased(disk):
1082
  """Check if the given disk or its children are lvm-based.
1083

1084
  Args:
1085
    disk: ganeti.objects.Disk object
1086

1087
  Returns:
1088
    boolean indicating whether a LD_LV dev_type was found or not
1089

1090
  """
1091
  if disk.children:
1092
    for chdisk in disk.children:
1093
      if _RecursiveCheckIfLVMBased(chdisk):
1094
        return True
1095
  return disk.dev_type == constants.LD_LV
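# Example (sketch): a plain logical-volume disk yields True, while a disk
# whose whole child tree uses non-LVM backing yields False.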
1096

    
1097

    
1098
class LUSetClusterParams(LogicalUnit):
1099
  """Change the parameters of the cluster.
1100

1101
  """
1102
  HPATH = "cluster-modify"
1103
  HTYPE = constants.HTYPE_CLUSTER
1104
  _OP_REQP = []
1105
  REQ_BGL = False
1106

    
1107
  def ExpandNames(self):
1108
    # FIXME: in the future maybe other cluster params won't require checking on
1109
    # all nodes to be modified.
1110
    self.needed_locks = {
1111
      locking.LEVEL_NODE: locking.ALL_SET,
1112
    }
1113
    self.share_locks[locking.LEVEL_NODE] = 1
1114

    
1115
  def BuildHooksEnv(self):
1116
    """Build hooks env.
1117

1118
    """
1119
    env = {
1120
      "OP_TARGET": self.sstore.GetClusterName(),
1121
      "NEW_VG_NAME": self.op.vg_name,
1122
      }
1123
    mn = self.sstore.GetMasterNode()
1124
    return env, [mn], [mn]
1125

    
1126
  def CheckPrereq(self):
1127
    """Check prerequisites.
1128

1129
    This checks whether the given params don't conflict and
1130
    if the given volume group is valid.
1131

1132
    """
1133
    # FIXME: This only works because there is only one parameter that can be
1134
    # changed or removed.
1135
    if not self.op.vg_name:
1136
      instances = self.cfg.GetAllInstancesInfo().values()
1137
      for inst in instances:
1138
        for disk in inst.disks:
1139
          if _RecursiveCheckIfLVMBased(disk):
1140
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1141
                                       " lvm-based instances exist")
1142

    
1143
    # if vg_name not None, checks given volume group on all nodes
1144
    if self.op.vg_name:
1145
      node_list = self.acquired_locks[locking.LEVEL_NODE]
1146
      vglist = rpc.call_vg_list(node_list)
1147
      for node in node_list:
1148
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1149
                                              constants.MIN_VG_SIZE)
1150
        if vgstatus:
1151
          raise errors.OpPrereqError("Error on node '%s': %s" %
1152
                                     (node, vgstatus))
1153

    
1154
  def Exec(self, feedback_fn):
1155
    """Change the parameters of the cluster.
1156

1157
    """
1158
    if self.op.vg_name != self.cfg.GetVGName():
1159
      self.cfg.SetVGName(self.op.vg_name)
1160
    else:
1161
      feedback_fn("Cluster LVM configuration already in desired"
1162
                  " state, not changing")
1163

    
1164

    
1165
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1166
  """Sleep and poll for an instance's disk to sync.
1167

1168
  """
1169
  if not instance.disks:
1170
    return True
1171

    
1172
  if not oneshot:
1173
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1174

    
1175
  node = instance.primary_node
1176

    
1177
  for dev in instance.disks:
1178
    cfgw.SetDiskID(dev, node)
1179

    
1180
  retries = 0
1181
  while True:
1182
    max_time = 0
1183
    done = True
1184
    cumul_degraded = False
1185
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1186
    if not rstats:
1187
      proc.LogWarning("Can't get any data from node %s" % node)
1188
      retries += 1
1189
      if retries >= 10:
1190
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1191
                                 " aborting." % node)
1192
      time.sleep(6)
1193
      continue
1194
    retries = 0
1195
    for i in range(len(rstats)):
1196
      mstat = rstats[i]
1197
      if mstat is None:
1198
        proc.LogWarning("Can't compute data for node %s/%s" %
1199
                        (node, instance.disks[i].iv_name))
1200
        continue
1201
      # we ignore the ldisk parameter
1202
      perc_done, est_time, is_degraded, _ = mstat
1203
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1204
      if perc_done is not None:
1205
        done = False
1206
        if est_time is not None:
1207
          rem_time = "%d estimated seconds remaining" % est_time
1208
          max_time = est_time
1209
        else:
1210
          rem_time = "no time estimate"
1211
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
1212
                     (instance.disks[i].iv_name, perc_done, rem_time))
1213
    if done or oneshot:
1214
      break
1215

    
1216
    time.sleep(min(60, max_time))
1217

    
1218
  if done:
1219
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1220
  return not cumul_degraded
1221

    
1222

    
1223
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1224
  """Check that mirrors are not degraded.
1225

1226
  The ldisk parameter, if True, will change the test from the
1227
  is_degraded attribute (which represents overall non-ok status for
1228
  the device(s)) to the ldisk (representing the local storage status).
1229

1230
  """
1231
  cfgw.SetDiskID(dev, node)
1232
  if ldisk:
1233
    idx = 6
1234
  else:
1235
    idx = 5
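  # The indices presumably select fields of the status tuple returned by
  # rpc.call_blockdev_find: overall is_degraded (5) vs. local-disk/ldisk
  # status (6); see the docstring above (assumption, not verified here).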
1236

    
1237
  result = True
1238
  if on_primary or dev.AssembleOnSecondary():
1239
    rstats = rpc.call_blockdev_find(node, dev)
1240
    if not rstats:
1241
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1242
      result = False
1243
    else:
1244
      result = result and (not rstats[idx])
1245
  if dev.children:
1246
    for child in dev.children:
1247
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1248

    
1249
  return result
1250

    
1251

    
1252
class LUDiagnoseOS(NoHooksLU):
1253
  """Logical unit for OS diagnose/query.
1254

1255
  """
1256
  _OP_REQP = ["output_fields", "names"]
1257
  REQ_BGL = False
1258

    
1259
  def ExpandNames(self):
1260
    if self.op.names:
1261
      raise errors.OpPrereqError("Selective OS query not supported")
1262

    
1263
    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1264
    _CheckOutputFields(static=[],
1265
                       dynamic=self.dynamic_fields,
1266
                       selected=self.op.output_fields)
1267

    
1268
    # Lock all nodes, in shared mode
1269
    self.needed_locks = {}
1270
    self.share_locks[locking.LEVEL_NODE] = 1
1271
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1272

    
1273
  def CheckPrereq(self):
1274
    """Check prerequisites.
1275

1276
    """
1277

    
1278
  @staticmethod
1279
  def _DiagnoseByOS(node_list, rlist):
1280
    """Remaps a per-node return list into an a per-os per-node dictionary
1281

1282
      Args:
1283
        node_list: a list with the names of all nodes
1284
        rlist: a map with node names as keys and OS objects as values
1285

1286
      Returns:
1287
        map: a map with osnames as keys and as value another map, with
1288
             nodes as
1289
             keys and list of OS objects as values
1290
             e.g. {"debian-etch": {"node1": [<object>,...],
1291
                                   "node2": [<object>,]}
1292
                  }
1293

1294
    """
1295
    all_os = {}
1296
    for node_name, nr in rlist.iteritems():
1297
      if not nr:
1298
        continue
1299
      for os_obj in nr:
1300
        if os_obj.name not in all_os:
1301
          # build a list of nodes for this os containing empty lists
1302
          # for each node in node_list
1303
          all_os[os_obj.name] = {}
1304
          for nname in node_list:
1305
            all_os[os_obj.name][nname] = []
1306
        all_os[os_obj.name][node_name].append(os_obj)
1307
    return all_os
1308

    
1309
  def Exec(self, feedback_fn):
1310
    """Compute the list of OSes.
1311

1312
    """
1313
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1314
    node_data = rpc.call_os_diagnose(node_list)
1315
    if node_data is False:
1316
      raise errors.OpExecError("Can't gather the list of OSes")
1317
    pol = self._DiagnoseByOS(node_list, node_data)
1318
    output = []
1319
    for os_name, os_data in pol.iteritems():
1320
      row = []
1321
      for field in self.op.output_fields:
1322
        if field == "name":
1323
          val = os_name
1324
        elif field == "valid":
1325
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1326
        elif field == "node_status":
1327
          val = {}
1328
          for node_name, nos_list in os_data.iteritems():
1329
            val[node_name] = [(v.status, v.path) for v in nos_list]
1330
        else:
1331
          raise errors.ParameterError(field)
1332
        row.append(val)
1333
      output.append(row)
1334

    
1335
    return output
1336

    
1337

    
1338
class LURemoveNode(LogicalUnit):
1339
  """Logical unit for removing a node.
1340

1341
  """
1342
  HPATH = "node-remove"
1343
  HTYPE = constants.HTYPE_NODE
1344
  _OP_REQP = ["node_name"]
1345

    
1346
  def BuildHooksEnv(self):
1347
    """Build hooks env.
1348

1349
    This doesn't run on the target node in the pre phase as a failed
1350
    node would then be impossible to remove.
1351

1352
    """
1353
    env = {
1354
      "OP_TARGET": self.op.node_name,
1355
      "NODE_NAME": self.op.node_name,
1356
      }
1357
    all_nodes = self.cfg.GetNodeList()
1358
    all_nodes.remove(self.op.node_name)
1359
    return env, all_nodes, all_nodes
1360

    
1361
  def CheckPrereq(self):
1362
    """Check prerequisites.
1363

1364
    This checks:
1365
     - the node exists in the configuration
1366
     - it does not have primary or secondary instances
1367
     - it's not the master
1368

1369
    Any errors are signalled by raising errors.OpPrereqError.
1370

1371
    """
1372
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1373
    if node is None:
1374
      raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1375

    
1376
    instance_list = self.cfg.GetInstanceList()
1377

    
1378
    masternode = self.sstore.GetMasterNode()
1379
    if node.name == masternode:
1380
      raise errors.OpPrereqError("Node is the master node,"
1381
                                 " you need to failover first.")
1382

    
1383
    for instance_name in instance_list:
1384
      instance = self.cfg.GetInstanceInfo(instance_name)
1385
      if node.name == instance.primary_node:
1386
        raise errors.OpPrereqError("Instance %s still running on the node,"
1387
                                   " please remove first." % instance_name)
1388
      if node.name in instance.secondary_nodes:
1389
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1390
                                   " please remove first." % instance_name)
1391
    self.op.node_name = node.name
1392
    self.node = node
1393

    
1394
  def Exec(self, feedback_fn):
1395
    """Removes the node from the cluster.
1396

1397
    """
1398
    node = self.node
1399
    logger.Info("stopping the node daemon and removing configs from node %s" %
1400
                node.name)
1401

    
1402
    self.context.RemoveNode(node.name)
1403

    
1404
    rpc.call_node_leave_cluster(node.name)
1405

    
1406

    
1407
class LUQueryNodes(NoHooksLU):
1408
  """Logical unit for querying nodes.
1409

1410
  """
1411
  _OP_REQP = ["output_fields", "names"]
1412
  REQ_BGL = False
1413

    
1414
  def ExpandNames(self):
1415
    self.dynamic_fields = frozenset([
1416
      "dtotal", "dfree",
1417
      "mtotal", "mnode", "mfree",
1418
      "bootid",
1419
      "ctotal",
1420
      ])
1421

    
1422
    self.static_fields = frozenset([
1423
      "name", "pinst_cnt", "sinst_cnt",
1424
      "pinst_list", "sinst_list",
1425
      "pip", "sip", "tags",
1426
      "serial_no",
1427
      ])
1428

    
1429
    _CheckOutputFields(static=self.static_fields,
1430
                       dynamic=self.dynamic_fields,
1431
                       selected=self.op.output_fields)
1432

    
1433
    self.needed_locks = {}
1434
    self.share_locks[locking.LEVEL_NODE] = 1
1435

    
1436
    if self.op.names:
1437
      self.wanted = _GetWantedNodes(self, self.op.names)
1438
    else:
1439
      self.wanted = locking.ALL_SET
1440

    
1441
    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
1442
    if self.do_locking:
1443
      # if we don't request only static fields, we need to lock the nodes
1444
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
1445

    
1446

    
1447
  def CheckPrereq(self):
1448
    """Check prerequisites.
1449

1450
    """
1451
    # The validation of the node list is done in _GetWantedNodes, if
1452
    # non-empty; if empty, there's no validation to do
1453
    pass
1454

    
1455
  def Exec(self, feedback_fn):
1456
    """Computes the list of nodes and their attributes.
1457

1458
    """
1459
    all_info = self.cfg.GetAllNodesInfo()
1460
    if self.do_locking:
1461
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1462
    elif self.wanted != locking.ALL_SET:
1463
      nodenames = self.wanted
1464
      missing = set(nodenames).difference(all_info.keys())
1465
      if missing:
1466
        raise self.OpExecError(
1467
          "Some nodes were removed before retrieving their data: %s" % missing)
1468
    else:
1469
      nodenames = all_info.keys()
1470
    nodelist = [all_info[name] for name in nodenames]
1471

    
1472
    # begin data gathering
1473

    
1474
    if self.dynamic_fields.intersection(self.op.output_fields):
1475
      live_data = {}
1476
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1477
      for name in nodenames:
1478
        nodeinfo = node_data.get(name, None)
1479
        if nodeinfo:
1480
          live_data[name] = {
1481
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1482
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1483
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1484
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1485
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1486
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1487
            "bootid": nodeinfo['bootid'],
1488
            }
1489
        else:
1490
          live_data[name] = {}
1491
    else:
1492
      live_data = dict.fromkeys(nodenames, {})
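      # note: all names share the same empty dict here, which is fine since
      # live_data is only read (never mutated) further down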
1493

    
1494
    node_to_primary = dict([(name, set()) for name in nodenames])
1495
    node_to_secondary = dict([(name, set()) for name in nodenames])
1496

    
1497
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1498
                             "sinst_cnt", "sinst_list"))
1499
    if inst_fields & frozenset(self.op.output_fields):
1500
      instancelist = self.cfg.GetInstanceList()
1501

    
1502
      for instance_name in instancelist:
1503
        inst = self.cfg.GetInstanceInfo(instance_name)
1504
        if inst.primary_node in node_to_primary:
1505
          node_to_primary[inst.primary_node].add(inst.name)
1506
        for secnode in inst.secondary_nodes:
1507
          if secnode in node_to_secondary:
1508
            node_to_secondary[secnode].add(inst.name)
1509

    
1510
    # end data gathering
1511

    
1512
    output = []
1513
    for node in nodelist:
1514
      node_output = []
1515
      for field in self.op.output_fields:
1516
        if field == "name":
1517
          val = node.name
1518
        elif field == "pinst_list":
1519
          val = list(node_to_primary[node.name])
1520
        elif field == "sinst_list":
1521
          val = list(node_to_secondary[node.name])
1522
        elif field == "pinst_cnt":
1523
          val = len(node_to_primary[node.name])
1524
        elif field == "sinst_cnt":
1525
          val = len(node_to_secondary[node.name])
1526
        elif field == "pip":
1527
          val = node.primary_ip
1528
        elif field == "sip":
1529
          val = node.secondary_ip
1530
        elif field == "tags":
1531
          val = list(node.GetTags())
1532
        elif field == "serial_no":
1533
          val = node.serial_no
1534
        elif field in self.dynamic_fields:
1535
          val = live_data[node.name].get(field, None)
1536
        else:
1537
          raise errors.ParameterError(field)
1538
        node_output.append(val)
1539
      output.append(node_output)
1540

    
1541
    return output
1542

    
1543

    
1544
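# Output format (illustrative): like the node query above, the volume query
# below returns one row per volume, with columns in the order given by
# self.op.output_fields.  For ["node", "name", "size"] a row would look
# roughly like ["node1.example.com", "<some-lv-name>", "10240"]; all values
# are converted to strings.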
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  REQ_BGL = False

  def ExpandNames(self):
    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of volumes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


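# The node-add flow below is, in rough outline: check the protocol version of
# the new node, push the cluster SSH host/user keys to it, update /etc/hosts
# and known_hosts, ask the master's node daemon to verify ssh/hostname
# resolution for the new node, redistribute the updated files (plus the
# ssconf and, for HVM clusters, the VNC password file), and finally register
# the node in the cluster context.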
class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # check connectivity
    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    node_verify_list = [self.sstore.GetMasterNode()]
    node_verify_param = {
      'nodelist': [node],
      # TODO: do a node-net-test as well?
    }

    result = rpc.call_node_verify(node_verify_list, node_verify_param)
    for verifier in node_verify_list:
      if not result[verifier]:
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                 " for remote verification" % verifier)
      if result[verifier]['nodelist']:
        for failed in result[verifier]['nodelist']:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, result[verifier]['nodelist'][failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList()
    if not self.op.readd:
      dist_nodes.append(node)
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = self.sstore.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      result = rpc.call_upload_file([node], fname)
      if not result[node]:
        logger.Error("could not copy file %s to node %s" % (fname, node))

    if self.op.readd:
      self.context.ReaddNode(new_node)
    else:
      self.context.AddNode(new_node)


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      }

    return result


class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


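# _AssembleInstanceDisks below returns a pair (disks_ok, device_info);
# device_info is built during the primary-node pass and looks roughly like
# (illustrative values):
#   [("node1.example.com", "sda", <result of call_blockdev_assemble>),
#    ("node1.example.com", "sdb", <...>)]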
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    cfg: a ConfigWriter instance
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
         succeeded with the mapping from node devices to instance devices
  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info


def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
    _SafeShutdownInstanceDisks(instance, self.cfg)


def _SafeShutdownInstanceDisks(instance, cfg):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  _ShutdownInstanceDisks.

  """
  ins_l = rpc.call_instance_list([instance.primary_node])
  ins_l = ins_l[instance.primary_node]
  if not type(ins_l) is list:
    raise errors.OpExecError("Can't contact node '%s'" %
                             instance.primary_node)

  if instance.name in ins_l:
    raise errors.OpExecError("Instance is running, can't shutdown"
                             " block devices.")

  _ShutdownInstanceDisks(instance, cfg)


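# A typical "best effort" use of the helper below is the failover path, which
# calls it with ignore_primary=True so that a dead primary node does not turn
# the disk shutdown into a hard failure:
#   _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True)
# Errors on the other nodes still make it return False.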
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is false, errors on the primary node are not
  ignored and cause the function to return failure.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result


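# Typical call (as done by LUStartupInstance below); the amounts are in MiB:
#   _CheckNodeFreeMemory(self.cfg, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        instance.memory)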
def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                             " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                             " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                             " needed %s MiB, available %s MiB" %
                             (node, reason, requested, free_mem))


class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
                         "starting instance %s" % instance.name,
                         instance.memory)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")


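# Reboot semantics, roughly: INSTANCE_REBOOT_SOFT and INSTANCE_REBOOT_HARD are
# handed to the node daemon as a single reboot call, while
# INSTANCE_REBOOT_FULL is implemented as shutdown + disk deactivation +
# disk activation + start, i.e. a full stop/start cycle.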
class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # only a full reboot touches the disks on the secondary nodes
      primary_only = self.op.reboot_type != constants.INSTANCE_REBOOT_FULL
      self._LockInstancesNodes(primary_only=primary_only)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      if not rpc.call_instance_reboot(node_current, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      if not rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
      if not rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    if not rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    _ShutdownInstanceDisks(instance, self.cfg)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   self.op.pnode)
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node"  % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.Update(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))


  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
                                                old_file_storage_dir,
                                                new_file_storage_dir)

      if not result:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
                                          "sda", "sdb"):
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.sstore.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logger.Info("removing block devices for instance %s" % instance.name)

    if not _RemoveDisks(instance, self.cfg):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name


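# The "status" field computed below combines the configured (admin) state
# with the live data from the primary node, roughly:
#   primary node unreachable -> "ERROR_nodedown"
#   running and marked up    -> "running"
#   running and marked down  -> "ERROR_up"
#   stopped and marked up    -> "ERROR_down"
#   stopped and marked down  -> "ADMIN_down"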
class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
    self.static_fields = frozenset([
      "name", "os", "pnode", "snodes",
      "admin_state", "admin_ram",
      "disk_template", "ip", "mac", "bridge",
      "sda_size", "sdb_size", "vcpus", "tags",
      "network_port", "kernel_path", "initrd_path",
      "hvm_boot_order", "hvm_acpi", "hvm_pae",
      "hvm_cdrom_image_path", "hvm_nic_type",
      "hvm_disk_type", "vnc_bind_address",
      "serial_no",
      ])
    _CheckOutputFields(static=self.static_fields,
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedInstances(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
    if self.do_locking:
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.do_locking:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    all_info = self.cfg.GetAllInstancesInfo()
    if self.do_locking:
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
    elif self.wanted != locking.ALL_SET:
      instance_names = self.wanted
      missing = set(instance_names).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some instances were removed before retrieving their data: %s"
          % missing)
    else:
      instance_names = all_info.keys()
    instance_list = [all_info[iname] for iname in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_all_instances_info(nodes)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = []
    for instance in instance_list:
      iout = []
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.status != "down":
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.status != "down":
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          disk = instance.FindDisk(field[:3])
          if disk is None:
            val = None
          else:
            val = disk.size
        elif field == "vcpus":
          val = instance.vcpus
        elif field == "tags":
          val = list(instance.GetTags())
        elif field == "serial_no":
          val = instance.serial_no
        elif field in ("network_port", "kernel_path", "initrd_path",
                       "hvm_boot_order", "hvm_acpi", "hvm_pae",
                       "hvm_cdrom_image_path", "hvm_nic_type",
                       "hvm_disk_type", "vnc_bind_address"):
          val = getattr(instance, field, None)
          if val is not None:
            pass
          elif field in ("hvm_nic_type", "hvm_disk_type",
                         "kernel_path", "initrd_path"):
            val = "default"
          else:
            val = "-"
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output


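# Failover below is, in rough outline: verify that the mirrored disks are
# consistent on the target (unless consistency checks are ignored), shut the
# instance down on the current primary, deactivate its disks there, flip
# instance.primary_node to the old secondary in the configuration, and, if
# the instance was marked up, reactivate the disks and start it on the new
# primary.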
class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
                         instance.name, instance.memory)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down"  %
                     (instance.name, source_node, source_node))
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logger.Info("Starting instance %s on node %s" %
                  (instance.name, target_node))

      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))


def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
        return False

  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, True, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, False, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


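# Illustrative example: _GenerateUniqueNames(cfg, [".sda", ".sdb"]) returns
# something roughly like ["<unique-id>.sda", "<unique-id>.sdb"], where each
# unique id comes from cfg.GenerateUniqueID().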
def _GenerateUniqueNames(cfg, exts):
  """Generate suitable LV names.

  This will generate a logical volume name for each of the given extensions.

  """
  results = []
  for val in exts:
    new_id = cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results


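# The helper below builds a small disk tree (sketch, sizes in MiB):
#
#   LD_DRBD8 (size, iv_name, logical_id=(primary, secondary, port,
#                                        p_minor, s_minor))
#     +- LD_LV data volume, size MiB   (names[0])
#     +- LD_LV meta volume, 128 MiB    (names[1])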
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name,
                         p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port,
                                      p_minor, s_minor),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev


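# Summary of what the template generator below returns:
#   DT_DISKLESS -> []
#   DT_PLAIN    -> [sda LV, sdb LV] on the primary node only
#   DT_DRBD8    -> [sda DRBD8 branch, sdb DRBD8 branch] spanning the primary
#                  and the single secondary node
#   DT_FILE     -> [sda file-backed disk, sdb file-backed disk] under
#                  file_storage_dir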
def _GenerateDiskTemplate(cfg, template_name,
2939
                          instance_name, primary_node,
2940
                          secondary_nodes, disk_sz, swap_sz,
2941
                          file_storage_dir, file_driver):
2942
  """Generate the entire disk layout for a given template type.
2943

2944
  """
2945
  #TODO: compute space requirements
2946

    
2947
  vgname = cfg.GetVGName()
2948
  if template_name == constants.DT_DISKLESS:
2949
    disks = []
2950
  elif template_name == constants.DT_PLAIN:
2951
    if len(secondary_nodes) != 0:
2952
      raise errors.ProgrammerError("Wrong template configuration")
2953

    
2954
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2955
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2956
                           logical_id=(vgname, names[0]),
2957
                           iv_name = "sda")
2958
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2959
                           logical_id=(vgname, names[1]),
2960
                           iv_name = "sdb")
2961
    disks = [sda_dev, sdb_dev]
2962
  elif template_name == constants.DT_DRBD8:
2963
    if len(secondary_nodes) != 1:
2964
      raise errors.ProgrammerError("Wrong template configuration")
2965
    remote_node = secondary_nodes[0]
2966
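    # The node list below is deliberately (primary, primary, secondary,
    # secondary): AllocateDRBDMinor returns one minor per entry, which pair
    # up as the sda/sdb minors on the primary followed by the sda/sdb minors
    # on the secondary.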
    (minor_pa, minor_pb,
     minor_sa, minor_sb) = cfg.AllocateDRBDMinor(
      [primary_node, primary_node, remote_node, remote_node], instance_name)

    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                        disk_sz, names[0:2], "sda",
                                        minor_pa, minor_sa)
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                        swap_sz, names[2:4], "sdb",
                                        minor_pb, minor_sb)
    disks = [drbd_sda_dev, drbd_sdb_dev]
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
                                iv_name="sda", logical_id=(file_driver,
                                "%s/sda" % file_storage_dir))
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
                                iv_name="sdb", logical_id=(file_driver,
                                "%s/sdb" % file_storage_dir))
    disks = [file_sda_dev, file_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks


def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = rpc.call_file_storage_dir_create(instance.primary_node,
                                              file_storage_dir)

    if not result:
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
      return False

    if not result[0]:
      logger.Error("failed to create directory '%s'" % file_storage_dir)
      return False

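  # Devices are created on the secondary nodes first (only forced where the
  # device type requires it) and on the primary node last, where creation is
  # always forced.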
  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False

  return True


def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
                                            file_storage_dir):
      logger.Error("could not remove directory '%s'" % file_storage_dir)
      result = False

  return result


def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group

  This is currently hard-coded for the two-drive layout.

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: disk_size + swap_size,
    # 256 MB are added for drbd metadata, 128MB for each drbd device
    constants.DT_DRBD8: disk_size + swap_size + 256,
    constants.DT_FILE: None,
  }
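  # Example: for a drbd8 instance with disk_size=10240 and swap_size=4096
  # (illustrative values), the volume group must provide
  # 10240 + 4096 + 256 = 14592 MB, the extra 256 MB covering the two 128 MB
  # DRBD metadata volumes.
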
  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" %  disk_template)

  return req_size_dict[disk_template]


class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
              "disk_template", "swap_size", "mode", "start", "vcpus",
              "wait_for_sync", "ip_check", "mac"]
  REQ_BGL = False

  def _ExpandNode(self, node):
    """Expands and checks one node name.

    """
    node_full = self.cfg.ExpandNodeName(node)
    if node_full is None:
      raise errors.OpPrereqError("Unknown node %s" % node)
    return node_full

  def ExpandNames(self):
    """ExpandNames for CreateInstance.

    Figure out the right locks for instance creation.

    """
    self.needed_locks = {}

    # set optional parameters to none if they don't exist
    for attr in ["kernel_path", "initrd_path", "pnode", "snode",
                 "iallocator", "hvm_boot_order", "hvm_acpi", "hvm_pae",
                 "hvm_cdrom_image_path", "hvm_nic_type", "hvm_disk_type",
                 "vnc_bind_address"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    # verify creation mode
    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)
    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    #### instance parameters check

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)
    self.op.instance_name = instance_name = hostname1.name

    # this is just a preventive check, but someone might still add this
    # instance in the meantime, and creation will fail at lock-add time
    if instance_name in self.cfg.GetInstanceList():
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    self.add_locks[locking.LEVEL_INSTANCE] = instance_name

    # ip validity checks
    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = self.op.ip = inst_ip
    # used in CheckPrereq for ip ping check
    self.check_ip = hostname1.ip

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # boot order verification
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")
    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory path not absolute")

    ### Node/iallocator related checks
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

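    # Locking strategy: with an iallocator the target nodes are not known
    # yet, so every node lock is acquired; with an explicit primary (and
    # optional secondary) only those nodes are locked.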
    if self.op.iallocator:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.op.pnode = self._ExpandNode(self.op.pnode)
      nodelist = [self.op.pnode]
      if self.op.snode is not None:
        self.op.snode = self._ExpandNode(self.op.snode)
        nodelist.append(self.op.snode)
      self.needed_locks[locking.LEVEL_NODE] = nodelist

    # in case of import lock the source node too
    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)

      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      self.op.src_node = src_node = self._ExpandNode(src_node)
      if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
        self.needed_locks[locking.LEVEL_NODE].append(src_node)

    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    disks = [{"size": self.op.disk_size, "mode": "w"},
             {"size": self.op.swap_size, "mode": "w"}]
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
             "bridge": self.op.bridge}]
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.op.vcpus,
                     mem_size=self.op.mem_size,
                     disks=disks,
                     nics=nics,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    logger.ToStdout("Selected nodes for the instance: %s" %
                    (", ".join(ial.nodes),))
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
                (self.op.instance_name, self.op.iallocator, ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.op.mem_size,
      vcpus=self.op.vcpus,
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
    ))

    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl


  def CheckPrereq(self):
    """Check prerequisites.

    """
    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = self.op.src_node
      src_path = self.op.src_path

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage

    # ip ping checks (we use the same ip that was resolved in ExpandNames)

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (self.check_ip, self.op.instance_name))

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    #### allocator run

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
    assert self.pnode is not None, \
      "Cannot retrieve locked node %s" % self.op.pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if self.op.snode is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")
      if self.op.snode == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(self.op.snode)

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.op.disk_size, self.op.swap_size)

    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node"  % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")

    # bridge check on primary node
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.op.mem_size)

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      # FIXME (als): shouldn't these checks happen on the destination node?
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    # Xen HVM device type checks
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
        raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
                                   " hypervisor" % self.op.hvm_nic_type)
      if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
        raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
                                   " hypervisor" % self.op.hvm_disk_type)

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    if self.op.vnc_bind_address is None:
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
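    # (for illustration only: a cluster file storage dir of
    # "/srv/ganeti/file-storage", an opcode dir of "mydir" and instance
    # "inst1.example.com" would yield
    # "/srv/ganeti/file-storage/mydir/inst1.example.com")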
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.sstore.GetFileStorageDir(),
                                        string_file_storage_dir, instance))


    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size,
                                  file_storage_dir,
                                  self.op.file_driver)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            hvm_acpi=self.op.hvm_acpi,
                            hvm_pae=self.op.hvm_pae,
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
                            vnc_bind_address=self.op.vnc_bind_address,
                            hvm_nic_type=self.op.hvm_nic_type,
                            hvm_disk_type=self.op.hvm_disk_type,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      _RemoveDisks(iobj, self.cfg)
      self.cfg.ReleaseDRBDMinors(instance)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Declare that we don't want to remove the instance lock anymore, as we've
    # added the instance to the config
    del self.remove_locks[locking.LEVEL_INSTANCE]
    # Remove the temp. assignments for the instance's drbds
    self.cfg.ReleaseDRBDMinors(instance)

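    # Unless the caller asked to wait for the full sync, mirrored templates
    # only get a short grace period plus a one-shot check that no disk is
    # degraded; non-mirrored templates need no waiting at all.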
    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      # Make sure the instance lock gets removed
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                                src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None

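    # Locking strategy: an iallocator means the new secondary is not known
    # yet, so all node locks are taken; an explicit remote node is locked and
    # the instance's own nodes are appended later (LOCKS_APPEND); otherwise
    # only the instance's nodes are locked (LOCKS_REPLACE via DeclareLocks).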
    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    elif self.op.remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.op.remote_node = remote_node
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    # If we're not already locking all nodes in the set we have to declare the
    # instance's primary/secondary nodes.
    if (level == locking.LEVEL_NODE and
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
      self._LockInstancesNodes()

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    logger.ToStdout("Selected new secondary for the instance: %s" %
                    self.op.remote_node)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = instance

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
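        # tgt_node is the node whose local storage gets rebuilt, oth_node is
        # the peer that must stay consistent in the meantime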
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
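      # e.g. a physical_id of (vg, "<uuid>.sda_data") becomes
      # (vg, "<uuid>.sda_data_replaced-<temp_suffix>")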
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

<