
lib/cmdlib.py @ c8d8b4c8


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33

    
34
from ganeti import rpc
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import locking
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import serializer
45

    
46

    
47
class LogicalUnit(object):
48
  """Logical Unit base class.
49

50
  Subclasses must follow these rules:
51
    - implement ExpandNames
52
    - implement CheckPrereq
53
    - implement Exec
54
    - implement BuildHooksEnv
55
    - redefine HPATH and HTYPE
56
    - optionally redefine their run requirements:
57
        REQ_MASTER: the LU needs to run on the master node
58
        REQ_WSSTORE: the LU needs a writable SimpleStore
59
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60

61
  Note that all commands require root permissions.
62

63
  """
64
  HPATH = None
65
  HTYPE = None
66
  _OP_REQP = []
67
  REQ_MASTER = True
68
  REQ_WSSTORE = False
69
  REQ_BGL = True
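  # Illustrative sketch (not part of the original module): a concrete LU would
  # typically override these class-level defaults, e.g. a hypothetical,
  # hook-running, concurrent node-level LU:
  #
  #   class LUExampleNodeOp(LogicalUnit):
  #     HPATH = "node-example"
  #     HTYPE = constants.HTYPE_NODE
  #     _OP_REQP = ["node_name"]
  #     REQ_BGL = False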
70

    
71
  def __init__(self, processor, op, context, sstore):
72
    """Constructor for LogicalUnit.
73

74
    This needs to be overridden in derived classes in order to check op
75
    validity.
76

77
    """
78
    self.proc = processor
79
    self.op = op
80
    self.cfg = context.cfg
81
    self.sstore = sstore
82
    self.context = context
83
    self.needed_locks = None
84
    self.acquired_locks = {}
85
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
86
    # Used to force good behavior when calling helper functions
87
    self.recalculate_locks = {}
88
    self.__ssh = None
89

    
90
    for attr_name in self._OP_REQP:
91
      attr_val = getattr(op, attr_name, None)
92
      if attr_val is None:
93
        raise errors.OpPrereqError("Required parameter '%s' missing" %
94
                                   attr_name)
95

    
96
    if not self.cfg.IsCluster():
97
      raise errors.OpPrereqError("Cluster not initialized yet,"
98
                                 " use 'gnt-cluster init' first.")
99
    if self.REQ_MASTER:
100
      master = sstore.GetMasterNode()
101
      if master != utils.HostInfo().name:
102
        raise errors.OpPrereqError("Commands must be run on the master"
103
                                   " node %s" % master)
104

    
105
  def __GetSSH(self):
106
    """Returns the SshRunner object
107

108
    """
109
    if not self.__ssh:
110
      self.__ssh = ssh.SshRunner(self.sstore)
111
    return self.__ssh
112

    
113
  ssh = property(fget=__GetSSH)
114

    
115
  def ExpandNames(self):
116
    """Expand names for this LU.
117

118
    This method is called before starting to execute the opcode, and it should
119
    update all the parameters of the opcode to their canonical form (e.g. a
120
    short node name must be fully expanded after this method has successfully
121
    completed). This way locking, hooks, logging, etc. can work correctly.
122

123
    LUs which implement this method must also populate the self.needed_locks
124
    member, as a dict with lock levels as keys, and a list of needed lock names
125
    as values. Rules:
126
      - Use an empty dict if you don't need any lock
127
      - If you don't need any lock at a particular level omit that level
128
      - Don't put anything for the BGL level
129
      - If you want all locks at a level use locking.ALL_SET as a value
130

131
    If you need to share locks (rather than acquire them exclusively) at one
132
    level you can modify self.share_locks, setting a true value (usually 1) for
133
    that level. By default locks are not shared.
134

135
    Examples:
136
    # Acquire all nodes and one instance
137
    self.needed_locks = {
138
      locking.LEVEL_NODE: locking.ALL_SET,
139
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
140
    }
141
    # Acquire just two nodes
142
    self.needed_locks = {
143
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
144
    }
145
    # Acquire no locks
146
    self.needed_locks = {} # No, you can't leave it to the default value None
147

148
    """
149
    # The implementation of this method is mandatory only if the new LU is
150
    # concurrent, so that old LUs don't need to be changed all at the same
151
    # time.
152
    if self.REQ_BGL:
153
      self.needed_locks = {} # Exclusive LUs don't need locks.
154
    else:
155
      raise NotImplementedError
156

    
157
  def DeclareLocks(self, level):
158
    """Declare LU locking needs for a level
159

160
    While most LUs can just declare their locking needs at ExpandNames time,
161
    sometimes there's the need to calculate some locks after having acquired
162
    the ones before. This function is called just before acquiring locks at a
163
    particular level, but after acquiring the ones at lower levels, and permits
164
    such calculations. It can be used to modify self.needed_locks, and by
165
    default it does nothing.
166

167
    This function is only called if you have something already set in
168
    self.needed_locks for the level.
169

170
    @param level: Locking level which is going to be locked
171
    @type level: member of ganeti.locking.LEVELS
172

173
    """
174

    
175
  def CheckPrereq(self):
176
    """Check prerequisites for this LU.
177

178
    This method should check that the prerequisites for the execution
179
    of this LU are fulfilled. It can do internode communication, but
180
    it should be idempotent - no cluster or system changes are
181
    allowed.
182

183
    The method should raise errors.OpPrereqError in case something is
184
    not fulfilled. Its return value is ignored.
185

186
    This method should also update all the parameters of the opcode to
187
    their canonical form if it hasn't been done by ExpandNames before.
188

189
    """
190
    raise NotImplementedError
191

    
192
  def Exec(self, feedback_fn):
193
    """Execute the LU.
194

195
    This method should implement the actual work. It should raise
196
    errors.OpExecError for failures that are somewhat dealt with in
197
    code, or expected.
198

199
    """
200
    raise NotImplementedError
201

    
202
  def BuildHooksEnv(self):
203
    """Build hooks environment for this LU.
204

205
    This method should return a three-element tuple consisting of: a dict
206
    containing the environment that will be used for running the
207
    specific hook for this LU, a list of node names on which the hook
208
    should run before the execution, and a list of node names on which
209
    the hook should run after the execution.
210

211
    The keys of the dict must not be prefixed with 'GANETI_', as this will
212
    be handled in the hooks runner. Also note additional keys will be
213
    added by the hooks runner. If the LU doesn't define any
214
    environment, an empty dict (and not None) should be returned.
215

216
    If there are no nodes to return, an empty list (and not None) should be used.
217

218
    Note that if the HPATH for a LU class is None, this function will
219
    not be called.
220

221
    """
222
    raise NotImplementedError
223

    
224
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
225
    """Notify the LU about the results of its hooks.
226

227
    This method is called every time a hooks phase is executed, and notifies
228
    the Logical Unit about the hooks' result. The LU can then use it to alter
229
    its result based on the hooks.  By default the method does nothing and the
230
    previous result is passed back unchanged but any LU can define it if it
231
    wants to use the local cluster hook-scripts somehow.
232

233
    Args:
234
      phase: the hooks phase that has just been run
235
      hook_results: the results of the multi-node hooks rpc call
236
      feedback_fn: function to send feedback back to the caller
237
      lu_result: the previous result this LU had, or None in the PRE phase.
238

239
    """
240
    return lu_result
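  # Illustrative sketch (not part of the original code): an LU that wants to
  # react to its hooks could override this, for instance turning a failed
  # post-phase hooks RPC into a failure result:
  #
  #   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
  #     if phase == constants.HOOKS_PHASE_POST and not hook_results:
  #       return 1
  #     return lu_result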
241

    
242
  def _ExpandAndLockInstance(self):
243
    """Helper function to expand and lock an instance.
244

245
    Many LUs that work on an instance take its name in self.op.instance_name
246
    and need to expand it and then declare the expanded name for locking. This
247
    function does it, and then updates self.op.instance_name to the expanded
248
    name. It also initializes needed_locks as a dict, if this hasn't been done
249
    before.
250

251
    """
252
    if self.needed_locks is None:
253
      self.needed_locks = {}
254
    else:
255
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
256
        "_ExpandAndLockInstance called with instance-level locks set"
257
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
258
    if expanded_name is None:
259
      raise errors.OpPrereqError("Instance '%s' not known" %
260
                                  self.op.instance_name)
261
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
262
    self.op.instance_name = expanded_name
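  # Illustrative usage sketch (not part of the original code): an
  # instance-level LU would normally call this helper from its ExpandNames:
  #
  #   def ExpandNames(self):
  #     self._ExpandAndLockInstance()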
263

    
264
  def _LockInstancesNodes(self, primary_only=False):
265
    """Helper function to declare instances' nodes for locking.
266

267
    This function should be called after locking one or more instances to lock
268
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
269
    with all primary or secondary nodes for instances already locked and
270
    present in self.needed_locks[locking.LEVEL_INSTANCE].
271

272
    It should be called from DeclareLocks, and for safety only works if
273
    self.recalculate_locks[locking.LEVEL_NODE] is set.
274

275
    In the future it may grow parameters to just lock some instance's nodes, or
276
    to just lock primaries or secondary nodes, if needed.
277

278
    It should be called in DeclareLocks in a way similar to:
279

280
    if level == locking.LEVEL_NODE:
281
      self._LockInstancesNodes()
282

283
    @type primary_only: boolean
284
    @param primary_only: only lock primary nodes of locked instances
285

286
    """
287
    assert locking.LEVEL_NODE in self.recalculate_locks, \
288
      "_LockInstancesNodes helper function called with no nodes to recalculate"
289

    
290
    # TODO: check if we've really been called with the instance locks held
291

    
292
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
293
    # future we might want to have different behaviors depending on the value
294
    # of self.recalculate_locks[locking.LEVEL_NODE]
295
    wanted_nodes = []
296
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
297
      instance = self.context.cfg.GetInstanceInfo(instance_name)
298
      wanted_nodes.append(instance.primary_node)
299
      if not primary_only:
300
        wanted_nodes.extend(instance.secondary_nodes)
301

    
302
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
303
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
304
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
305
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
306

    
307
    del self.recalculate_locks[locking.LEVEL_NODE]
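  # Illustrative sketch (not part of the original code): the usual pairing of
  # ExpandNames and DeclareLocks around this helper looks like:
  #
  #   def ExpandNames(self):
  #     self._ExpandAndLockInstance()
  #     self.needed_locks[locking.LEVEL_NODE] = []
  #     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
  #
  #   def DeclareLocks(self, level):
  #     if level == locking.LEVEL_NODE:
  #       self._LockInstancesNodes()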
308

    
309

    
310
class NoHooksLU(LogicalUnit):
311
  """Simple LU which runs no hooks.
312

313
  This LU is intended as a parent for other LogicalUnits which will
314
  run no hooks, in order to reduce duplicate code.
315

316
  """
317
  HPATH = None
318
  HTYPE = None
319

    
320

    
321
def _GetWantedNodes(lu, nodes):
322
  """Returns list of checked and expanded node names.
323

324
  Args:
325
    nodes: List of nodes (strings) or None for all
326

327
  """
328
  if not isinstance(nodes, list):
329
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
330

    
331
  if not nodes:
332
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
333
      " non-empty list of nodes whose name is to be expanded.")
334

    
335
  wanted = []
336
  for name in nodes:
337
    node = lu.cfg.ExpandNodeName(name)
338
    if node is None:
339
      raise errors.OpPrereqError("No such node name '%s'" % name)
340
    wanted.append(node)
341

    
342
  return utils.NiceSort(wanted)
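# Illustrative usage sketch (not part of the original code), mirroring how
# LUQueryNodes below expands the node names given in its opcode:
#
#   self.wanted = _GetWantedNodes(self, self.op.names)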
343

    
344

    
345
def _GetWantedInstances(lu, instances):
346
  """Returns list of checked and expanded instance names.
347

348
  Args:
349
    instances: List of instances (strings) or None for all
350

351
  """
352
  if not isinstance(instances, list):
353
    raise errors.OpPrereqError("Invalid argument type 'instances'")
354

    
355
  if instances:
356
    wanted = []
357

    
358
    for name in instances:
359
      instance = lu.cfg.ExpandInstanceName(name)
360
      if instance is None:
361
        raise errors.OpPrereqError("No such instance name '%s'" % name)
362
      wanted.append(instance)
363

    
364
  else:
365
    wanted = lu.cfg.GetInstanceList()
366
  return utils.NiceSort(wanted)
367

    
368

    
369
def _CheckOutputFields(static, dynamic, selected):
370
  """Checks whether all selected fields are valid.
371

372
  Args:
373
    static: Static fields
374
    dynamic: Dynamic fields
375

376
  """
377
  static_fields = frozenset(static)
378
  dynamic_fields = frozenset(dynamic)
379

    
380
  all_fields = static_fields | dynamic_fields
381

    
382
  if not all_fields.issuperset(selected):
383
    raise errors.OpPrereqError("Unknown output fields selected: %s"
384
                               % ",".join(frozenset(selected).
385
                                          difference(all_fields)))
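# Illustrative usage sketch (not part of the original code): query LUs call
# this helper from ExpandNames, e.g. as LUQueryNodeVolumes does below:
#
#   _CheckOutputFields(static=["node"],
#                      dynamic=["phys", "vg", "name", "size", "instance"],
#                      selected=self.op.output_fields)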
386

    
387

    
388
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
389
                          memory, vcpus, nics):
390
  """Builds instance related env variables for hooks from single variables.
391

392
  Args:
393
    secondary_nodes: List of secondary nodes as strings
394
  """
395
  env = {
396
    "OP_TARGET": name,
397
    "INSTANCE_NAME": name,
398
    "INSTANCE_PRIMARY": primary_node,
399
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
400
    "INSTANCE_OS_TYPE": os_type,
401
    "INSTANCE_STATUS": status,
402
    "INSTANCE_MEMORY": memory,
403
    "INSTANCE_VCPUS": vcpus,
404
  }
405

    
406
  if nics:
407
    nic_count = len(nics)
408
    for idx, (ip, bridge, mac) in enumerate(nics):
409
      if ip is None:
410
        ip = ""
411
      env["INSTANCE_NIC%d_IP" % idx] = ip
412
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
413
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
414
  else:
415
    nic_count = 0
416

    
417
  env["INSTANCE_NIC_COUNT"] = nic_count
418

    
419
  return env
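# Illustrative sketch (not part of the original code; names and values are
# made up): for a single-NIC instance the call and resulting keys would look
# roughly like:
#
#   env = _BuildInstanceHookEnv("inst1.example.com", "node1.example.com",
#                               ["node2.example.com"], "debian-etch", "up",
#                               512, 1,
#                               [("192.0.2.10", "xen-br0", "aa:00:00:00:00:01")])
#   # env now contains INSTANCE_NIC_COUNT == 1 plus INSTANCE_NIC0_IP,
#   # INSTANCE_NIC0_BRIDGE and INSTANCE_NIC0_HWADDR; the hooks runner later
#   # adds the GANETI_ prefix to every key.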
420

    
421

    
422
def _BuildInstanceHookEnvByObject(instance, override=None):
423
  """Builds instance related env variables for hooks from an object.
424

425
  Args:
426
    instance: objects.Instance object of instance
427
    override: dict of values to override
428
  """
429
  args = {
430
    'name': instance.name,
431
    'primary_node': instance.primary_node,
432
    'secondary_nodes': instance.secondary_nodes,
433
    'os_type': instance.os,
434
    'status': instance.status,
435
    'memory': instance.memory,
436
    'vcpus': instance.vcpus,
437
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
438
  }
439
  if override:
440
    args.update(override)
441
  return _BuildInstanceHookEnv(**args)
442

    
443

    
444
def _CheckInstanceBridgesExist(instance):
445
  """Check that the brigdes needed by an instance exist.
446

447
  """
448
  # check bridges existence
449
  brlist = [nic.bridge for nic in instance.nics]
450
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
451
    raise errors.OpPrereqError("one or more target bridges %s does not"
452
                               " exist on destination node '%s'" %
453
                               (brlist, instance.primary_node))
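# Illustrative usage sketch (not part of the original code): callers would
# typically invoke this from CheckPrereq once the instance is known:
#
#   instance = self.cfg.GetInstanceInfo(self.op.instance_name)
#   _CheckInstanceBridgesExist(instance)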
454

    
455

    
456
class LUDestroyCluster(NoHooksLU):
457
  """Logical unit for destroying the cluster.
458

459
  """
460
  _OP_REQP = []
461

    
462
  def CheckPrereq(self):
463
    """Check prerequisites.
464

465
    This checks whether the cluster is empty.
466

467
    Any errors are signalled by raising errors.OpPrereqError.
468

469
    """
470
    master = self.sstore.GetMasterNode()
471

    
472
    nodelist = self.cfg.GetNodeList()
473
    if len(nodelist) != 1 or nodelist[0] != master:
474
      raise errors.OpPrereqError("There are still %d node(s) in"
475
                                 " this cluster." % (len(nodelist) - 1))
476
    instancelist = self.cfg.GetInstanceList()
477
    if instancelist:
478
      raise errors.OpPrereqError("There are still %d instance(s) in"
479
                                 " this cluster." % len(instancelist))
480

    
481
  def Exec(self, feedback_fn):
482
    """Destroys the cluster.
483

484
    """
485
    master = self.sstore.GetMasterNode()
486
    if not rpc.call_node_stop_master(master, False):
487
      raise errors.OpExecError("Could not disable the master role")
488
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
489
    utils.CreateBackup(priv_key)
490
    utils.CreateBackup(pub_key)
491
    return master
492

    
493

    
494
class LUVerifyCluster(LogicalUnit):
495
  """Verifies the cluster status.
496

497
  """
498
  HPATH = "cluster-verify"
499
  HTYPE = constants.HTYPE_CLUSTER
500
  _OP_REQP = ["skip_checks"]
501
  REQ_BGL = False
502

    
503
  def ExpandNames(self):
504
    self.needed_locks = {
505
      locking.LEVEL_NODE: locking.ALL_SET,
506
      locking.LEVEL_INSTANCE: locking.ALL_SET,
507
    }
508
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
509

    
510
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
511
                  remote_version, feedback_fn):
512
    """Run multiple tests against a node.
513

514
    Test list:
515
      - compares ganeti version
516
      - checks vg existence and size > 20G
517
      - checks config file checksum
518
      - checks ssh to other nodes
519

520
    Args:
521
      node: name of the node to check
522
      file_list: required list of files
523
      local_cksum: dictionary of local files and their checksums
524

525
    """
526
    # compares ganeti version
527
    local_version = constants.PROTOCOL_VERSION
528
    if not remote_version:
529
      feedback_fn("  - ERROR: connection to %s failed" % (node))
530
      return True
531

    
532
    if local_version != remote_version:
533
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
534
                      (local_version, node, remote_version))
535
      return True
536

    
537
    # checks vg existence and size > 20G
538

    
539
    bad = False
540
    if not vglist:
541
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
542
                      (node,))
543
      bad = True
544
    else:
545
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
546
                                            constants.MIN_VG_SIZE)
547
      if vgstatus:
548
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
549
        bad = True
550

    
551
    # checks config file checksum
552
    # checks ssh to any
553

    
554
    if 'filelist' not in node_result:
555
      bad = True
556
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
557
    else:
558
      remote_cksum = node_result['filelist']
559
      for file_name in file_list:
560
        if file_name not in remote_cksum:
561
          bad = True
562
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
563
        elif remote_cksum[file_name] != local_cksum[file_name]:
564
          bad = True
565
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
566

    
567
    if 'nodelist' not in node_result:
568
      bad = True
569
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
570
    else:
571
      if node_result['nodelist']:
572
        bad = True
573
        for node in node_result['nodelist']:
574
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
575
                          (node, node_result['nodelist'][node]))
576
    if 'node-net-test' not in node_result:
577
      bad = True
578
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
579
    else:
580
      if node_result['node-net-test']:
581
        bad = True
582
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
583
        for node in nlist:
584
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
585
                          (node, node_result['node-net-test'][node]))
586

    
587
    hyp_result = node_result.get('hypervisor', None)
588
    if hyp_result is not None:
589
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
590
    return bad
591

    
592
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
593
                      node_instance, feedback_fn):
594
    """Verify an instance.
595

596
    This function checks to see if the required block devices are
597
    available on the instance's node.
598

599
    """
600
    bad = False
601

    
602
    node_current = instanceconfig.primary_node
603

    
604
    node_vol_should = {}
605
    instanceconfig.MapLVsByNode(node_vol_should)
606

    
607
    for node in node_vol_should:
608
      for volume in node_vol_should[node]:
609
        if node not in node_vol_is or volume not in node_vol_is[node]:
610
          feedback_fn("  - ERROR: volume %s missing on node %s" %
611
                          (volume, node))
612
          bad = True
613

    
614
    if instanceconfig.status != 'down':
615
      if (node_current not in node_instance or
616
          not instance in node_instance[node_current]):
617
        feedback_fn("  - ERROR: instance %s not running on node %s" %
618
                        (instance, node_current))
619
        bad = True
620

    
621
    for node in node_instance:
622
      if node != node_current:
623
        if instance in node_instance[node]:
624
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
625
                          (instance, node))
626
          bad = True
627

    
628
    return bad
629

    
630
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
631
    """Verify if there are any unknown volumes in the cluster.
632

633
    The .os, .swap and backup volumes are ignored. All other volumes are
634
    reported as unknown.
635

636
    """
637
    bad = False
638

    
639
    for node in node_vol_is:
640
      for volume in node_vol_is[node]:
641
        if node not in node_vol_should or volume not in node_vol_should[node]:
642
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
643
                      (volume, node))
644
          bad = True
645
    return bad
646

    
647
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
648
    """Verify the list of running instances.
649

650
    This checks what instances are running but unknown to the cluster.
651

652
    """
653
    bad = False
654
    for node in node_instance:
655
      for runninginstance in node_instance[node]:
656
        if runninginstance not in instancelist:
657
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
658
                          (runninginstance, node))
659
          bad = True
660
    return bad
661

    
662
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
663
    """Verify N+1 Memory Resilience.
664

665
    Check that if one single node dies we can still start all the instances it
666
    was primary for.
667

668
    """
669
    bad = False
670

    
671
    for node, nodeinfo in node_info.iteritems():
672
      # This code checks that every node which is now listed as secondary has
673
      # enough memory to host all the instances it is supposed to, should a single
674
      # other node in the cluster fail.
675
      # FIXME: not ready for failover to an arbitrary node
676
      # FIXME: does not support file-backed instances
677
      # WARNING: we currently take into account down instances as well as up
678
      # ones, considering that even if they're down someone might want to start
679
      # them even in the event of a node failure.
680
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
681
        needed_mem = 0
682
        for instance in instances:
683
          needed_mem += instance_cfg[instance].memory
684
        if nodeinfo['mfree'] < needed_mem:
685
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
686
                      " failovers should node %s fail" % (node, prinode))
687
          bad = True
688
    return bad
689

    
690
  def CheckPrereq(self):
691
    """Check prerequisites.
692

693
    Transform the list of checks we're going to skip into a set and check that
694
    all its members are valid.
695

696
    """
697
    self.skip_set = frozenset(self.op.skip_checks)
698
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
699
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
700

    
701
  def BuildHooksEnv(self):
702
    """Build hooks env.
703

704
    Cluster-Verify hooks are run only in the post phase; if they fail, their
705
    output is logged in the verify output and the verification fails.
706

707
    """
708
    all_nodes = self.cfg.GetNodeList()
709
    # TODO: populate the environment with useful information for verify hooks
710
    env = {}
711
    return env, [], all_nodes
712

    
713
  def Exec(self, feedback_fn):
714
    """Verify integrity of cluster, performing various test on nodes.
715

716
    """
717
    bad = False
718
    feedback_fn("* Verifying global settings")
719
    for msg in self.cfg.VerifyConfig():
720
      feedback_fn("  - ERROR: %s" % msg)
721

    
722
    vg_name = self.cfg.GetVGName()
723
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
724
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
725
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
726
    i_non_redundant = [] # Non redundant instances
727
    node_volume = {}
728
    node_instance = {}
729
    node_info = {}
730
    instance_cfg = {}
731

    
732
    # FIXME: verify OS list
733
    # do local checksums
734
    file_names = list(self.sstore.GetFileList())
735
    file_names.append(constants.SSL_CERT_FILE)
736
    file_names.append(constants.CLUSTER_CONF_FILE)
737
    local_checksums = utils.FingerprintFiles(file_names)
738

    
739
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
740
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
741
    all_instanceinfo = rpc.call_instance_list(nodelist)
742
    all_vglist = rpc.call_vg_list(nodelist)
743
    node_verify_param = {
744
      'filelist': file_names,
745
      'nodelist': nodelist,
746
      'hypervisor': None,
747
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
748
                        for node in nodeinfo]
749
      }
750
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
751
    all_rversion = rpc.call_version(nodelist)
752
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
753

    
754
    for node in nodelist:
755
      feedback_fn("* Verifying node %s" % node)
756
      result = self._VerifyNode(node, file_names, local_checksums,
757
                                all_vglist[node], all_nvinfo[node],
758
                                all_rversion[node], feedback_fn)
759
      bad = bad or result
760

    
761
      # node_volume
762
      volumeinfo = all_volumeinfo[node]
763

    
764
      if isinstance(volumeinfo, basestring):
765
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
766
                    (node, volumeinfo[-400:].encode('string_escape')))
767
        bad = True
768
        node_volume[node] = {}
769
      elif not isinstance(volumeinfo, dict):
770
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
771
        bad = True
772
        continue
773
      else:
774
        node_volume[node] = volumeinfo
775

    
776
      # node_instance
777
      nodeinstance = all_instanceinfo[node]
778
      if type(nodeinstance) != list:
779
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
780
        bad = True
781
        continue
782

    
783
      node_instance[node] = nodeinstance
784

    
785
      # node_info
786
      nodeinfo = all_ninfo[node]
787
      if not isinstance(nodeinfo, dict):
788
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
789
        bad = True
790
        continue
791

    
792
      try:
793
        node_info[node] = {
794
          "mfree": int(nodeinfo['memory_free']),
795
          "dfree": int(nodeinfo['vg_free']),
796
          "pinst": [],
797
          "sinst": [],
798
          # dictionary holding all instances this node is secondary for,
799
          # grouped by their primary node. Each key is a cluster node, and each
800
          # value is a list of instances which have the key as primary and the
801
          # current node as secondary.  this is handy to calculate N+1 memory
802
          # availability if you can only failover from a primary to its
803
          # secondary.
804
          "sinst-by-pnode": {},
805
        }
806
      except ValueError:
807
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
808
        bad = True
809
        continue
810

    
811
    node_vol_should = {}
812

    
813
    for instance in instancelist:
814
      feedback_fn("* Verifying instance %s" % instance)
815
      inst_config = self.cfg.GetInstanceInfo(instance)
816
      result =  self._VerifyInstance(instance, inst_config, node_volume,
817
                                     node_instance, feedback_fn)
818
      bad = bad or result
819

    
820
      inst_config.MapLVsByNode(node_vol_should)
821

    
822
      instance_cfg[instance] = inst_config
823

    
824
      pnode = inst_config.primary_node
825
      if pnode in node_info:
826
        node_info[pnode]['pinst'].append(instance)
827
      else:
828
        feedback_fn("  - ERROR: instance %s, connection to primary node"
829
                    " %s failed" % (instance, pnode))
830
        bad = True
831

    
832
      # If the instance is non-redundant we cannot survive losing its primary
833
      # node, so we are not N+1 compliant. On the other hand we have no disk
834
      # templates with more than one secondary so that situation is not well
835
      # supported either.
836
      # FIXME: does not support file-backed instances
837
      if len(inst_config.secondary_nodes) == 0:
838
        i_non_redundant.append(instance)
839
      elif len(inst_config.secondary_nodes) > 1:
840
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
841
                    % instance)
842

    
843
      for snode in inst_config.secondary_nodes:
844
        if snode in node_info:
845
          node_info[snode]['sinst'].append(instance)
846
          if pnode not in node_info[snode]['sinst-by-pnode']:
847
            node_info[snode]['sinst-by-pnode'][pnode] = []
848
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
849
        else:
850
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
851
                      " %s failed" % (instance, snode))
852

    
853
    feedback_fn("* Verifying orphan volumes")
854
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
855
                                       feedback_fn)
856
    bad = bad or result
857

    
858
    feedback_fn("* Verifying remaining instances")
859
    result = self._VerifyOrphanInstances(instancelist, node_instance,
860
                                         feedback_fn)
861
    bad = bad or result
862

    
863
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
864
      feedback_fn("* Verifying N+1 Memory redundancy")
865
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
866
      bad = bad or result
867

    
868
    feedback_fn("* Other Notes")
869
    if i_non_redundant:
870
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
871
                  % len(i_non_redundant))
872

    
873
    return not bad
874

    
875
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
876
    """Analize the post-hooks' result, handle it, and send some
877
    nicely-formatted feedback back to the user.
878

879
    Args:
880
      phase: the hooks phase that has just been run
881
      hooks_results: the results of the multi-node hooks rpc call
882
      feedback_fn: function to send feedback back to the caller
883
      lu_result: previous Exec result
884

885
    """
886
    # We only really run POST phase hooks, and are only interested in
887
    # their results
888
    if phase == constants.HOOKS_PHASE_POST:
889
      # Used to change hooks' output to proper indentation
890
      indent_re = re.compile('^', re.M)
891
      feedback_fn("* Hooks Results")
892
      if not hooks_results:
893
        feedback_fn("  - ERROR: general communication failure")
894
        lu_result = 1
895
      else:
896
        for node_name in hooks_results:
897
          show_node_header = True
898
          res = hooks_results[node_name]
899
          if res is False or not isinstance(res, list):
900
            feedback_fn("    Communication failure")
901
            lu_result = 1
902
            continue
903
          for script, hkr, output in res:
904
            if hkr == constants.HKR_FAIL:
905
              # The node header is only shown once, if there are
906
              # failing hooks on that node
907
              if show_node_header:
908
                feedback_fn("  Node %s:" % node_name)
909
                show_node_header = False
910
              feedback_fn("    ERROR: Script %s failed, output:" % script)
911
              output = indent_re.sub('      ', output)
912
              feedback_fn("%s" % output)
913
              lu_result = 1
914

    
915
      return lu_result
916

    
917

    
918
class LUVerifyDisks(NoHooksLU):
919
  """Verifies the cluster disks status.
920

921
  """
922
  _OP_REQP = []
923
  REQ_BGL = False
924

    
925
  def ExpandNames(self):
926
    self.needed_locks = {
927
      locking.LEVEL_NODE: locking.ALL_SET,
928
      locking.LEVEL_INSTANCE: locking.ALL_SET,
929
    }
930
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
931

    
932
  def CheckPrereq(self):
933
    """Check prerequisites.
934

935
    This has no prerequisites.
936

937
    """
938
    pass
939

    
940
  def Exec(self, feedback_fn):
941
    """Verify integrity of cluster disks.
942

943
    """
944
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
945

    
946
    vg_name = self.cfg.GetVGName()
947
    nodes = utils.NiceSort(self.cfg.GetNodeList())
948
    instances = [self.cfg.GetInstanceInfo(name)
949
                 for name in self.cfg.GetInstanceList()]
950

    
951
    nv_dict = {}
952
    for inst in instances:
953
      inst_lvs = {}
954
      if (inst.status != "up" or
955
          inst.disk_template not in constants.DTS_NET_MIRROR):
956
        continue
957
      inst.MapLVsByNode(inst_lvs)
958
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
959
      for node, vol_list in inst_lvs.iteritems():
960
        for vol in vol_list:
961
          nv_dict[(node, vol)] = inst
962

    
963
    if not nv_dict:
964
      return result
965

    
966
    node_lvs = rpc.call_volume_list(nodes, vg_name)
967

    
968
    to_act = set()
969
    for node in nodes:
970
      # node_volume
971
      lvs = node_lvs[node]
972

    
973
      if isinstance(lvs, basestring):
974
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
975
        res_nlvm[node] = lvs
976
      elif not isinstance(lvs, dict):
977
        logger.Info("connection to node %s failed or invalid data returned" %
978
                    (node,))
979
        res_nodes.append(node)
980
        continue
981

    
982
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
983
        inst = nv_dict.pop((node, lv_name), None)
984
        if (not lv_online and inst is not None
985
            and inst.name not in res_instances):
986
          res_instances.append(inst.name)
987

    
988
    # any leftover items in nv_dict are missing LVs, let's arrange the
989
    # data better
990
    for key, inst in nv_dict.iteritems():
991
      if inst.name not in res_missing:
992
        res_missing[inst.name] = []
993
      res_missing[inst.name].append(key)
994

    
995
    return result
996

    
997

    
998
class LURenameCluster(LogicalUnit):
999
  """Rename the cluster.
1000

1001
  """
1002
  HPATH = "cluster-rename"
1003
  HTYPE = constants.HTYPE_CLUSTER
1004
  _OP_REQP = ["name"]
1005
  REQ_WSSTORE = True
1006

    
1007
  def BuildHooksEnv(self):
1008
    """Build hooks env.
1009

1010
    """
1011
    env = {
1012
      "OP_TARGET": self.sstore.GetClusterName(),
1013
      "NEW_NAME": self.op.name,
1014
      }
1015
    mn = self.sstore.GetMasterNode()
1016
    return env, [mn], [mn]
1017

    
1018
  def CheckPrereq(self):
1019
    """Verify that the passed name is a valid one.
1020

1021
    """
1022
    hostname = utils.HostInfo(self.op.name)
1023

    
1024
    new_name = hostname.name
1025
    self.ip = new_ip = hostname.ip
1026
    old_name = self.sstore.GetClusterName()
1027
    old_ip = self.sstore.GetMasterIP()
1028
    if new_name == old_name and new_ip == old_ip:
1029
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1030
                                 " cluster has changed")
1031
    if new_ip != old_ip:
1032
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1033
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1034
                                   " reachable on the network. Aborting." %
1035
                                   new_ip)
1036

    
1037
    self.op.name = new_name
1038

    
1039
  def Exec(self, feedback_fn):
1040
    """Rename the cluster.
1041

1042
    """
1043
    clustername = self.op.name
1044
    ip = self.ip
1045
    ss = self.sstore
1046

    
1047
    # shutdown the master IP
1048
    master = ss.GetMasterNode()
1049
    if not rpc.call_node_stop_master(master, False):
1050
      raise errors.OpExecError("Could not disable the master role")
1051

    
1052
    try:
1053
      # modify the sstore
1054
      ss.SetKey(ss.SS_MASTER_IP, ip)
1055
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1056

    
1057
      # Distribute updated ss config to all nodes
1058
      myself = self.cfg.GetNodeInfo(master)
1059
      dist_nodes = self.cfg.GetNodeList()
1060
      if myself.name in dist_nodes:
1061
        dist_nodes.remove(myself.name)
1062

    
1063
      logger.Debug("Copying updated ssconf data to all nodes")
1064
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1065
        fname = ss.KeyToFilename(keyname)
1066
        result = rpc.call_upload_file(dist_nodes, fname)
1067
        for to_node in dist_nodes:
1068
          if not result[to_node]:
1069
            logger.Error("copy of file %s to node %s failed" %
1070
                         (fname, to_node))
1071
    finally:
1072
      if not rpc.call_node_start_master(master, False):
1073
        logger.Error("Could not re-enable the master role on the master,"
1074
                     " please restart manually.")
1075

    
1076

    
1077
def _RecursiveCheckIfLVMBased(disk):
1078
  """Check if the given disk or its children are lvm-based.
1079

1080
  Args:
1081
    disk: ganeti.objects.Disk object
1082

1083
  Returns:
1084
    boolean indicating whether a LD_LV dev_type was found or not
1085

1086
  """
1087
  if disk.children:
1088
    for chdisk in disk.children:
1089
      if _RecursiveCheckIfLVMBased(chdisk):
1090
        return True
1091
  return disk.dev_type == constants.LD_LV
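# Illustrative sketch (not part of the original code): LUSetClusterParams below
# uses this helper to refuse disabling LVM storage while LVM-backed disks
# still exist, roughly:
#
#   for disk in inst.disks:
#     if _RecursiveCheckIfLVMBased(disk):
#       raise errors.OpPrereqError("Cannot disable lvm storage while"
#                                  " lvm-based instances exist")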
1092

    
1093

    
1094
class LUSetClusterParams(LogicalUnit):
1095
  """Change the parameters of the cluster.
1096

1097
  """
1098
  HPATH = "cluster-modify"
1099
  HTYPE = constants.HTYPE_CLUSTER
1100
  _OP_REQP = []
1101

    
1102
  def BuildHooksEnv(self):
1103
    """Build hooks env.
1104

1105
    """
1106
    env = {
1107
      "OP_TARGET": self.sstore.GetClusterName(),
1108
      "NEW_VG_NAME": self.op.vg_name,
1109
      }
1110
    mn = self.sstore.GetMasterNode()
1111
    return env, [mn], [mn]
1112

    
1113
  def CheckPrereq(self):
1114
    """Check prerequisites.
1115

1116
    This checks whether the given params don't conflict and
1117
    if the given volume group is valid.
1118

1119
    """
1120
    if not self.op.vg_name:
1121
      instances = [self.cfg.GetInstanceInfo(name)
1122
                   for name in self.cfg.GetInstanceList()]
1123
      for inst in instances:
1124
        for disk in inst.disks:
1125
          if _RecursiveCheckIfLVMBased(disk):
1126
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1127
                                       " lvm-based instances exist")
1128

    
1129
    # if vg_name not None, checks given volume group on all nodes
1130
    if self.op.vg_name:
1131
      node_list = self.cfg.GetNodeList()
1132
      vglist = rpc.call_vg_list(node_list)
1133
      for node in node_list:
1134
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1135
                                              constants.MIN_VG_SIZE)
1136
        if vgstatus:
1137
          raise errors.OpPrereqError("Error on node '%s': %s" %
1138
                                     (node, vgstatus))
1139

    
1140
  def Exec(self, feedback_fn):
1141
    """Change the parameters of the cluster.
1142

1143
    """
1144
    if self.op.vg_name != self.cfg.GetVGName():
1145
      self.cfg.SetVGName(self.op.vg_name)
1146
    else:
1147
      feedback_fn("Cluster LVM configuration already in desired"
1148
                  " state, not changing")
1149

    
1150

    
1151
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1152
  """Sleep and poll for an instance's disk to sync.
1153

1154
  """
1155
  if not instance.disks:
1156
    return True
1157

    
1158
  if not oneshot:
1159
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1160

    
1161
  node = instance.primary_node
1162

    
1163
  for dev in instance.disks:
1164
    cfgw.SetDiskID(dev, node)
1165

    
1166
  retries = 0
1167
  while True:
1168
    max_time = 0
1169
    done = True
1170
    cumul_degraded = False
1171
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1172
    if not rstats:
1173
      proc.LogWarning("Can't get any data from node %s" % node)
1174
      retries += 1
1175
      if retries >= 10:
1176
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1177
                                 " aborting." % node)
1178
      time.sleep(6)
1179
      continue
1180
    retries = 0
1181
    for i in range(len(rstats)):
1182
      mstat = rstats[i]
1183
      if mstat is None:
1184
        proc.LogWarning("Can't compute data for node %s/%s" %
1185
                        (node, instance.disks[i].iv_name))
1186
        continue
1187
      # we ignore the ldisk parameter
1188
      perc_done, est_time, is_degraded, _ = mstat
1189
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1190
      if perc_done is not None:
1191
        done = False
1192
        if est_time is not None:
1193
          rem_time = "%d estimated seconds remaining" % est_time
1194
          max_time = est_time
1195
        else:
1196
          rem_time = "no time estimate"
1197
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
1198
                     (instance.disks[i].iv_name, perc_done, rem_time))
1199
    if done or oneshot:
1200
      break
1201

    
1202
    time.sleep(min(60, max_time))
1203

    
1204
  if done:
1205
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1206
  return not cumul_degraded
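# Illustrative usage sketch (not part of the original code; the error message
# is made up): after creating or activating disks an LU would typically wait
# for the resync like this:
#
#   disk_ok = _WaitForSync(self.cfg, instance, self.proc)
#   if not disk_ok:
#     raise errors.OpExecError("Disk sync did not finish cleanly")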
1207

    
1208

    
1209
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1210
  """Check that mirrors are not degraded.
1211

1212
  The ldisk parameter, if True, will change the test from the
1213
  is_degraded attribute (which represents overall non-ok status for
1214
  the device(s)) to the ldisk (representing the local storage status).
1215

1216
  """
1217
  cfgw.SetDiskID(dev, node)
1218
  if ldisk:
1219
    idx = 6
1220
  else:
1221
    idx = 5
1222

    
1223
  result = True
1224
  if on_primary or dev.AssembleOnSecondary():
1225
    rstats = rpc.call_blockdev_find(node, dev)
1226
    if not rstats:
1227
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1228
      result = False
1229
    else:
1230
      result = result and (not rstats[idx])
1231
  if dev.children:
1232
    for child in dev.children:
1233
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1234

    
1235
  return result
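# Illustrative usage sketch (not part of the original code; secondary_node is a
# hypothetical node name): a caller could check every disk of an instance on
# its secondary node with:
#
#   for dev in instance.disks:
#     if not _CheckDiskConsistency(self.cfg, dev, secondary_node, False):
#       raise errors.OpExecError("Disk %s is degraded" % dev.iv_name)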
1236

    
1237

    
1238
class LUDiagnoseOS(NoHooksLU):
1239
  """Logical unit for OS diagnose/query.
1240

1241
  """
1242
  _OP_REQP = ["output_fields", "names"]
1243
  REQ_BGL = False
1244

    
1245
  def ExpandNames(self):
1246
    if self.op.names:
1247
      raise errors.OpPrereqError("Selective OS query not supported")
1248

    
1249
    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1250
    _CheckOutputFields(static=[],
1251
                       dynamic=self.dynamic_fields,
1252
                       selected=self.op.output_fields)
1253

    
1254
    # Lock all nodes, in shared mode
1255
    self.needed_locks = {}
1256
    self.share_locks[locking.LEVEL_NODE] = 1
1257
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1258

    
1259
  def CheckPrereq(self):
1260
    """Check prerequisites.
1261

1262
    """
1263

    
1264
  @staticmethod
1265
  def _DiagnoseByOS(node_list, rlist):
1266
    """Remaps a per-node return list into an a per-os per-node dictionary
1267

1268
      Args:
1269
        node_list: a list with the names of all nodes
1270
        rlist: a map with node names as keys and OS objects as values
1271

1272
      Returns:
1273
        map: a map with osnames as keys and as value another map, with
1274
             nodes as
1275
             keys and list of OS objects as values
1276
             e.g. {"debian-etch": {"node1": [<object>,...],
1277
                                   "node2": [<object>,]}
1278
                  }
1279

1280
    """
1281
    all_os = {}
1282
    for node_name, nr in rlist.iteritems():
1283
      if not nr:
1284
        continue
1285
      for os_obj in nr:
1286
        if os_obj.name not in all_os:
1287
          # build a list of nodes for this os containing empty lists
1288
          # for each node in node_list
1289
          all_os[os_obj.name] = {}
1290
          for nname in node_list:
1291
            all_os[os_obj.name][nname] = []
1292
        all_os[os_obj.name][node_name].append(os_obj)
1293
    return all_os
1294

    
1295
  def Exec(self, feedback_fn):
1296
    """Compute the list of OSes.
1297

1298
    """
1299
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1300
    node_data = rpc.call_os_diagnose(node_list)
1301
    if node_data == False:
1302
      raise errors.OpExecError("Can't gather the list of OSes")
1303
    pol = self._DiagnoseByOS(node_list, node_data)
1304
    output = []
1305
    for os_name, os_data in pol.iteritems():
1306
      row = []
1307
      for field in self.op.output_fields:
1308
        if field == "name":
1309
          val = os_name
1310
        elif field == "valid":
1311
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1312
        elif field == "node_status":
1313
          val = {}
1314
          for node_name, nos_list in os_data.iteritems():
1315
            val[node_name] = [(v.status, v.path) for v in nos_list]
1316
        else:
1317
          raise errors.ParameterError(field)
1318
        row.append(val)
1319
      output.append(row)
1320

    
1321
    return output
1322

    
1323

    
1324
class LURemoveNode(LogicalUnit):
1325
  """Logical unit for removing a node.
1326

1327
  """
1328
  HPATH = "node-remove"
1329
  HTYPE = constants.HTYPE_NODE
1330
  _OP_REQP = ["node_name"]
1331

    
1332
  def BuildHooksEnv(self):
1333
    """Build hooks env.
1334

1335
    This doesn't run on the target node in the pre phase as a failed
1336
    node would then be impossible to remove.
1337

1338
    """
1339
    env = {
1340
      "OP_TARGET": self.op.node_name,
1341
      "NODE_NAME": self.op.node_name,
1342
      }
1343
    all_nodes = self.cfg.GetNodeList()
1344
    all_nodes.remove(self.op.node_name)
1345
    return env, all_nodes, all_nodes
1346

    
1347
  def CheckPrereq(self):
1348
    """Check prerequisites.
1349

1350
    This checks:
1351
     - the node exists in the configuration
1352
     - it does not have primary or secondary instances
1353
     - it's not the master
1354

1355
    Any errors are signalled by raising errors.OpPrereqError.
1356

1357
    """
1358
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1359
    if node is None:
1360
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1361

    
1362
    instance_list = self.cfg.GetInstanceList()
1363

    
1364
    masternode = self.sstore.GetMasterNode()
1365
    if node.name == masternode:
1366
      raise errors.OpPrereqError("Node is the master node,"
1367
                                 " you need to failover first.")
1368

    
1369
    for instance_name in instance_list:
1370
      instance = self.cfg.GetInstanceInfo(instance_name)
1371
      if node.name == instance.primary_node:
1372
        raise errors.OpPrereqError("Instance %s still running on the node,"
1373
                                   " please remove first." % instance_name)
1374
      if node.name in instance.secondary_nodes:
1375
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1376
                                   " please remove first." % instance_name)
1377
    self.op.node_name = node.name
1378
    self.node = node
1379

    
1380
  def Exec(self, feedback_fn):
1381
    """Removes the node from the cluster.
1382

1383
    """
1384
    node = self.node
1385
    logger.Info("stopping the node daemon and removing configs from node %s" %
1386
                node.name)
1387

    
1388
    self.context.RemoveNode(node.name)
1389

    
1390
    rpc.call_node_leave_cluster(node.name)
1391

    
1392

    
1393
class LUQueryNodes(NoHooksLU):
1394
  """Logical unit for querying nodes.
1395

1396
  """
1397
  _OP_REQP = ["output_fields", "names"]
1398
  REQ_BGL = False
1399

    
1400
  def ExpandNames(self):
1401
    self.dynamic_fields = frozenset([
1402
      "dtotal", "dfree",
1403
      "mtotal", "mnode", "mfree",
1404
      "bootid",
1405
      "ctotal",
1406
      ])
1407

    
1408
    self.static_fields = frozenset([
1409
      "name", "pinst_cnt", "sinst_cnt",
1410
      "pinst_list", "sinst_list",
1411
      "pip", "sip", "tags",
1412
      ])
1413

    
1414
    _CheckOutputFields(static=self.static_fields,
1415
                       dynamic=self.dynamic_fields,
1416
                       selected=self.op.output_fields)
1417

    
1418
    self.needed_locks = {}
1419
    self.share_locks[locking.LEVEL_NODE] = 1
1420

    
1421
    if self.op.names:
1422
      self.wanted = _GetWantedNodes(self, self.op.names)
1423
    else:
1424
      self.wanted = locking.ALL_SET
1425

    
1426
    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
1427
    if self.do_locking:
1428
      # if we don't request only static fields, we need to lock the nodes
1429
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
1430

    
1431

    
1432
  def CheckPrereq(self):
1433
    """Check prerequisites.
1434

1435
    """
1436
    # The validation of the node list is done in the _GetWantedNodes,
1437
    # if non-empty, and if empty, there's no validation to do
1438
    pass
1439

    
1440
  def Exec(self, feedback_fn):
1441
    """Computes the list of nodes and their attributes.
1442

1443
    """
1444
    all_info = self.cfg.GetAllNodesInfo()
1445
    if self.do_locking:
1446
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1447
    else:
1448
      nodenames = all_info.keys()
1449
    nodelist = [all_info[name] for name in nodenames]
1450

    
1451
    # begin data gathering
1452

    
1453
    if self.dynamic_fields.intersection(self.op.output_fields):
1454
      live_data = {}
1455
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1456
      for name in nodenames:
1457
        nodeinfo = node_data.get(name, None)
1458
        if nodeinfo:
1459
          live_data[name] = {
1460
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1461
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1462
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1463
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1464
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1465
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1466
            "bootid": nodeinfo['bootid'],
1467
            }
1468
        else:
1469
          live_data[name] = {}
1470
    else:
1471
      live_data = dict.fromkeys(nodenames, {})
1472

    
1473
    node_to_primary = dict([(name, set()) for name in nodenames])
1474
    node_to_secondary = dict([(name, set()) for name in nodenames])
1475

    
1476
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1477
                             "sinst_cnt", "sinst_list"))
1478
    if inst_fields & frozenset(self.op.output_fields):
1479
      instancelist = self.cfg.GetInstanceList()
1480

    
1481
      for instance_name in instancelist:
1482
        inst = self.cfg.GetInstanceInfo(instance_name)
1483
        if inst.primary_node in node_to_primary:
1484
          node_to_primary[inst.primary_node].add(inst.name)
1485
        for secnode in inst.secondary_nodes:
1486
          if secnode in node_to_secondary:
1487
            node_to_secondary[secnode].add(inst.name)
1488

    
1489
    # end data gathering
1490

    
1491
    output = []
1492
    for node in nodelist:
1493
      node_output = []
1494
      for field in self.op.output_fields:
1495
        if field == "name":
1496
          val = node.name
1497
        elif field == "pinst_list":
1498
          val = list(node_to_primary[node.name])
1499
        elif field == "sinst_list":
1500
          val = list(node_to_secondary[node.name])
1501
        elif field == "pinst_cnt":
1502
          val = len(node_to_primary[node.name])
1503
        elif field == "sinst_cnt":
1504
          val = len(node_to_secondary[node.name])
1505
        elif field == "pip":
1506
          val = node.primary_ip
1507
        elif field == "sip":
1508
          val = node.secondary_ip
1509
        elif field == "tags":
1510
          val = list(node.GetTags())
1511
        elif field in self.dynamic_fields:
1512
          val = live_data[node.name].get(field, None)
1513
        else:
1514
          raise errors.ParameterError(field)
1515
        node_output.append(val)
1516
      output.append(node_output)
1517

    
1518
    return output
1519

    
1520

    
1521
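# Illustrative sketch (hypothetical node names and field selection): the data
# returned by the Exec method above is one row per node, one column per entry
# in op.output_fields, with dynamic fields filled from rpc.call_node_info.
#
#   op.output_fields = ["name", "pinst_cnt", "sinst_cnt", "mfree"]
#   Exec(feedback_fn)
#   # -> [["node1.example.com", 2, 1, 3480],
#   #     ["node2.example.com", 0, 3, 7912]]

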
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  REQ_BGL = False

  def ExpandNames(self):
    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # check reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # check connectivity
    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    node_verify_list = [self.sstore.GetMasterNode()]
    node_verify_param = {
      'nodelist': [node],
      # TODO: do a node-net-test as well?
    }

    result = rpc.call_node_verify(node_verify_list, node_verify_param)
    for verifier in node_verify_list:
      if not result[verifier]:
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                 " for remote verification" % verifier)
      if result[verifier]['nodelist']:
        for failed in result[verifier]['nodelist']:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, result[verifier]['nodelist'][failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList()
    if not self.op.readd:
      dist_nodes.append(node)
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = self.sstore.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      result = rpc.call_upload_file([node], fname)
      if not result[node]:
        logger.Error("could not copy file %s to node %s" % (fname, node))

    if self.op.readd:
      self.context.ReaddNode(new_node)
    else:
      self.context.AddNode(new_node)


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      }

    return result


class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
         succeeded with the mapping from node devices to instance devices
  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two-pass mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info


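# Illustrative sketch (hypothetical names; the third tuple element is whatever
# rpc.call_blockdev_assemble returned on the primary node): what the function
# above hands back for a healthy two-disk instance.
#
#   disks_ok, device_info = _AssembleInstanceDisks(instance, cfg)
#   # disks_ok    -> True
#   # device_info -> [("node1.example.com", "sda", <assemble result>),
#   #                 ("node1.example.com", "sdb", <assemble result>)]

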
def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    instance = self.instance
    _SafeShutdownInstanceDisks(instance, self.cfg)


def _SafeShutdownInstanceDisks(instance, cfg):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  _ShutdownInstanceDisks.

  """
  ins_l = rpc.call_instance_list([instance.primary_node])
  ins_l = ins_l[instance.primary_node]
  if not type(ins_l) is list:
    raise errors.OpExecError("Can't contact node '%s'" %
                             instance.primary_node)

  if instance.name in ins_l:
    raise errors.OpExecError("Instance is running, can't shutdown"
                             " block devices.")

  _ShutdownInstanceDisks(instance, cfg)


def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are ignored.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result


def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                               " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem))


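# Typical use of _CheckNodeFreeMemory from an LU's CheckPrereq, mirroring the
# call made in LUStartupInstance below; 'instance' is an objects.Instance that
# has already been looked up from the configuration.
#
#   _CheckNodeFreeMemory(self.cfg, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        instance.memory)
#
# If the node cannot be contacted or reports less free memory than requested,
# errors.OpPrereqError is raised and the opcode aborts before changing state.

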
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
                         "starting instance %s" % instance.name,
                         instance.memory)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")


class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # a full reboot needs the locks of the secondary nodes too, since the
      # instance's disks are re-assembled on them
      primary_only = self.op.reboot_type != constants.INSTANCE_REBOOT_FULL
      self._LockInstancesNodes(primary_only=primary_only)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      if not rpc.call_instance_reboot(node_current, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      if not rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
      if not rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)


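# Summary of how LURebootInstance.Exec above maps reboot_type to actions:
#
#   INSTANCE_REBOOT_SOFT -> rpc.call_instance_reboot on the primary node
#   INSTANCE_REBOOT_HARD -> rpc.call_instance_reboot on the primary node
#   INSTANCE_REBOOT_FULL -> shutdown the instance, cycle its disks via
#                           _ShutdownInstanceDisks/_StartInstanceDisks and
#                           start it again with rpc.call_instance_start

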
class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    if not rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    _ShutdownInstanceDisks(instance, self.cfg)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   self.op.pnode)
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node"  % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.AddInstance(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))

  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
                                                old_file_storage_dir,
                                                new_file_storage_dir)

      if not result:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
                                          "sda", "sdb"):
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.sstore.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logger.Info("removing block devices for instance %s" % instance.name)

    if not _RemoveDisks(instance, self.cfg):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)
    # Remove the new instance from the Ganeti Lock Manager
    self.context.glm.remove(locking.LEVEL_INSTANCE, instance.name)


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
                               "admin_state", "admin_ram",
                               "disk_template", "ip", "mac", "bridge",
                               "sda_size", "sdb_size", "vcpus", "tags",
                               "auto_balance",
                               "network_port", "kernel_path", "initrd_path",
                               "hvm_boot_order", "hvm_acpi", "hvm_pae",
                               "hvm_cdrom_image_path", "hvm_nic_type",
                               "hvm_disk_type", "vnc_bind_address"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    # TODO: we could lock instances (and nodes) only if the user asked for
    # dynamic fields. For that we need atomic ways to get info for a group of
    # instances from the config, though.
    if not self.op.names:
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        _GetWantedInstances(self, self.op.names)

    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    # TODO: locking of nodes could be avoided when not querying them
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # This of course is valid only if we locked the instances
    self.wanted = self.acquired_locks[locking.LEVEL_INSTANCE]

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    instance_names = self.wanted
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
                     in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_all_instances_info(nodes)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = []
    for instance in instance_list:
      iout = []
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.status != "down":
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.status != "down":
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          disk = instance.FindDisk(field[:3])
          if disk is None:
            val = None
          else:
            val = disk.size
        elif field == "vcpus":
          val = instance.vcpus
        elif field == "tags":
          val = list(instance.GetTags())
        elif field in ("network_port", "kernel_path", "initrd_path",
                       "hvm_boot_order", "hvm_acpi", "hvm_pae",
                       "hvm_cdrom_image_path", "hvm_nic_type",
                       "hvm_disk_type", "vnc_bind_address"):
          val = getattr(instance, field, None)
          if val is not None:
            pass
          elif field in ("hvm_nic_type", "hvm_disk_type",
                         "kernel_path", "initrd_path"):
            val = "default"
          else:
            val = "-"
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output


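# For reference, the values the "status" field of LUQueryInstances.Exec above
# can take:
#
#   "ERROR_nodedown" - the instance's primary node could not be contacted
#   "running"        - marked up in the config and actually running
#   "ERROR_up"       - marked down in the config but actually running
#   "ERROR_down"     - marked up in the config but not running
#   "ADMIN_down"     - marked down in the config and not running

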
class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
                         instance.name, instance.memory)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s do not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down"  %
                     (instance.name, source_node, source_node))
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logger.Info("Starting instance %s on node %s" %
                  (instance.name, target_node))

      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))


def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
        return False

  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, True, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, False, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


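# Note on the helper above: 'force' is sticky downwards. Once a device in the
# tree reports CreateOnSecondary(), it and all of its children are created on
# the secondary node; until then the helper only recurses into the children
# and skips the actual creation on that node.

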
def _GenerateUniqueNames(cfg, exts):
  """Generate suitable LV names.

  This will generate one unique name per given extension (suffix), each
  based on a freshly generated unique ID.

  """
  results = []
  for val in exts:
    new_id = cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results


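# Illustrative sketch (the IDs shown are placeholders; the real ones come from
# cfg.GenerateUniqueID()):
#
#   _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta"])
#   # -> ["<unique-id-1>.sda_data", "<unique-id-2>.sda_meta"]

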
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
  """Generate a drbd8 device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev


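# Sketch of the disk object tree built by _GenerateDRBD8Branch above for one
# "sda" branch; names[0]/names[1] are the LV names passed in by the caller.
#
#   LD_DRBD8  size=size, logical_id=(primary, secondary, port), iv_name="sda"
#    +- LD_LV  data volume, size=size, logical_id=(vgname, names[0])
#    +- LD_LV  meta volume, size=128,  logical_id=(vgname, names[1])

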
def _GenerateDiskTemplate(cfg, template_name,
2896
                          instance_name, primary_node,
2897
                          secondary_nodes, disk_sz, swap_sz,
2898
                          file_storage_dir, file_driver):
2899
  """Generate the entire disk layout for a given template type.
2900

2901
  """
2902
  #TODO: compute space requirements
2903

    
2904
  vgname = cfg.GetVGName()
2905
  if template_name == constants.DT_DISKLESS:
2906
    disks = []
2907
  elif template_name == constants.DT_PLAIN:
2908
    if len(secondary_nodes) != 0:
2909
      raise errors.ProgrammerError("Wrong template configuration")
2910

    
2911
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2912
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2913
                           logical_id=(vgname, names[0]),
2914
                           iv_name = "sda")
2915
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2916
                           logical_id=(vgname, names[1]),
2917
                           iv_name = "sdb")
2918
    disks = [sda_dev, sdb_dev]
2919
  elif template_name == constants.DT_DRBD8:
2920
    if len(secondary_nodes) != 1:
2921
      raise errors.ProgrammerError("Wrong template configuration")
2922
    remote_node = secondary_nodes[0]
2923
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2924
                                       ".sdb_data", ".sdb_meta"])
2925
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2926
                                         disk_sz, names[0:2], "sda")
2927
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2928
                                         swap_sz, names[2:4], "sdb")
2929
    disks = [drbd_sda_dev, drbd_sdb_dev]
2930
  elif template_name == constants.DT_FILE:
2931
    if len(secondary_nodes) != 0:
2932
      raise errors.ProgrammerError("Wrong template configuration")
2933

    
2934
    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
2935
                                iv_name="sda", logical_id=(file_driver,
2936
                                "%s/sda" % file_storage_dir))
2937
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
2938
                                iv_name="sdb", logical_id=(file_driver,
2939
                                "%s/sdb" % file_storage_dir))
2940
    disks = [file_sda_dev, file_sdb_dev]
2941
  else:
2942
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2943
  return disks
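
# Summary of the generated layouts (illustrative):
#   DT_DISKLESS -> no disks
#   DT_PLAIN    -> two LVs, "sda" (disk_sz MB) and "sdb" (swap_sz MB)
#   DT_DRBD8    -> two DRBD8 mirrors between primary and secondary, each
#                  backed by a data LV and a 128 MB metadata LV
#   DT_FILE     -> two file-backed disks under file_storage_dir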


def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = rpc.call_file_storage_dir_create(instance.primary_node,
                                              file_storage_dir)

    if not result:
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
      return False

    if not result[0]:
      logger.Error("failed to create directory '%s'" % file_storage_dir)
      return False

  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False

  return True
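
# Note on ordering: for each disk the volumes are created on the secondary
# nodes first and only then on the primary node; the first failure aborts the
# operation and _CreateDisks returns False, leaving cleanup of any volumes
# already created to the caller (LUCreateInstance uses _RemoveDisks for this).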


def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
                                            file_storage_dir):
      logger.Error("could not remove directory '%s'" % file_storage_dir)
      result = False

  return result


def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group

  This is currently hard-coded for the two-drive layout.

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: disk_size + swap_size,
    # 256 MB are added for drbd metadata, 128MB for each drbd device
    constants.DT_DRBD8: disk_size + swap_size + 256,
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" %  disk_template)

  return req_size_dict[disk_template]
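
# Worked example (illustrative): with disk_size=10240 and swap_size=4096,
# DT_PLAIN requires 10240 + 4096 = 14336 MB of free VG space, while DT_DRBD8
# requires 14336 + 256 = 14592 MB (128 MB of DRBD metadata per device);
# DT_DISKLESS and DT_FILE return None, meaning no VG space is needed.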


class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
              "disk_template", "swap_size", "mode", "start", "vcpus",
              "wait_for_sync", "ip_check", "mac"]

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    disks = [{"size": self.op.disk_size, "mode": "w"},
             {"size": self.op.swap_size, "mode": "w"}]
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
             "bridge": self.op.bridge}]
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.op.vcpus,
                     mem_size=self.op.mem_size,
                     disks=disks,
                     nics=nics,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    logger.ToStdout("Selected nodes for the instance: %s" %
                    (", ".join(ial.nodes),))
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
                (self.op.instance_name, self.op.iallocator, ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]
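
  # Note: after a successful run the allocator's choice is stored in
  # self.op.pnode (and self.op.snode for two-node disk templates), so the
  # rest of CheckPrereq can proceed as if the nodes had been given explicitly.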

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.op.mem_size,
      vcpus=self.op.vcpus,
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
    ))

    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl


  def CheckPrereq(self):
    """Check prerequisites.

    """
    # set optional parameters to none if they don't exist
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
                 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
                 "hvm_nic_type", "hvm_disk_type", "vnc_bind_address"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)
      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")
      src_node_full = self.cfg.ExpandNodeName(src_node)
      if src_node_full is None:
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
      self.op.src_node = src_node = src_node_full

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage
    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

    #### instance parameters check

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)

    self.op.instance_name = instance_name = hostname1.name
    instance_list = self.cfg.GetInstanceList()
    if instance_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    # ip validity checks
    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = self.op.ip = inst_ip

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (hostname1.ip, instance_name))

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    # boot order verification
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")
    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory not a relative"
                                 " path")
    #### allocator run

    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
    if pnode is None:
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                 self.op.pnode)
    self.op.pnode = pnode.name
    self.pnode = pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if getattr(self.op, "snode", None) is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")

      snode_name = self.cfg.ExpandNodeName(self.op.snode)
      if snode_name is None:
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
                                   self.op.snode)
      elif snode_name == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(snode_name)

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.op.disk_size, self.op.swap_size)

    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node"  % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")


    # bridge check on primary node
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.op.mem_size)

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    # Xen HVM device type checks
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
        raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
                                   " hypervisor" % self.op.hvm_nic_type)
      if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
        raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
                                   " hypervisor" % self.op.hvm_disk_type)

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    if self.op.vnc_bind_address is None:
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.sstore.GetFileStorageDir(),
                                        string_file_storage_dir, instance))


    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size,
                                  file_storage_dir,
                                  self.op.file_driver)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            hvm_acpi=self.op.hvm_acpi,
                            hvm_pae=self.op.hvm_pae,
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
                            vnc_bind_address=self.op.vnc_bind_address,
                            hvm_nic_type=self.op.hvm_nic_type,
                            hvm_disk_type=self.op.hvm_disk_type,
                            )
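
    # From here on the order matters: the physical disks are created first,
    # only then is the instance added to the configuration and the lock
    # manager, and finally the disks are (optionally) synced, the OS is
    # installed or imported and the instance is started.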

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Add the new instance to the Ganeti Lock Manager
    self.context.glm.add(locking.LEVEL_INSTANCE, instance)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      # Remove the new instance from the Ganeti Lock Manager
      self.context.glm.remove(locking.LEVEL_INSTANCE, iobj.name)
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                                src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
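    # (the command is only built here; as the class docstring notes, the
    # caller is expected to run it on the master node)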


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    elif self.op.remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.op.remote_node = remote_node
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    # If we're not already locking all nodes in the set we have to declare the
    # instance's primary/secondary nodes.
    if (level == locking.LEVEL_NODE and
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
      self._LockInstancesNodes()

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    logger.ToStdout("Selected new secondary for the instance: %s" %
                    self.op.remote_node)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = instance

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))
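
  # Note: at this point self.tgt_node is the node whose storage will be
  # rebuilt, self.oth_node is its (supposedly healthy) peer and, in secondary
  # replacement mode, self.new_node is the node taking over the secondary
  # role (None means the current secondary is kept).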

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([pri_node, new_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in pri_node, new_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s on %s" % (dev.iv_name, pri_node))
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        raise errors.OpExecError("Can't find device %s on node %s" %
                                 (dev.iv_name, pri_node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      size = dev.size
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in dev.children:
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], new_node))

      iv_names[dev.iv_name] = (dev, dev.children)

    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for dev in instance.disks:
      size = dev.size
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
      # create new devices on new_node
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=(pri_node, new_node,
                                          dev.logical_id[2]),
                              children=dev.children)
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
                                        new_drbd, False,
                                      _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new DRBD on"
                                 " node '%s'" % new_node)

    for dev in instance.disks:
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for %s on old node" % dev.iv_name)
      cfg.SetDiskID(dev, old_node)
      if not rpc.call_blockdev_shutdown(old_node, dev):
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")
    done = 0
    for dev in instance.disks:
      cfg.SetDiskID(dev, pri_node)
      # set the physical (unique in bdev terms) id to None, meaning
      # detach from network
      dev.physical_id = (None,) * len(dev.physical_id)
      # and 'find' the device, which will 'fix' it to match the
      # standalone state
      if rpc.call_blockdev_find(pri_node, dev):
        done += 1
      else:
        warning("Failed to detach drbd %s from network, unusual case" %
                dev.iv_name)

    if not done:
      # no detaches succeeded (very unlikely)
      raise errors.OpExecError("Can't detach at least one DRBD from old node")

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev in instance.disks:
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    failures = []
    for dev in instance.disks:
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
      # since the attach is smart, it's enough to 'find' the device,
      # it will automatically activate the network, if the physical_id
      # is correct
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
                "please do a gnt-instance info to see the status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        if not rpc.call_blockdev_remove(old_node, lv):
          warning("Can't remove LV on old secondary",
                  hint="Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance

    # Activate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      _StartInstanceDisks(self.cfg, instance, True)

    if instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")

    ret = fn(feedback_fn)

    # Deactivate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      _SafeShutdownInstanceDisks(instance, self.cfg)

    return ret


class LUGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk", "amount"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    self.instance = instance

    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing.")

    if instance.FindDisk(self.op.disk) is None:
      raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                 (self.op.disk, instance.name))

    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
    for node in nodenames:
      info = nodeinfo.get(node, None)
      if not info:
        raise errors.OpPrereqError("Cannot get current information"
                                   " from node '%s'" % node)
      vg_free = info.get('vg_free', None)
      if not isinstance(vg_free, int):
        raise errors.OpPrereqError("Can't compute free disk space on"
                                   " node %s" % node)
      if self.op.amount > info['vg_free']:
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
                                   " %d MiB available, %d MiB required" %
                                   (node, info['vg_free'], self.op.amount))

  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    instance = self.instance
    disk = instance.FindDisk(self.op.disk)
    for node in (instance.secondary_nodes + (instance.primary_node,)):
      self.cfg.SetDiskID(disk, node)
      result = rpc.call_blockdev_grow(node, disk, self.op.amount)
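      # a well-formed reply is a (status, message) pair; anything else means
      # the node-level RPC itself failed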
      if not result or not isinstance(result, (list, tuple)) or len(result) != 2:
        raise errors.OpExecError("grow request failed to node %s" % node)
      elif not result[0]:
        raise errors.OpExecError("grow request failed to node %s: %s" %
                                 (node, result[1]))
    disk.RecordGrow(self.op.amount)
    self.cfg.Update(instance)
    return


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]
  REQ_BGL = False
  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")

    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = self.cfg.ExpandInstanceName(name)
        if full_name is None:
          raise errors.OpPrereqError("Instance '%s' not known" %
                                     name)
4175
        self.wanted_names.append(full_name)
4176
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4177
    else:
4178
      self.wanted_names = None
4179
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4180

    
4181
    self.needed_locks[locking.LEVEL_NODE] = []
4182
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4183

    
4184
  def DeclareLocks(self, level):
4185
    if level == locking.LEVEL_NODE:
4186
      self._LockInstancesNodes()
4187

    
4188
  def CheckPrereq(self):
4189
    """Check prerequisites.
4190

4191
    This only checks the optional instance list against the existing names.
4192

4193
    """
4194
    if self.wanted_names is None:
4195
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4196

    
4197
    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4198
                             in self.wanted_names]
4199
    return
4200

    
4201
  def _ComputeDiskStatus(self, instance, snode, dev):
4202
    """Compute block device status.
4203

4204
    """
4205
    self.cfg.SetDiskID(dev, instance.primary_node)
4206
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4207
    if dev.dev_type in constants.LDS_DRBD:
4208
      # we change the snode then (otherwise we use the one passed in)
4209
      if dev.logical_id[0] == instance.primary_node:
4210
        snode = dev.logical_id[1]
4211
      else:
4212
        snode = dev.logical_id[0]
4213

    
4214
    if snode:
4215
      self.cfg.SetDiskID(dev, snode)
4216
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
4217
    else:
4218
      dev_sstatus = None
4219

    
4220
    if dev.children:
4221
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
4222
                      for child in dev.children]
4223
    else:
4224
      dev_children = []
4225

    
4226
    data = {
4227
      "iv_name": dev.iv_name,
4228
      "dev_type": dev.dev_type,
4229
      "logical_id": dev.logical_id,
4230
      "physical_id": dev.physical_id,
4231
      "pstatus": dev_pstatus,
4232
      "sstatus": dev_sstatus,
4233
      "children": dev_children,
4234
      }
4235

    
4236
    return data
4237

    
4238
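  # Exec returns a dictionary keyed by instance name; each entry combines the
  # configured parameters with the live state reported by the primary node
  # ("run_state") and the recursive disk status computed above.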
  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                           instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "vcpus": instance.vcpus,
        }

      htkind = self.sstore.GetHypervisorType()
      if htkind == constants.HT_XEN_PVM30:
        idict["kernel_path"] = instance.kernel_path
        idict["initrd_path"] = instance.initrd_path

      if htkind == constants.HT_XEN_HVM31:
        idict["hvm_boot_order"] = instance.hvm_boot_order
        idict["hvm_acpi"] = instance.hvm_acpi
        idict["hvm_pae"] = instance.hvm_pae
        idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
        idict["hvm_nic_type"] = instance.hvm_nic_type
        idict["hvm_disk_type"] = instance.hvm_disk_type

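      # For hypervisors that need a network port, report where the VNC console
      # can be reached (or None if no port is allocated): either
      # "<primary_node>:<port>" when bound to all interfaces,
      # "<localhost>:<port> on node <primary>" for a local-only binding, or
      # "<bind_address>:<port>" otherwise.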
      if htkind in constants.HTS_REQ_PORT:
        if instance.vnc_bind_address is None:
          vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
        else:
          vnc_bind_address = instance.vnc_bind_address
        if instance.network_port is None:
          vnc_console_port = None
        elif vnc_bind_address == constants.BIND_ADDRESS_GLOBAL:
          vnc_console_port = "%s:%s" % (instance.primary_node,
                                        instance.network_port)
        elif vnc_bind_address == constants.LOCALHOST_IP_ADDRESS:
          vnc_console_port = "%s:%s on node %s" % (vnc_bind_address,
                                                   instance.network_port,
                                                   instance.primary_node)
        else:
          vnc_console_port = "%s:%s" % (instance.vnc_bind_address,
                                        instance.network_port)
        idict["vnc_console_port"] = vnc_console_port
        idict["vnc_bind_address"] = vnc_bind_address
        idict["network_port"] = instance.network_port

      result[instance.name] = idict

    return result


class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge or self.mac:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      if self.mac:
        mac = self.mac
      else:
        mac = self.instance.nics[0].mac
      args['nics'] = [(ip, bridge, mac)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    # FIXME: all the parameters could be checked before, in ExpandNames, or in
    # a separate CheckArguments function, if we implement one, so the operation
    # can be aborted without waiting for any lock, should it have an error...
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    self.kernel_path = getattr(self.op, "kernel_path", None)
    self.initrd_path = getattr(self.op, "initrd_path", None)
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
    self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
    self.hvm_pae = getattr(self.op, "hvm_pae", None)
    self.hvm_nic_type = getattr(self.op, "hvm_nic_type", None)
    self.hvm_disk_type = getattr(self.op, "hvm_disk_type", None)
    self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
    self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
    self.force = getattr(self.op, "force", None)
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
                 self.kernel_path, self.initrd_path, self.hvm_boot_order,
                 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
                 self.vnc_bind_address, self.hvm_nic_type, self.hvm_disk_type]
    if all_parms.count(None) == len(all_parms):
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)

    if self.kernel_path is not None:
      self.do_kernel_path = True
      if self.kernel_path == constants.VALUE_NONE:
        raise errors.OpPrereqError("Can't set instance to no kernel")

      if self.kernel_path != constants.VALUE_DEFAULT:
        if not os.path.isabs(self.kernel_path):
          raise errors.OpPrereqError("The kernel path must be an absolute"
                                     " filename")
    else:
      self.do_kernel_path = False

    if self.initrd_path is not None:
      self.do_initrd_path = True
      if self.initrd_path not in (constants.VALUE_NONE,
                                  constants.VALUE_DEFAULT):
        if not os.path.isabs(self.initrd_path):
          raise errors.OpPrereqError("The initrd path must be an absolute"
                                     " filename")
    else:
      self.do_initrd_path = False

    # boot order verification
    if self.hvm_boot_order is not None:
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
        if len(self.hvm_boot_order.strip("acdn")) != 0:
          raise errors.OpPrereqError("invalid boot order specified,"
                                     " must be one or more of [acdn]"
                                     " or 'default'")

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      if not (os.path.isabs(self.op.hvm_cdrom_image_path) or
              self.op.hvm_cdrom_image_path.lower() == "none"):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not (os.path.isfile(self.op.hvm_cdrom_image_path) or
              self.op.hvm_cdrom_image_path.lower() == "none"):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.warn = []
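    # When the memory size is changed without the force flag, verify that the
    # new size still fits in the free memory of the primary node (hard error)
    # and of each secondary node (warning only, as it merely affects failover).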
    if self.mem is not None and not self.force:
      pnode = self.instance.primary_node
      nodelist = [pnode]
      nodelist.extend(instance.secondary_nodes)
      instance_info = rpc.call_instance_info(pnode, instance.name)
      nodeinfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())

      if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
        # Assume the primary node is unreachable and go ahead
        self.warn.append("Can't get info from primary node %s" % pnode)
      else:
        if instance_info:
          current_mem = instance_info['memory']
        else:
          # Assume instance not running
          # (there is a slight race condition here, but it's not very probable,
          # and we have no other way to check)
          current_mem = 0
        miss_mem = self.mem - current_mem - nodeinfo[pnode]['memory_free']
        if miss_mem > 0:
          raise errors.OpPrereqError("This change will prevent the instance"
                                     " from starting, due to %d MB of memory"
                                     " missing on its primary node" % miss_mem)

      for node in instance.secondary_nodes:
        if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
          self.warn.append("Can't get info from secondary node %s" % node)
        elif self.mem > nodeinfo[node]['memory_free']:
          self.warn.append("Not enough memory to failover instance to"
                           " secondary node %s" % node)

    # Xen HVM device type checks
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      if self.op.hvm_nic_type is not None:
        if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
          raise errors.OpPrereqError("Invalid NIC type %s specified for Xen"
                                     " HVM hypervisor" % self.op.hvm_nic_type)
      if self.op.hvm_disk_type is not None:
        if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
          raise errors.OpPrereqError("Invalid disk type %s specified for Xen"
                                     " HVM hypervisor" % self.op.hvm_disk_type)

    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    # Process here the warnings from CheckPrereq, as we don't have a
    # feedback_fn there.
    for warn in self.warn:
      feedback_fn("WARNING: %s" % warn)

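    # Apply each requested change to the in-memory instance object and collect
    # (parameter, new value) pairs so the caller can report what was modified;
    # the configuration is written back once at the end.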
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.do_kernel_path:
      instance.kernel_path = self.kernel_path
      result.append(("kernel_path", self.kernel_path))
    if self.do_initrd_path:
      instance.initrd_path = self.initrd_path
      result.append(("initrd_path", self.initrd_path))
    if self.hvm_boot_order:
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
        instance.hvm_boot_order = None
      else:
        instance.hvm_boot_order = self.hvm_boot_order
      result.append(("hvm_boot_order", self.hvm_boot_order))
    if self.hvm_acpi is not None:
      instance.hvm_acpi = self.hvm_acpi
      result.append(("hvm_acpi", self.hvm_acpi))
    if self.hvm_pae is not None:
      instance.hvm_pae = self.hvm_pae
      result.append(("hvm_pae", self.hvm_pae))
    if self.hvm_nic_type is not None:
      instance.hvm_nic_type = self.hvm_nic_type
      result.append(("hvm_nic_type", self.hvm_nic_type))
    if self.hvm_disk_type is not None:
      instance.hvm_disk_type = self.hvm_disk_type
      result.append(("hvm_disk_type", self.hvm_disk_type))
    if self.hvm_cdrom_image_path:
      if self.hvm_cdrom_image_path == constants.VALUE_NONE:
        instance.hvm_cdrom_image_path = None
      else:
        instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
      result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
    if self.vnc_bind_address:
      instance.vnc_bind_address = self.vnc_bind_address
      result.append(("vnc_bind_address", self.vnc_bind_address))

    self.cfg.Update(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list.

  """
  _OP_REQP = ['nodes']
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # FIXME: lock only instance primary and destination node
    #
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.op.instance_name
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
          "Cannot retrieve locked instance %s" % self.op.instance_name

    self.dst_node = self.cfg.GetNodeInfo(
      self.cfg.ExpandNodeName(self.op.target_node))

    assert self.dst_node is not None, \
          "Cannot retrieve locked node %s" % self.op.target_node

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

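    # Snapshot each exportable disk (only "sda" is handled here) so the copy
    # can be taken from a consistent image; the instance is restarted in the
    # finally clause if it was shut down above, and the snapshots are removed
    # again once they have been copied to the target node.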
    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        if not rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(instance, self.cfg)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
        logger.Error("could not export block device %s from node %s to node %s"
                     % (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from node %s" %
                     (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

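    # Query the remaining nodes (the target node keeps the new export) and
    # drop any older export of this instance found there, so at most one
    # export per instance is kept cluster-wide.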
    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      exportlist = rpc.call_export_list(nodelist)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    exportlist = rpc.call_export_list(self.cfg.GetNodeList())
    found = False
    for node in exportlist:
      if instance_name in exportlist[node]:
        found = True
        if not rpc.call_export_remove(node, instance_name):
          logger.Error("could not remove export for instance %s"
                       " on node %s" % (instance_name, node))

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
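    # Resolve the tag target from the requested kind: the cluster object
    # itself, a node or an instance (node and instance names are expanded to
    # their full form first); any other kind is rejected.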
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg/sstore that are needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, cfg, sstore, mode, name, **kwargs):
    self.cfg = cfg
    self.sstore = sstore
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.cfg
    # cluster data
    data = {
      "version": 1,
      "cluster_name": self.sstore.GetClusterName(),
      "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      # we don't have job IDs
      }

    i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()
    node_data = rpc.call_node_info(node_list, cfg.GetVGName())
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      if nname not in node_data or not isinstance(node_data[nname], dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname]
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free', 'cpu_total']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += iinfo.memory
          if iinfo.status == "up":
            i_p_up_mem += iinfo.memory

      # compute memory used by instances
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": iinfo.vcpus,
        "memory": iinfo.memory,
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template,
                                  self.disks[0]["size"], self.disks[1]["size"])

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1

    disk_space = _ComputeDiskSize(instance.disk_template,
                                  instance.disks[0].size,
                                  instance.disks[1].size)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

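  # _BuildInputData glues the pieces together: the cluster-wide data computed
  # above plus a mode-specific "request" section ("allocate" for a new
  # instance, "relocate" for moving an existing one), serialized to text for
  # the external allocator script.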
  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

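  # The runner RPC on the master returns a 4-tuple (status code, stdout,
  # stderr, failure reason); stdout carries the allocator's JSON reply, which
  # _ValidateResult below requires to be a dict with at least the keys
  # "success", "info" and "nodes".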
  def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
    """Run an instance allocator and return the results.

    """
    data = self.in_text

    result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)

    if not isinstance(result, (list, tuple)) or len(result) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" % (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    of the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       )
    else:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result