Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib.py @ ffa1c0dc

History | View | Annotate | Download (186.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33
import logging
34

    
35
from ganeti import rpc
36
from ganeti import ssh
37
from ganeti import logger
38
from ganeti import utils
39
from ganeti import errors
40
from ganeti import hypervisor
41
from ganeti import locking
42
from ganeti import constants
43
from ganeti import objects
44
from ganeti import opcodes
45
from ganeti import serializer
46

    
47

    
48
class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_MASTER: the LU needs to run on the master node
        REQ_WSSTORE: the LU needs a writable SimpleStore
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_MASTER = True
  REQ_WSSTORE = False
  REQ_BGL = True

  def __init__(self, processor, op, context, sstore):
    """Constructor for LogicalUnit.

    This needs to be overriden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.sstore = sstore
    self.context = context
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((level, 0) for level in locking.LEVELS))
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None

    # Every parameter listed in _OP_REQP must be present on the opcode
    for required in self._OP_REQP:
      if getattr(op, required, None) is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   required)

    if not self.cfg.IsCluster():
      raise errors.OpPrereqError("Cluster not initialized yet,"
                                 " use 'gnt-cluster init' first.")
    if self.REQ_MASTER:
      master = sstore.GetMasterNode()
      if utils.HostInfo().name != master:
        raise errors.OpPrereqError("Commands must be run on the master"
                                   " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object, created lazily on first access.

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.sstore)
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, ecc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:
      - Use an empty dict if you don't need any lock
      - If you don't need any lock at a particular level omit that level
      - Don't put anything for the BGL level
      - If you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples:
    # Acquire all nodes and one instance
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
    }
    # Acquire just two nodes
    self.needed_locks = {
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
    }
    # Acquire no locks
    self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # Implementing this method is mandatory only for concurrent LUs, so
    # that old (BGL-holding) LUs don't all have to change at once.
    if not self.REQ_BGL:
      raise NotImplementedError
    self.needed_locks = {} # Exclusive LUs don't need locks.

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-node tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: the previous result this LU had, or None in the PRE phase.

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    full_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if full_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = full_name
    self.op.instance_name = full_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    If should be called in DeclareLocks in a way similar to:

    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we're really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      nodes.append(instance.primary_node)
      if not primary_only:
        nodes.extend(instance.secondary_nodes)

    mode = self.recalculate_locks[locking.LEVEL_NODE]
    if mode == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = nodes
    elif mode == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(nodes)

    # One-shot helper: drop the flag so a stray second call is caught
    del self.recalculate_locks[locking.LEVEL_NODE]
312

    
313

    
314
class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  # HPATH None means BuildHooksEnv is never called for these LUs
  HPATH = None
  HTYPE = None
323

    
324

    
325
def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Each (possibly abbreviated) name is expanded through the cluster
  configuration; unknown names raise OpPrereqError. The result is
  nicely sorted.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  expanded = []
  for short_name in nodes:
    full_name = lu.cfg.ExpandNodeName(short_name)
    if full_name is None:
      raise errors.OpPrereqError("No such node name '%s'" % short_name)
    expanded.append(full_name)

  return utils.NiceSort(expanded)
347

    
348

    
349
def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  An empty list of names means "all instances". The result is nicely
  sorted.

  Args:
    instances: List of instances (strings) or None for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if not instances:
    # No names given: the caller wants every configured instance
    return utils.NiceSort(lu.cfg.GetInstanceList())

  wanted = []
  for short_name in instances:
    full_name = lu.cfg.ExpandInstanceName(short_name)
    if full_name is None:
      raise errors.OpPrereqError("No such instance name '%s'" % short_name)
    wanted.append(full_name)

  return utils.NiceSort(wanted)
371

    
372

    
373
def _CheckOutputFields(static, dynamic, selected):
374
  """Checks whether all selected fields are valid.
375

376
  Args:
377
    static: Static fields
378
    dynamic: Dynamic fields
379

380
  """
381
  static_fields = frozenset(static)
382
  dynamic_fields = frozenset(dynamic)
383

    
384
  all_fields = static_fields | dynamic_fields
385

    
386
  if not all_fields.issuperset(selected):
387
    raise errors.OpPrereqError("Unknown output fields selected: %s"
388
                               % ",".join(frozenset(selected).
389
                                          difference(all_fields)))
390

    
391

    
392
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
393
                          memory, vcpus, nics):
394
  """Builds instance related env variables for hooks from single variables.
395

396
  Args:
397
    secondary_nodes: List of secondary nodes as strings
398
  """
399
  env = {
400
    "OP_TARGET": name,
401
    "INSTANCE_NAME": name,
402
    "INSTANCE_PRIMARY": primary_node,
403
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
404
    "INSTANCE_OS_TYPE": os_type,
405
    "INSTANCE_STATUS": status,
406
    "INSTANCE_MEMORY": memory,
407
    "INSTANCE_VCPUS": vcpus,
408
  }
409

    
410
  if nics:
411
    nic_count = len(nics)
412
    for idx, (ip, bridge, mac) in enumerate(nics):
413
      if ip is None:
414
        ip = ""
415
      env["INSTANCE_NIC%d_IP" % idx] = ip
416
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
417
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
418
  else:
419
    nic_count = 0
420

    
421
  env["INSTANCE_NIC_COUNT"] = nic_count
422

    
423
  return env
424

    
425

    
426
def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Extracts the hook-relevant attributes from an instance object and
  delegates to _BuildInstanceHookEnv.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override
  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    # BUGFIX: this used to pass instance.os, which made INSTANCE_STATUS
    # carry the OS name instead of the instance's run status.
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)
446

    
447

    
448
def _CheckInstanceBridgesExist(instance):
  """Check that the brigdes needed by an instance exist.

  Queries the instance's primary node over RPC and raises
  OpPrereqError if any bridge used by the instance's NICs is missing
  there.

  """
  # check bridges existance
  bridges = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, bridges):
    raise errors.OpPrereqError("one or more target bridges %s does not"
                               " exist on destination node '%s'" %
                               (bridges, instance.primary_node))
458

    
459

    
460
class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    # Only the master node itself may remain
    nodes = self.cfg.GetNodeList()
    if len(nodes) != 1 or nodes[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodes) - 1))

    remaining = self.cfg.GetInstanceList()
    if remaining:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(remaining))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    Stops the master role and backs up the root ssh keys; returns the
    name of the (former) master node.

    """
    master = self.sstore.GetMasterNode()
    if not rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")
    # Preserve the ssh keys so the node can later be re-purposed
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master
496

    
497

    
498
class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

  def ExpandNames(self):
    """Lock all nodes and instances, in shared mode (read-only check)."""
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existance and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    Returns True if the node is "bad" (any check failed), False otherwise.

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existance and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        # NOTE: the loop variable below shadows the 'node' parameter; from
        # here on 'node' refers to the peer node in the ssh test results
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    if 'node-net-test' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result['node-net-test']:
        bad = True
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
        # 'node' is again rebound to the peer node being reported on
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result['node-net-test'][node]))

    # a non-None 'hypervisor' entry is the hypervisor verify error message
    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    Returns True if the instance is "bad" (any check failed).

    """
    bad = False

    node_current = instanceconfig.primary_node

    # map of node -> volumes the instance should have there
    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    # any instance not explicitly 'down' is expected to be running on
    # its primary node
    if not instanceconfig.status == 'down':
      if (node_current not in node_instance or
          not instance in node_instance[node_current]):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    Returns True if any orphan volume was found.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    Returns True if any orphan instance was found.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    Returns True if any node lacks the memory for such a failover.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          needed_mem += instance_cfg[instance].memory
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase and their failure makes
    the output be logged in the verify output and the verification to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    Returns True if the whole verification succeeded, False otherwise.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    # gather everything via RPC up front, then check node by node
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
                        for node in nodeinfo]
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      # a string result from volume_list is an LVM error message
      if isinstance(volumeinfo, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      # NOTE: rebinds 'nodeinfo' (previously the list of node objects) to
      # this node's info dict; the list is no longer needed at this point
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result =  self._VerifyInstance(instance, inst_config, node_volume,
                                     node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result, handle it, and send some
    nicely-formatted feedback back to the user.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: previous Exec result

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res is False or not isinstance(res, list):
            feedback_fn("    Communication failure")
            lu_result = 1
            continue
          for script, hkr, output in res:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result
920

    
921

    
922
class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    # shared locks on all nodes and instances: we only read state
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    Returns:
      tuple of (nodes that could not be contacted, per-node LV listing
      errors, instances with degraded (offline) LVs, per-instance map
      of missing (node, lv) pairs)

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    # map every expected (node, volume) pair to its owning instance;
    # only running, network-mirrored instances are considered
    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
        # bug fix: a string result is an error message, not a volume
        # dict; without this continue we'd fall through and call
        # .iteritems() on the string below, crashing the LU
        continue
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result
1000

    
1001

    
1002
class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  Changes the cluster name and/or master IP in the cluster's simple
  store and distributes the updated ssconf files to all nodes.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]
  # needs a writable SimpleStore to update the cluster name/IP keys
  REQ_WSSTORE = True

  def BuildHooksEnv(self):
    """Build hooks env.

    Hooks run only on the master node, both pre and post.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    Resolves the new name, rejects a no-op rename, and makes sure the
    new cluster IP is not already in use on the network.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      # a reachable new IP means some other host already owns it
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    # store the fully-resolved name for Exec
    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    The master role is stopped while the simple store is rewritten and
    redistributed, and restarted afterwards even on failure.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          # copy failures are logged but do not abort: the master copy
          # is already updated at this point
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      # always try to restore the master role, even if distribution failed
      if not rpc.call_node_start_master(master, False):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")
1079

    
1080

    
1081
def _RecursiveCheckIfLVMBased(disk):
  """Return whether the disk or any of its children is LVM-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether a LD_LV dev_type was found or not

  """
  # recurse into the children first; an LVM device anywhere in the
  # tree makes the whole disk count as lvm-based
  for child in (disk.children or []):
    if _RecursiveCheckIfLVMBased(child):
      return True
  return disk.dev_type == constants.LD_LV
1096

    
1097

    
1098
class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    Hooks run only on the master node.

    """
    master_node = self.sstore.GetMasterNode()
    hooks_env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    return hooks_env, [master_node], [master_node]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    # FIXME: This only works because there is only one parameter that can be
    # changed or removed.
    if not self.op.vg_name:
      # disabling lvm storage: refuse while any instance still has an
      # lvm-backed disk anywhere in its disk tree
      for inst in self.cfg.GetAllInstancesInfo().values():
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")
      return

    # a volume group was given: validate it on every locked node
    node_list = self.acquired_locks[locking.LEVEL_NODE]
    vglist = rpc.call_vg_list(node_list)
    for node in node_list:
      vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        raise errors.OpPrereqError("Error on node '%s': %s" %
                                   (node, vgstatus))

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name == self.cfg.GetVGName():
      feedback_fn("Cluster LVM configuration already in desired"
                  " state, not changing")
    else:
      self.cfg.SetVGName(self.op.vg_name)
1163

    
1164

    
1165
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  Args:
    cfgw: configuration object, used to set the disks' physical IDs
    instance: the instance whose disks are polled (on its primary node)
    proc: processor-like object providing LogInfo/LogWarning
    oneshot: if True, poll the status only once and return
    unlock: NOTE(review): not referenced anywhere in this function

  Returns:
    True if no disk ended up degraded, False otherwise

  Raises:
    errors.RemoteError: if the primary node fails to return mirror data
      ten times in a row

  """
  if not instance.disks:
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      # no data from the node: retry up to 10 consecutive times before
      # giving up; a successful poll resets the counter below
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      # a degraded device with no sync in progress counts as degraded
      # in the final verdict
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        # a non-None percentage means this device is still syncing
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          # NOTE(review): despite the name, this keeps the estimate of
          # the last reporting device, not the maximum
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    # sleep until the next poll, capped at one minute
    time.sleep(min(60, max_time))

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
1221

    
1222

    
1223
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  cfgw.SetDiskID(dev, node)
  # index of the flag of interest in the blockdev_find result tuple
  if ldisk:
    flag_idx = 6
  else:
    flag_idx = 5

  healthy = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
      healthy = False
    else:
      healthy = healthy and (not rstats[flag_idx])

  # NOTE(review): ldisk is not forwarded to the recursive call, so
  # children are always checked with ldisk=False — confirm intended
  for child in (dev.children or []):
    healthy = healthy and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return healthy
1250

    
1251

    
1252
class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
    _CheckOutputFields(static=[],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    # take a shared lock on every node, since the OS list is gathered live
    self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}
    self.share_locks[locking.LEVEL_NODE] = 1

  def CheckPrereq(self):
    """No prerequisites; field validation already happened in ExpandNames.

    """

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remap a per-node return list into a per-os per-node dictionary.

    Args:
      node_list: a list with the names of all nodes
      rlist: a map with node names as keys and OS objects as values

    Returns:
      map: a map with osnames as keys and as value another map, with
           nodes as keys and list of OS objects as values
           e.g. {"debian-etch": {"node1": [<object>,...],
                                 "node2": [<object>,]}
                }

    """
    by_os = {}
    for node_name, node_result in rlist.iteritems():
      if not node_result:
        continue
      for os_obj in node_result:
        if os_obj.name not in by_os:
          # first sighting of this OS: pre-create an empty list for
          # every known node so absent nodes show up explicitly
          by_os[os_obj.name] = dict([(nname, []) for nname in node_list])
        by_os[os_obj.name][node_name].append(os_obj)
    return by_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.acquired_locks[locking.LEVEL_NODE]
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    os_map = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in os_map.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          # valid only if every node has at least one entry and its
          # first entry is truthy
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = dict([(node_name, [(v.status, v.path) for v in nos_list])
                      for node_name, nos_list in os_data.iteritems()])
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output
1336

    
1337

    
1338
class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      # fixed: was the deprecated "raise Class, (args)" statement form,
      # inconsistent with the call form used everywhere else in the file
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    # the node must not be used by any instance, as primary or secondary
    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    self.context.RemoveNode(node.name)

    rpc.call_node_leave_cluster(node.name)
1405

    
1406

    
1407
class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    # fields that require a live RPC call to the nodes
    self.dynamic_fields = frozenset([
      "dtotal", "dfree",
      "mtotal", "mnode", "mfree",
      "bootid",
      "ctotal",
      ])

    # fields answerable from the configuration alone
    self.static_fields = frozenset([
      "name", "pinst_cnt", "sinst_cnt",
      "pinst_list", "sinst_list",
      "pip", "sip", "tags",
      ])

    _CheckOutputFields(static=self.static_fields,
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # The validation of the node list is done in the _GetWantedNodes,
    # if non empty, and if empty, there's no validation to do
    pass

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      nodenames = self.wanted
      missing = set(nodenames).difference(all_info.keys())
      if missing:
        # bug fix: this raised self.OpExecError, which doesn't exist on
        # LU instances and would have raised AttributeError instead
        raise errors.OpExecError(
          "Some nodes were removed before retrieving their data: %s" % missing)
    else:
      nodenames = all_info.keys()
    nodelist = [all_info[name] for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      # live data needed: query each node over RPC
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    # instance-relation fields require a scan over all instances
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output
1539

    
1540

    
1541
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  REQ_BGL = False

  def ExpandNames(self):
    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)
    else:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Compute the requested fields for every volume on the locked nodes.

    """
    node_names = self.nodes
    node_volumes = rpc.call_node_volumes(node_names)

    instances = [self.cfg.GetInstanceInfo(iname)
                 for iname in self.cfg.GetInstanceList()]

    # per-instance map of {node: [lv, ...]}
    inst_lvs = dict([(inst, inst.MapLVsByNode()) for inst in instances])

    output = []
    for node in node_names:
      vols = node_volumes.get(node)
      if not vols:
        continue

      node_vols = sorted(vols, key=lambda vol: vol['dev'])

      for vol in node_vols:
        row = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            # find the instance owning this LV on this node, if any
            val = '-'
            for inst in instances:
              node_lvs = inst_lvs[inst].get(node)
              if node_lvs and vol['name'] in node_lvs:
                val = inst.name
                break
          else:
            raise errors.ParameterError(field)
          row.append(str(val))

        output.append(row)

    return output
1618

    
1619

    
1620
class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) matches the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    # without an explicit secondary IP the node is single-homed
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    # membership check: adding requires absence, readding requires presence
    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    # the new node's IPs must not collide with any existing node's IPs
    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        # on readd, the node may keep its own IPs but must not change them
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachablity
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    # node object to be added by Exec once all checks passed
    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # check connectivity
    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    # order matters: call_node_add below expects dsa priv/pub, rsa
    # priv/pub, then the cluster user's priv/pub keys
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      # verify the node can actually reach itself on the claimed
      # secondary IP
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    # ask the master node to verify ssh/hostname setup towards the new node
    node_verify_list = [self.sstore.GetMasterNode()]
    node_verify_param = {
      'nodelist': [node],
      # TODO: do a node-net-test as well?
    }

    result = rpc.call_node_verify(node_verify_list, node_verify_param)
    for verifier in node_verify_list:
      if not result[verifier]:
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                 " for remote verification" % verifier)
      if result[verifier]['nodelist']:
        for failed in result[verifier]['nodelist']:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, result[verifier]['nodelist'][failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList()
    if not self.op.readd:
      dist_nodes.append(node)
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        # distribution failures are logged but not fatal
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    # copy the ssconf files (plus the VNC password for HVM clusters)
    # to the new node only
    to_copy = self.sstore.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      result = rpc.call_upload_file([node], fname)
      if not result[node]:
        logger.Error("could not copy file %s to node %s" % (fname, node))

    # finally register the node with the cluster context
    if self.op.readd:
      self.context.ReaddNode(new_node)
    else:
      self.context.AddNode(new_node)
1821

    
1822

    
1823
class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False
  REQ_BGL = False

  def ExpandNames(self):
    # Purely config-reading LU: no locks needed
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    Returns a dict with the cluster name, the various version numbers,
    the master node, the master's architecture and the hypervisor type.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      }

    return result


class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    # Read-only LU: no locks needed
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # Node locks are computed later, once we know the instance's nodes
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Activate the disks.

    Returns the disk mapping info from _AssembleInstanceDisks; raises
    OpExecError if the block devices could not be activated.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    cfg: the cluster ConfigWriter instance
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
         succeeded with the mapping from node devices to instance devices
  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info


def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  Assembles all of the instance's block devices; on failure the already
  assembled devices are shut down again and OpExecError is raised.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # Node locks are computed later, once we know the instance's nodes
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
    _SafeShutdownInstanceDisks(instance, self.cfg)


def _SafeShutdownInstanceDisks(instance, cfg):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  _ShutdownInstanceDisks.

  Raises OpExecError if the primary node cannot be contacted or the
  instance is still running there.

  """
  ins_l = rpc.call_instance_list([instance.primary_node])
  ins_l = ins_l[instance.primary_node]
  # a non-list answer means the RPC to the node failed
  if not isinstance(ins_l, list):
    raise errors.OpExecError("Can't contact node '%s'" %
                             instance.primary_node)

  if instance.name in ins_l:
    raise errors.OpExecError("Instance is running, can't shutdown"
                             " block devices.")

  _ShutdownInstanceDisks(instance, cfg)


def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are ignored
  (i.e. they do not cause a false return value); errors on secondary
  nodes always do.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result


def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function check if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raise an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                             " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  # a non-int answer means the node query failed or returned garbage
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                             " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                             " needed %s MiB, available %s MiB" %
                             (node, reason, requested, free_mem))


class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # Node locks are computed later, once we know the instance's nodes
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster, its bridges exist
    and the primary node has enough free memory for it.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # check bridges existance
    _CheckInstanceBridgesExist(instance)

    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
                         "starting instance %s" % instance.name,
                         instance.memory)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      # roll the disks back so we don't leave them assembled
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")


class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Only a full reboot touches the disks (and hence the secondary
      # nodes).  The previous expression,
      # "not constants.INSTANCE_REBOOT_FULL", negated a truthy constant
      # and therefore always evaluated to False, over-locking all nodes
      # for every reboot type.
      primary_only = self.op.reboot_type != constants.INSTANCE_REBOOT_FULL
      self._LockInstancesNodes(primary_only=primary_only)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # check bridges existance
    _CheckInstanceBridgesExist(instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    Soft and hard reboots are delegated to the node daemon; a full
    reboot is implemented as shutdown + disk restart + start.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      if not rpc.call_instance_reboot(node_current, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      if not rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
      if not rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # Node locks are computed later, once we know the instance's nodes
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    A failed shutdown RPC is only logged; the disks are torn down
    regardless.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    if not rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    _ShutdownInstanceDisks(instance, self.cfg)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # Node locks are computed later, once we know the instance's nodes
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        # was "self.op.pnode", an attribute that does not exist on this
        # opcode and would have raised AttributeError on this error path
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   instance.primary_node)
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node"  % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.AddInstance(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      # always tear the disks back down, even if the OS install failed
      _ShutdownInstanceDisks(inst, self.cfg)


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running,
    and that the new name is valid and not already taken.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))

  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
                                                old_file_storage_dir,
                                                new_file_storage_dir)

      if not result:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
                                          "sda", "sdb"):
        # the config rename is kept; only the OS-level rename failed
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # Node locks are computed later, once we know the instance's nodes
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.sstore.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Remove the instance.

    Shutdown and disk-removal failures abort the operation unless
    ignore_failures was given, in which case they are only reported.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logger.Info("removing block devices for instance %s" % instance.name)

    if not _RemoveDisks(instance, self.cfg):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    # dynamic fields require querying the nodes and hence locking
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
    self.static_fields = frozenset([
      "name", "os", "pnode", "snodes",
      "admin_state", "admin_ram",
      "disk_template", "ip", "mac", "bridge",
      "sda_size", "sdb_size", "vcpus", "tags",
      "network_port", "kernel_path", "initrd_path",
      "hvm_boot_order", "hvm_acpi", "hvm_pae",
      "hvm_cdrom_image_path", "hvm_nic_type",
      "hvm_disk_type", "vnc_bind_address",
      ])
    _CheckOutputFields(static=self.static_fields,
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedInstances(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
    if self.do_locking:
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.do_locking:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllInstancesInfo()
    if self.do_locking:
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
    elif self.wanted != locking.ALL_SET:
      instance_names = self.wanted
      missing = set(instance_names).difference(all_info.keys())
      if missing:
        # was "raise self.OpExecError(...)": OpExecError is defined in
        # the errors module, not on the LU, so the old error path itself
        # raised AttributeError
        raise errors.OpExecError(
          "Some instances were removed before retrieving their data: %s"
          % missing)
    else:
      instance_names = all_info.keys()
    instance_list = [all_info[iname] for iname in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_all_instances_info(nodes)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          # False means the RPC failed, as opposed to "no instances"
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = []
    for instance in instance_list:
      iout = []
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.status != "down":
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.status != "down":
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          disk = instance.FindDisk(field[:3])
          if disk is None:
            val = None
          else:
            val = disk.size
        elif field == "vcpus":
          val = instance.vcpus
        elif field == "tags":
          val = list(instance.GetTags())
        elif field in ("network_port", "kernel_path", "initrd_path",
                       "hvm_boot_order", "hvm_acpi", "hvm_pae",
                       "hvm_cdrom_image_path", "hvm_nic_type",
                       "hvm_disk_type", "vnc_bind_address"):
          val = getattr(instance, field, None)
          if val is not None:
            pass
          elif field in ("hvm_nic_type", "hvm_disk_type",
                         "kernel_path", "initrd_path"):
            val = "default"
          else:
            val = "-"
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output


class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  The failover is done by shutting the instance down on its present
  (primary) node and starting it on its secondary node; only instances
  with a network-mirrored disk template can be failed over.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # node locks are computed later (DeclareLocks), once the instance
    # object is known
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
                         instance.name, instance.memory)

    # check bridge existance
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        # degraded disks only abort the failover for running instances,
        # and only when the user did not ask to ignore consistency
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down"  %
                     (instance.name, source_node, source_node))
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logger.Info("Starting instance %s on node %s" %
                  (instance.name, target_node))

      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices: the children are created first
  (recursively), so that the parent device can be assembled on top of
  them.

  Args:
    cfg: the cluster configuration object
    node: the (primary) node to create the devices on
    instance: the owning instance object
    device: the top-level Disk object to create
    info: metadata text to attach to the devices

  Returns:
    True on success, False as soon as any device creation fails.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
        return False

  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, True, info)
  if not new_id:
    return False
  # only record the physical id the first time the device is created
  if device.physical_id is None:
    device.physical_id = new_id
  return True
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  Args:
    cfg: the cluster configuration object
    node: the secondary node to create the devices on
    instance: the owning instance object
    device: the top-level Disk object to create
    force: create the device even if its type does not require it
    info: metadata text to attach to the devices

  Returns:
    True on success, False as soon as any device creation fails.

  """
  if device.CreateOnSecondary():
    # once one level requires secondary creation, all children do too
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, False, info)
  if not new_id:
    return False
  # only record the physical id the first time the device is created
  if device.physical_id is None:
    device.physical_id = new_id
  return True
def _GenerateUniqueNames(cfg, exts):
2901
  """Generate a suitable LV name.
2902

2903
  This will generate a logical volume name for the given instance.
2904

2905
  """
2906
  results = []
2907
  for val in exts:
2908
    new_id = cfg.GenerateUniqueID()
2909
    results.append("%s%s" % (new_id, val))
2910
  return results
2911

    
2912

    
2913
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name,
                         p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  Builds a DRBD8 Disk object backed by two LVs: one data volume of the
  requested size and one 128MB metadata volume.

  Args:
    cfg: the cluster configuration object (port/VG allocation)
    primary, secondary: the two node names of the mirror
    size: size of the data volume, in MB
    names: two LV names, [data_name, meta_name]
    iv_name: the instance-visible name of the device (e.g. "sda")
    p_minor, s_minor: drbd minors on the primary/secondary node

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  # 128 MB metadata volume, fixed size
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port,
                                      p_minor, s_minor),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev
def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz,
                          file_storage_dir, file_driver):
  """Generate the entire disk layout for a given template type.

  Returns the list of top-level Disk objects (sda/sdb pair, or empty
  for the diskless template), raising ProgrammerError when the number
  of secondary nodes does not match the template's requirements.

  """
  #TODO: compute space requirements

  vgname = cfg.GetVGName()
  if template_name == constants.DT_DISKLESS:
    disks = []
  elif template_name == constants.DT_PLAIN:
    # plain LVM: no secondaries allowed
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name = "sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name = "sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == constants.DT_DRBD8:
    # drbd8: exactly one secondary required
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    # minors are not pre-allocated here
    (minor_pa, minor_pb,
     minor_sa, minor_sb) = [None, None, None, None]

    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                        disk_sz, names[0:2], "sda",
                                        minor_pa, minor_sa)
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                        swap_sz, names[2:4], "sdb",
                                        minor_pb, minor_sb)
    disks = [drbd_sda_dev, drbd_sdb_dev]
  elif template_name == constants.DT_FILE:
    # file-backed: no secondaries allowed
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
                                iv_name="sda", logical_id=(file_driver,
                                "%s/sda" % file_storage_dir))
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
                                iv_name="sdb", logical_id=(file_driver,
                                "%s/sdb" % file_storage_dir))
    disks = [file_sda_dev, file_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks
def _GetInstanceInfoText(instance):
2989
  """Compute that text that should be added to the disk's metadata.
2990

2991
  """
2992
  return "originstname+%s" % instance.name
2993

    
2994

    
2995
def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.  For file-backed
  instances the storage directory is created first; then every disk is
  created on the secondaries before the primary, so the primary device
  can assemble on top of its remote peers.

  Args:
    cfg: the cluster configuration object
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = rpc.call_file_storage_dir_create(instance.primary_node,
                                              file_storage_dir)

    # a false result means the RPC itself failed (node unreachable)
    if not result:
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
      return False

    # result[0] is the per-call success flag
    if not result[0]:
      logger.Error("failed to create directory '%s'" % file_storage_dir)
      return False

  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False

  return True
def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object
    cfg: the cluster configuration object

  Returns:
    True or False showing the success of the removal proces

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    # remove the device on every node of the device tree, best-effort
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False

  if instance.disk_template == constants.DT_FILE:
    # for file storage, also remove the instance's directory
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
                                            file_storage_dir):
      logger.Error("could not remove directory '%s'" % file_storage_dir)
      result = False

  return result
def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group

  This is currently hard-coded for the two-drive layout.  Returns the
  required free space in MB, or None for templates that do not consume
  LVM space (diskless, file).

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: disk_size + swap_size,
    # 256 MB are added for drbd metadata, 128MB for each drbd device
    constants.DT_DRBD8: disk_size + swap_size + 256,
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" %  disk_template)

  return req_size_dict[disk_template]
class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  # required opcode parameters; optional ones are defaulted to None in
  # ExpandNames
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
              "disk_template", "swap_size", "mode", "start", "vcpus",
              "wait_for_sync", "ip_check", "mac"]
  REQ_BGL = False
  def _ExpandNode(self, node):
    """Expands and checks one node name.

    Returns the fully-qualified node name, raising OpPrereqError if the
    name cannot be resolved against the cluster configuration.

    """
    node_full = self.cfg.ExpandNodeName(node)
    if node_full is None:
      raise errors.OpPrereqError("Unknown node %s" % node)
    return node_full
  def ExpandNames(self):
    """ExpandNames for CreateInstance.

    Figure out the right locks for instance creation: validates the
    opcode parameters that can be checked without cluster state, and
    computes the instance and node locks to acquire.

    """
    self.needed_locks = {}

    # set optional parameters to none if they don't exist
    for attr in ["kernel_path", "initrd_path", "pnode", "snode",
                 "iallocator", "hvm_boot_order", "hvm_acpi", "hvm_pae",
                 "hvm_cdrom_image_path", "hvm_nic_type", "hvm_disk_type",
                 "vnc_bind_address"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    # verify creation mode
    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)
    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    #### instance parameters check

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)
    self.op.instance_name = instance_name = hostname1.name

    # this is just a preventive check, but someone might still add this
    # instance in the meantime, and creation will fail at lock-add time
    if instance_name in self.cfg.GetInstanceList():
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    self.add_locks[locking.LEVEL_INSTANCE] = instance_name

    # ip validity checks
    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = self.op.ip = inst_ip
    # used in CheckPrereq for ip ping check
    self.check_ip = hostname1.ip

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # boot order verification
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")
    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory path not absolute")

    ### Node/iallocator related checks
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator:
      # with an allocator any node may be picked, so lock them all
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.op.pnode = self._ExpandNode(self.op.pnode)
      nodelist = [self.op.pnode]
      if self.op.snode is not None:
        self.op.snode = self._ExpandNode(self.op.snode)
        nodelist.append(self.op.snode)
      self.needed_locks[locking.LEVEL_NODE] = nodelist

    # in case of import lock the source node too
    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)

      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      self.op.src_node = src_node = self._ExpandNode(src_node)
      if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
        self.needed_locks[locking.LEVEL_NODE].append(src_node)

    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")
  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    On success this fills self.op.pnode (and self.op.snode when two
    nodes are required) from the allocator's answer; any allocator
    failure is converted to OpPrereqError.

    """
    disks = [{"size": self.op.disk_size, "mode": "w"},
             {"size": self.op.swap_size, "mode": "w"}]
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
             "bridge": self.op.bridge}]
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.op.vcpus,
                     mem_size=self.op.mem_size,
                     disks=disks,
                     nics=nics,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      # FIX: the message has three placeholders but the original code
      # passed only two arguments, so this line raised TypeError instead
      # of the intended OpPrereqError; pass the allocator name as well
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    logger.ToStdout("Selected nodes for the instance: %s" %
                    (", ".join(ial.nodes),))
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
                (self.op.instance_name, self.op.iallocator, ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.op.mem_size,
      vcpus=self.op.vcpus,
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
    ))

    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl
  def CheckPrereq(self):
    """Check prerequisites.

    Validates everything that needs cluster state: the import source
    (if any), IP conflicts, node membership, free disk and memory, OS
    availability and hypervisor-specific parameters.  Also runs the
    iallocator (when requested) to pick the nodes.

    """
    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = self.op.src_node
      src_path = self.op.src_path

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage

    # ip ping checks (we use the same ip that was resolved in ExpandNames)

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
        # FIX: the original referenced 'instance_name', which is only a
        # local variable in ExpandNames and raised NameError here
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (self.check_ip, self.op.instance_name))

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    #### allocator run

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
    assert self.pnode is not None, \
      "Cannot retrieve locked node %s" % self.op.pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if self.op.snode is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")
      if self.op.snode == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(self.op.snode)

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.op.disk_size, self.op.swap_size)

    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node"  % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")

    # bridge check on primary node
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.op.mem_size)

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      # FIXME (als): shouldn't these checks happen on the destination node?
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    # Xen HVM device type checks
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
        raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
                                   " hypervisor" % self.op.hvm_nic_type)
      if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
        raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
                                   " hypervisor" % self.op.hvm_disk_type)

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'
  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    Builds the NIC and disk objects, creates the physical disks on the
    nodes, registers the new Instance in the cluster configuration,
    optionally waits for the disks to sync, runs the OS create/import
    scripts and finally (if requested) starts the instance.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    # the MAC is either given explicitly or generated by the cluster
    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    # hypervisors listed in HTS_REQ_PORT get a cluster-allocated
    # network (console) port
    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    if self.op.vnc_bind_address is None:
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.sstore.GetFileStorageDir(),
                                        string_file_storage_dir, instance))

    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size,
                                  file_storage_dir,
                                  self.op.file_driver)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            hvm_acpi=self.op.hvm_acpi,
                            hvm_pae=self.op.hvm_pae,
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
                            vnc_bind_address=self.op.vnc_bind_address,
                            hvm_nic_type=self.op.hvm_nic_type,
                            hvm_disk_type=self.op.hvm_disk_type,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      # roll back any disks that were created before the failure
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Declare that we don't want to remove the instance lock anymore, as we've
    # added the instance to the config
    del self.remove_locks[locking.LEVEL_INSTANCE]

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      # full rollback: drop the disks and the config entry
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      # Make sure the instance lock gets removed
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                                src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")
class LUConnectConsole(NoHooksLU):
  """Compute the command needed to attach to an instance's console.

  This is somewhat special in that it does not open the console itself;
  it returns the ssh command line that must be run on the master node
  in order to connect to the console on the instance's primary node.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    inst = self.instance
    pnode = inst.primary_node

    # ask the primary node which instances it is currently running
    running = rpc.call_instance_list([pnode])[pnode]
    if running is False:
      raise errors.OpExecError("Can't connect to node %s." % pnode)
    if inst.name not in running:
      raise errors.OpExecError("Instance %s is not running." % inst.name)

    logger.Debug("connecting to console of %s on %s" % (inst.name, pnode))

    console_cmd = hypervisor.GetHypervisor().GetShellCommandForConsole(inst)

    # wrap the hypervisor console command in an ssh invocation
    return self.ssh.BuildCmd(pnode, "root", console_cmd, batch=True, tty=True)
class LUReplaceDisks(LogicalUnit):
3627
  """Replace the disks of an instance.
3628

3629
  """
3630
  HPATH = "mirrors-replace"
3631
  HTYPE = constants.HTYPE_INSTANCE
3632
  _OP_REQP = ["instance_name", "mode", "disks"]
3633
  REQ_BGL = False
3634

    
3635
  def ExpandNames(self):
    """Expand names and compute the node locks for the replacement.

    Either an iallocator or an explicit new secondary may be given
    (but not both); the node locking strategy depends on which one.

    """
    self._ExpandAndLockInstance()

    # normalize the optional remote_node attribute
    self.op.remote_node = getattr(self.op, "remote_node", None)

    if getattr(self.op, "iallocator", None) is not None:
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      # the allocator may pick any node, so we must lock them all
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    elif self.op.remote_node is not None:
      expanded = self.cfg.ExpandNodeName(self.op.remote_node)
      if expanded is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.op.remote_node = expanded
      # lock the new secondary now; the instance's own nodes are
      # appended later in DeclareLocks
      self.needed_locks[locking.LEVEL_NODE] = [expanded]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
  def DeclareLocks(self, level):
    """Declare the instance's own nodes at the node locking level."""
    if level != locking.LEVEL_NODE:
      return
    # If we're not already locking all nodes in the set we have to declare the
    # instance's primary/secondary nodes.
    if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
      self._LockInstancesNodes()
  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    Runs the configured instance allocator in relocation mode and, on
    success, stores the selected node in self.op.remote_node.

    Raises OpPrereqError if the allocator fails or returns an
    unexpected number of nodes.

    """
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      # BUGFIX: the format string has three placeholders but only two
      # arguments were supplied, so this error path raised a TypeError
      # instead of the intended OpPrereqError; pass the iallocator name
      # as the first argument
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    logger.ToStdout("Selected new secondary for the instance: %s" %
                    self.op.remote_node)
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    # the instance-derived entries are applied last, on purpose: they
    # take precedence over our replacement-specific keys on conflict
    env = dict(MODE=self.op.mode,
               NEW_SECONDARY=self.op.remote_node,
               OLD_SECONDARY=self.instance.secondary_nodes[0])
    env.update(_BuildInstanceHookEnvByObject(self.instance))

    nl = [self.sstore.GetMasterNode(), self.instance.primary_node]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl
  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster, that its disk
    layout supports replacement, and resolves the requested mode and
    nodes into self.tgt_node / self.oth_node / self.new_node for the
    Exec phase.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = instance

    # only network-mirrored templates can have their disks replaced
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    # if an iallocator was given, it fills in self.op.remote_node
    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        # tgt_node is where new storage is created, oth_node is the peer
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    # finally verify that all the requested disks actually exist
    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))
  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for dbrd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaces.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    # iv_names maps disk iv_name -> (drbd device, old LVs, new LVs)
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    # every disk selected for replacement must be visible on both nodes
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      # each DRBD disk gets a data LV (same size) and a 128MB meta LV
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      # update the config objects to reflect the swap: new LVs take over
      # the old logical ids...
      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      # ...and the old LVs get the temporary "_replaced-<time>" names
      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        # attach failed: best-effort removal of the LVs we just created
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      # index 5 of the blockdev_find result is the degraded flag
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue
  def _ExecD8Secondary(self, feedback_fn):
3952
    """Replace the secondary node for drbd8.
3953

3954
    The algorithm for replace is quite complicated:
3955
      - for all disks of the instance:
3956
        - create new LVs on the new node with same names
3957
        - shutdown the drbd device on the old secondary
3958
        - disconnect the drbd network on the primary
3959
        - create the drbd device on the new secondary
3960
        - network attach the drbd on the primary, using an artifice:
3961
          the drbd code for Attach() will connect to the network if it
3962
          finds a device which is connected to the good local disks but
3963
          not network enabled
3964
      - wait for sync across all devices
3965
      - remove all disks from the old secondary
3966

3967
    Failures are not very well handled.
3968

3969
    """
3970
    steps_total = 6
3971
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3972
    instance = self.instance
3973
    iv_names = {}
3974
    vgname = self.cfg.GetVGName()
3975
    # start of work
3976
    cfg = self.cfg
3977
    old_node = self.tgt_node
3978
    new_node = self.new_node
3979
    pri_node = instance.primary_node
3980

    
3981
    # Step: check device activation