#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import serializer


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_MASTER: the LU needs to run on the master node
        REQ_WSSTORE: the LU needs a writable SimpleStore
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_MASTER = True
  REQ_WSSTORE = False
  REQ_BGL = True

  def __init__(self, processor, op, context, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.sstore = sstore
    self.context = context
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    if not self.cfg.IsCluster():
      raise errors.OpPrereqError("Cluster not initialized yet,"
                                 " use 'gnt-cluster init' first.")
    if self.REQ_MASTER:
      master = sstore.GetMasterNode()
      if master != utils.HostInfo().name:
        raise errors.OpPrereqError("Commands must be run on the master"
                                   " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.sstore)
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:
      - Use an empty dict if you don't need any lock
      - If you don't need any lock at a particular level omit that level
      - Don't put anything for the BGL level
      - If you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples:
    # Acquire all nodes and one instance
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
    }
    # Acquire just two nodes
    self.needed_locks = {
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
    }
    # Acquire no locks
    self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have the 'GANETI_' prefix as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    Args:
      phase: the hooks phase that has just been run
      hook_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: the previous result this LU had, or None in the PRE phase.

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to:

    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]
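

# Illustrative sketch only (not part of the original module): a minimal
# concurrent LU that follows the rules listed in the LogicalUnit docstring
# above.  The opcode field (instance_name), the hook path and the class name
# are assumptions made for the example; the real LUs further down show the
# full pattern.
class LUExampleInstanceNoop(LogicalUnit):
  """Example LU that locks one instance and its nodes, then does nothing.

  """
  HPATH = "instance-example"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    # canonicalize self.op.instance_name and declare the instance-level lock
    self._ExpandAndLockInstance()
    # node locks are computed in DeclareLocks, once the instance is locked
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

  def BuildHooksEnv(self):
    # a three-element tuple: env dict, pre-hook nodes, post-hook nodes
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node]
    return env, nl, nl

  def Exec(self, feedback_fn):
    feedback_fn("Nothing to do for %s" % self.instance.name)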


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: Non-empty list of node names (strings) to expand and check

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)
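

# Illustrative sketch only (not part of the original module): how
# _GetWantedNodes behaves, using stand-ins for the lu/cfg objects.  The stub
# classes and node names are invented for the example.
def _ExampleExpandNodeNames():
  """Expand two short node names through _GetWantedNodes.

  """
  class _StubCfg:
    def ExpandNodeName(self, name):
      # pretend every short name lives in example.tld
      known = ["node1.example.tld", "node2.example.tld"]
      for full in known:
        if full == name or full.split(".")[0] == name:
          return full
      return None

  class _StubLU:
    cfg = _StubCfg()

  # returns ['node1.example.tld', 'node2.example.tld'] (nice-sorted)
  return _GetWantedNodes(_StubLU(), ["node2", "node1"])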


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instance names (strings), or an empty list for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields
    selected: Fields selected by the user

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))
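

# Illustrative sketch only (not part of the original module): how an LU's
# ExpandNames typically validates op.output_fields with _CheckOutputFields.
# The field names below are invented for the example.
def _ExampleCheckOutputFields():
  """Accept a valid field selection, then reject an unknown field.

  """
  static = ["name", "pip", "sip"]
  dynamic = ["mfree", "dfree"]
  # fine: every selected field is either static or dynamic
  _CheckOutputFields(static=static, dynamic=dynamic,
                     selected=["name", "mfree"])
  try:
    _CheckOutputFields(static=static, dynamic=dynamic,
                       selected=["name", "bogus"])
  except errors.OpPrereqError:
    # "Unknown output fields selected: bogus"
    pass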


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings
  """
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env


def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override
  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)
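

# Illustrative sketch only (not part of the original module): the environment
# produced for a hypothetical single-NIC instance.  All values below are
# invented for the example.
def _ExampleInstanceHookEnv():
  """Build the hook environment for a made-up instance.

  """
  env = _BuildInstanceHookEnv("inst1.example.tld", "node1.example.tld",
                              ["node2.example.tld"], "debian-etch", "up",
                              128, 1, [("198.51.100.10", "xen-br0",
                                        "aa:00:00:11:22:33")])
  # env now contains, among others:
  #   INSTANCE_NAME=inst1.example.tld   INSTANCE_PRIMARY=node1.example.tld
  #   INSTANCE_SECONDARIES=node2.example.tld   INSTANCE_STATUS=up
  #   INSTANCE_NIC_COUNT=1   INSTANCE_NIC0_BRIDGE=xen-br0
  return env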


def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("one or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.sstore.GetMasterNode()
    if not rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    if 'node-net-test' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result['node-net-test']:
        bad = True
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result['node-net-test'][node]))

    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if not instanceconfig.status == 'down':
      if (node_current not in node_instance or
          not instance in node_instance[node_current]):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all the instances it is secondary for, should a
      # single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          needed_mem += instance_cfg[instance].memory
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase and their failure makes
    the output be logged in the verify output and the verification to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
                        for node in nodeinfo]
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result, handle it, and send some
    nicely-formatted feedback back to the user.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: previous Exec result

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res is False or not isinstance(res, list):
            feedback_fn("    Communication failure")
            lu_result = 1
            continue
          for script, hkr, output in res:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result
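

# Illustrative sketch only (not part of the original module): the N+1 memory
# check above, shown on literal data.  Node and instance names, memory sizes
# and the simplified dict layout are invented for the example.
def _ExampleNPlusOneCheck():
  """Return the (node, primary) pairs that break N+1 memory redundancy.

  """
  node_info = {
    "node2.example.tld": {
      "mfree": 512,
      # instances this node is secondary for, grouped by their primary node
      "sinst-by-pnode": {"node1.example.tld": ["inst1", "inst2"]},
    },
  }
  instance_memory = {"inst1": 256, "inst2": 384}

  failures = []
  for node, nodeinfo in node_info.iteritems():
    for prinode, instances in nodeinfo["sinst-by-pnode"].iteritems():
      needed_mem = sum(instance_memory[inst] for inst in instances)
      if nodeinfo["mfree"] < needed_mem:
        # node2 cannot absorb inst1+inst2 (640 MiB) if node1 fails
        failures.append((node, prinode))
  return failures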


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    to_act = set()
    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]
  REQ_WSSTORE = True

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      if not rpc.call_node_start_master(master, False):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if not self.op.vg_name:
      instances = [self.cfg.GetInstanceInfo(name)
                   for name in self.cfg.GetInstanceList()]
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      node_list = self.cfg.GetNodeList()
      vglist = rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name != self.cfg.GetVGName():
      self.cfg.SetVGName(self.op.vg_name)
    else:
      feedback_fn("Cluster LVM configuration already in desired"
                  " state, not changing")


def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
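

# Illustrative sketch only (not part of the original module): how the
# (perc_done, est_time, is_degraded, ldisk) tuples polled above are reduced
# to a done/degraded verdict.  The sample values are invented.
def _ExampleSummarizeMirrorStatus():
  """Summarize a list of blockdev mirror-status tuples.

  """
  # one device still at 42.5% with 120s left, one fully synced
  rstats = [(42.5, 120, True, False), (None, None, False, False)]

  done = True
  cumul_degraded = False
  for mstat in rstats:
    if mstat is None:
      continue
    perc_done, est_time, is_degraded, _ = mstat
    cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
    if perc_done is not None:
      done = False
  # here: done is False (first device still syncing), cumul_degraded is False
  return done, cumul_degraded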


def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  cfgw.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
    _CheckOutputFields(static=[],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

      Args:
        node_list: a list with the names of all nodes
        rlist: a map with node names as keys and lists of OS objects as values

      Returns:
        map: a map with osnames as keys and as value another map, with
             nodes as keys and lists of OS objects as values
             e.g. {"debian-etch": {"node1": [<object>,...],
                                   "node2": [<object>,]}
                  }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if not nr:
        continue
      for os_obj in nr:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in node_list:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.acquired_locks[locking.LEVEL_NODE]
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output
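

# Illustrative sketch only (not part of the original module): _DiagnoseByOS
# applied to literal data.  The lightweight _FakeOS class stands in for the
# OS objects returned by rpc.call_os_diagnose; names are invented.
def _ExampleDiagnoseByOS():
  """Remap a per-node OS listing into the per-os per-node form.

  """
  class _FakeOS:
    def __init__(self, name):
      self.name = name

  node_list = ["node1.example.tld", "node2.example.tld"]
  rlist = {
    "node1.example.tld": [_FakeOS("debian-etch")],
    "node2.example.tld": [],   # e.g. no OS definitions on this node
  }
  # result: {"debian-etch": {"node1.example.tld": [<_FakeOS>],
  #                          "node2.example.tld": []}}
  return LUDiagnoseOS._DiagnoseByOS(node_list, rlist)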
1322

    
1323

    
1324
class LURemoveNode(LogicalUnit):
1325
  """Logical unit for removing a node.
1326

1327
  """
1328
  HPATH = "node-remove"
1329
  HTYPE = constants.HTYPE_NODE
1330
  _OP_REQP = ["node_name"]
1331

    
1332
  def BuildHooksEnv(self):
1333
    """Build hooks env.
1334

1335
    This doesn't run on the target node in the pre phase as a failed
1336
    node would then be impossible to remove.
1337

1338
    """
1339
    env = {
1340
      "OP_TARGET": self.op.node_name,
1341
      "NODE_NAME": self.op.node_name,
1342
      }
1343
    all_nodes = self.cfg.GetNodeList()
1344
    all_nodes.remove(self.op.node_name)
1345
    return env, all_nodes, all_nodes
1346

    
1347
  def CheckPrereq(self):
1348
    """Check prerequisites.
1349

1350
    This checks:
1351
     - the node exists in the configuration
1352
     - it does not have primary or secondary instances
1353
     - it's not the master
1354

1355
    Any errors are signalled by raising errors.OpPrereqError.
1356

1357
    """
1358
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1359
    if node is None:
1360
      raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1361

    
1362
    instance_list = self.cfg.GetInstanceList()
1363

    
1364
    masternode = self.sstore.GetMasterNode()
1365
    if node.name == masternode:
1366
      raise errors.OpPrereqError("Node is the master node,"
1367
                                 " you need to failover first.")
1368

    
1369
    for instance_name in instance_list:
1370
      instance = self.cfg.GetInstanceInfo(instance_name)
1371
      if node.name == instance.primary_node:
1372
        raise errors.OpPrereqError("Instance %s still running on the node,"
1373
                                   " please remove first." % instance_name)
1374
      if node.name in instance.secondary_nodes:
1375
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1376
                                   " please remove first." % instance_name)
1377
    self.op.node_name = node.name
1378
    self.node = node
1379

    
1380
  def Exec(self, feedback_fn):
1381
    """Removes the node from the cluster.
1382

1383
    """
1384
    node = self.node
1385
    logger.Info("stopping the node daemon and removing configs from node %s" %
1386
                node.name)
1387

    
1388
    self.context.RemoveNode(node.name)
1389

    
1390
    rpc.call_node_leave_cluster(node.name)
1391

    
1392

    
1393
class LUQueryNodes(NoHooksLU):
1394
  """Logical unit for querying nodes.
1395

1396
  """
1397
  _OP_REQP = ["output_fields", "names"]
1398
  REQ_BGL = False
1399

    
1400
  def ExpandNames(self):
1401
    self.dynamic_fields = frozenset([
1402
      "dtotal", "dfree",
1403
      "mtotal", "mnode", "mfree",
1404
      "bootid",
1405
      "ctotal",
1406
      ])
1407

    
1408
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1409
                               "pinst_list", "sinst_list",
1410
                               "pip", "sip", "tags"],
1411
                       dynamic=self.dynamic_fields,
1412
                       selected=self.op.output_fields)
1413

    
1414
    self.needed_locks = {}
1415
    self.share_locks[locking.LEVEL_NODE] = 1
1416
    # TODO: we could lock nodes only if the user asked for dynamic fields. For
1417
    # that we need atomic ways to get info for a group of nodes from the
1418
    # config, though.
1419
    if not self.op.names:
1420
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1421
    else:
1422
      self.needed_locks[locking.LEVEL_NODE] = \
1423
        _GetWantedNodes(self, self.op.names)
1424

    
1425
  def CheckPrereq(self):
1426
    """Check prerequisites.
1427

1428
    """
1429
    # This of course is valid only if we locked the nodes
1430
    self.wanted = self.acquired_locks[locking.LEVEL_NODE]
1431

    
1432
  def Exec(self, feedback_fn):
1433
    """Computes the list of nodes and their attributes.
1434

1435
    """
1436
    nodenames = self.wanted
1437
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1438

    
1439
    # begin data gathering
1440

    
1441
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  REQ_BGL = False

  def ExpandNames(self):
    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
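          # the for/else below leaves val as '-' when no instance owns
          # this volume on the current node (the else runs only if no break)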
          elif field == "instance":
1571
            for inst in ilist:
1572
              if node not in lv_by_node[inst]:
1573
                continue
1574
              if vol['name'] in lv_by_node[inst][node]:
1575
                val = inst.name
1576
                break
1577
            else:
1578
              val = '-'
1579
          else:
1580
            raise errors.ParameterError(field)
1581
          node_output.append(str(val))
1582

    
1583
        output.append(node_output)
1584

    
1585
    return output
1586

    
1587

    
1588
class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # check reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # check connectivity
    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

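    # ask the master's node daemon to verify that it can resolve and ssh to
    # the new node before the node is committed to the configuration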
    node_verify_list = [self.sstore.GetMasterNode()]
    node_verify_param = {
      'nodelist': [node],
      # TODO: do a node-net-test as well?
    }

    result = rpc.call_node_verify(node_verify_list, node_verify_param)
    for verifier in node_verify_list:
      if not result[verifier]:
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                 " for remote verification" % verifier)
      if result[verifier]['nodelist']:
        for failed in result[verifier]['nodelist']:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, result[verifier]['nodelist'][failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList()
    if not self.op.readd:
      dist_nodes.append(node)
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = self.sstore.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      result = rpc.call_upload_file([node], fname)
      if not result[node]:
        logger.Error("could not copy file %s to node %s" % (fname, node))

    if self.op.readd:
      self.context.ReaddNode(new_node)
    else:
      self.context.AddNode(new_node)


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      }

    return result


class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    a (disks_ok, device_info) tuple; disks_ok is False if the operation
    failed, and device_info is a list of (host, instance_visible_name,
    node_visible_name) with the mapping from node devices to instance
    devices
  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info


def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
    _SafeShutdownInstanceDisks(instance, self.cfg)


def _SafeShutdownInstanceDisks(instance, cfg):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  _ShutdownInstanceDisks.

  """
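  # the instance_list RPC returns a list of the instances running on the
  # node; any other result type means the node could not be contacted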
  ins_l = rpc.call_instance_list([instance.primary_node])
  ins_l = ins_l[instance.primary_node]
  if not type(ins_l) is list:
    raise errors.OpExecError("Can't contact node '%s'" %
                             instance.primary_node)

  if instance.name in ins_l:
    raise errors.OpExecError("Instance is running, can't shutdown"
                             " block devices.")

  _ShutdownInstanceDisks(instance, cfg)


def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are ignored.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result


def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
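  # a failed node_info RPC yields a result that is empty or not a dict,
  # which is treated as "could not contact the node"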
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                             " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                             " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                             " needed %s MiB, available %s MiB" %
                             (node, reason, requested, free_mem))


class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # check bridge existence
    _CheckInstanceBridgesExist(instance)

    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
                         "starting instance %s" % instance.name,
                         instance.memory)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")


class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      primary_only = self.op.reboot_type != constants.INSTANCE_REBOOT_FULL
      self._LockInstancesNodes(primary_only=primary_only)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # check bridge existence
    _CheckInstanceBridgesExist(instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

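    # soft and hard reboots are handled by a single RPC to the node daemon;
    # a full reboot is emulated as shutdown + disk restart + start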
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      if not rpc.call_instance_reboot(node_current, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      if not rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
      if not rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    if not rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    _ShutdownInstanceDisks(instance, self.cfg)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   self.op.pnode)
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node"  % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.AddInstance(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))

  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
                                                old_file_storage_dir,
                                                new_file_storage_dir)

      if not result:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
                                          "sda", "sdb"):
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.sstore.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logger.Info("removing block devices for instance %s" % instance.name)

    if not _RemoveDisks(instance, self.cfg):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)
    # Remove the instance from the Ganeti Lock Manager
    self.context.glm.remove(locking.LEVEL_INSTANCE, instance.name)


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
                               "admin_state", "admin_ram",
                               "disk_template", "ip", "mac", "bridge",
                               "sda_size", "sdb_size", "vcpus", "tags",
                               "auto_balance",
                               "network_port", "kernel_path", "initrd_path",
                               "hvm_boot_order", "hvm_acpi", "hvm_pae",
                               "hvm_cdrom_image_path", "hvm_nic_type",
                               "hvm_disk_type", "vnc_bind_address"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    # TODO: we could lock instances (and nodes) only if the user asked for
    # dynamic fields. For that we need atomic ways to get info for a group of
    # instances from the config, though.
    if not self.op.names:
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        _GetWantedInstances(self, self.op.names)

    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    # TODO: locking of nodes could be avoided when not querying them
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # This of course is valid only if we locked the instances
    self.wanted = self.acquired_locks[locking.LEVEL_INSTANCE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    instance_names = self.wanted
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
                     in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_all_instances_info(nodes)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = []
    for instance in instance_list:
      iout = []
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
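        # "status" combines the configured (admin) state with the state
        # reported live by the primary node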
        elif field == "status":
2620
          if instance.primary_node in bad_nodes:
2621
            val = "ERROR_nodedown"
2622
          else:
2623
            running = bool(live_data.get(instance.name))
2624
            if running:
2625
              if instance.status != "down":
2626
                val = "running"
2627
              else:
2628
                val = "ERROR_up"
2629
            else:
2630
              if instance.status != "down":
2631
                val = "ERROR_down"
2632
              else:
2633
                val = "ADMIN_down"
2634
        elif field == "admin_ram":
2635
          val = instance.memory
2636
        elif field == "oper_ram":
2637
          if instance.primary_node in bad_nodes:
2638
            val = None
2639
          elif instance.name in live_data:
2640
            val = live_data[instance.name].get("memory", "?")
2641
          else:
2642
            val = "-"
2643
        elif field == "disk_template":
2644
          val = instance.disk_template
2645
        elif field == "ip":
2646
          val = instance.nics[0].ip
2647
        elif field == "bridge":
2648
          val = instance.nics[0].bridge
2649
        elif field == "mac":
2650
          val = instance.nics[0].mac
2651
        elif field == "sda_size" or field == "sdb_size":
2652
          disk = instance.FindDisk(field[:3])
2653
          if disk is None:
2654
            val = None
2655
          else:
2656
            val = disk.size
2657
        elif field == "vcpus":
2658
          val = instance.vcpus
2659
        elif field == "tags":
2660
          val = list(instance.GetTags())
2661
        elif field in ("network_port", "kernel_path", "initrd_path",
2662
                       "hvm_boot_order", "hvm_acpi", "hvm_pae",
2663
                       "hvm_cdrom_image_path", "hvm_nic_type",
2664
                       "hvm_disk_type", "vnc_bind_address"):
2665
          val = getattr(instance, field, None)
2666
          if val is not None:
2667
            pass
2668
          elif field in ("hvm_nic_type", "hvm_disk_type",
2669
                         "kernel_path", "initrd_path"):
2670
            val = "default"
2671
          else:
2672
            val = "-"
2673
        else:
2674
          raise errors.ParameterError(field)
2675
        iout.append(val)
2676
      output.append(iout)
2677

    
2678
    return output
2679

    
2680

    
2681
class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
                         instance.name, instance.memory)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down"  %
                     (instance.name, source_node, source_node))
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

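    # from this point on the old secondary is treated as the instance's
    # primary node; the updated configuration is then pushed to all nodes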
    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logger.Info("Starting instance %s on node %s" %
                  (instance.name, target_node))

      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))


def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
        return False

  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, True, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, False, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _GenerateUniqueNames(cfg, exts):
  """Generate suitable LV names.

  This will generate a unique logical volume name for each of the
  given suffixes.

  """
  results = []
  for val in exts:
    new_id = cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results


def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
  """Generate a drbd8 device complete with its children.

  """
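  # the DRBD8 device sits on top of two LVs: a data LV of the requested size
  # and a fixed 128 MB metadata LV, and uses a cluster-allocated port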
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id = (primary, secondary, port),
                          children = [dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev


def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz,
                          file_storage_dir, file_driver):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

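  # except for the diskless template, each branch below builds the fixed
  # 'sda' (data) / 'sdb' (swap) pair used throughout this module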
  vgname = cfg.GetVGName()
  if template_name == constants.DT_DISKLESS:
    disks = []
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name = "sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name = "sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                         disk_sz, names[0:2], "sda")
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                         swap_sz, names[2:4], "sdb")
    disks = [drbd_sda_dev, drbd_sdb_dev]
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
                                iv_name="sda", logical_id=(file_driver,
                                "%s/sda" % file_storage_dir))
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
                                iv_name="sdb", logical_id=(file_driver,
                                "%s/sdb" % file_storage_dir))
    disks = [file_sda_dev, file_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks


def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = rpc.call_file_storage_dir_create(instance.primary_node,
                                              file_storage_dir)

    if not result:
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
      return False

    if not result[0]:
      logger.Error("failed to create directory '%s'" % file_storage_dir)
      return False

  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False

  return True


def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
                                            file_storage_dir):
      logger.Error("could not remove directory '%s'" % file_storage_dir)
      result = False

  return result


def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group

  This is currently hard-coded for the two-drive layout.

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: disk_size + swap_size,
    # 256 MB are added for drbd metadata, 128 MB for each drbd device
    constants.DT_DRBD8: disk_size + swap_size + 256,
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]


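# Worked example (illustrative only): for a DT_DRBD8 instance with a
# hypothetical 10240 MB data disk and a 4096 MB swap disk, the required free
# space in the volume group on each node is 10240 + 4096 + 256 = 14592 MB;
# the extra 256 MB covers the two 128 MB DRBD metadata volumes.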
class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
              "disk_template", "swap_size", "mode", "start", "vcpus",
              "wait_for_sync", "ip_check", "mac"]

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    disks = [{"size": self.op.disk_size, "mode": "w"},
             {"size": self.op.swap_size, "mode": "w"}]
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
             "bridge": self.op.bridge}]
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.op.vcpus,
                     mem_size=self.op.mem_size,
                     disks=disks,
                     nics=nics,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    logger.ToStdout("Selected nodes for the instance: %s" %
                    (", ".join(ial.nodes),))
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
                (self.op.instance_name, self.op.iallocator, ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]

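  # Illustrative sketch of the allocation request built by _RunAllocator above
  # (hypothetical values, not part of the original code):
  #   disks = [{"size": 10240, "mode": "w"}, {"size": 4096, "mode": "w"}]
  #   nics  = [{"mac": "auto", "ip": None, "bridge": "xen-br0"}]
  # The allocator must return exactly ial.required_nodes node names; the first
  # becomes the primary node and, for mirrored templates, the second becomes
  # the secondary.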
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.op.mem_size,
      vcpus=self.op.vcpus,
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
    ))

    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # set optional parameters to None if they don't exist
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
                 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
                 "hvm_nic_type", "hvm_disk_type", "vnc_bind_address"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)
      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")
      src_node_full = self.cfg.ExpandNodeName(src_node)
      if src_node_full is None:
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
      self.op.src_node = src_node = src_node_full

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old OSes, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage
    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

    #### instance parameters check

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)

    self.op.instance_name = instance_name = hostname1.name
    instance_list = self.cfg.GetInstanceList()
    if instance_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    # ip validity checks
    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = self.op.ip = inst_ip

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (hostname1.ip, instance_name))

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    # boot order verification
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")
    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory not a relative"
                                 " path")
    #### allocator run

    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
    if pnode is None:
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                 self.op.pnode)
    self.op.pnode = pnode.name
    self.pnode = pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if getattr(self.op, "snode", None) is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")

      snode_name = self.cfg.ExpandNodeName(self.op.snode)
      if snode_name is None:
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
                                   self.op.snode)
      elif snode_name == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(snode_name)

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.op.disk_size, self.op.swap_size)

    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node" % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")


    # bridge check on primary node
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.op.mem_size)

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    # Xen HVM device type checks
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
        raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
                                   " hypervisor" % self.op.hvm_nic_type)
      if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
        raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
                                   " hypervisor" % self.op.hvm_disk_type)

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    if self.op.vnc_bind_address is None:
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.sstore.GetFileStorageDir(),
                                        string_file_storage_dir, instance))


    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size,
                                  file_storage_dir,
                                  self.op.file_driver)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            hvm_acpi=self.op.hvm_acpi,
                            hvm_pae=self.op.hvm_pae,
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
                            vnc_bind_address=self.op.vnc_bind_address,
                            hvm_nic_type=self.op.hvm_nic_type,
                            hvm_disk_type=self.op.hvm_disk_type,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Add the new instance to the Ganeti Lock Manager
    self.context.glm.add(locking.LEVEL_INSTANCE, instance)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      # Remove the new instance from the Ganeti Lock Manager
      self.context.glm.remove(locking.LEVEL_INSTANCE, iobj.name)
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                           src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)


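# Illustrative note (not from the original source) on the modes handled by
# LUReplaceDisks below: REPLACE_DISK_PRI recreates the LVs backing the DRBD
# device on the primary node; REPLACE_DISK_SEC either recreates them on the
# current secondary or, when a new secondary node is given, moves the mirror
# to that node; REPLACE_DISK_ALL is rejected for drbd8 unless a new secondary
# is given, in which case it is converted to REPLACE_DISK_SEC.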
class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    elif self.op.remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.op.remote_node = remote_node
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    # If we're not already locking all nodes in the set we have to declare the
    # instance's primary/secondary nodes.
    if (level == locking.LEVEL_NODE and
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
      self._LockInstancesNodes()

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    logger.ToStdout("Selected new secondary for the instance: %s" %
                    self.op.remote_node)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = instance

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced-<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaced-<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node == instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue

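  # Illustrative note on the rename scheme in _ExecD8DiskOnly above (not from
  # the original source): a data LV named e.g. "xyz.sda_data" (hypothetical)
  # is first renamed to "xyz.sda_data_replaced-<time_t>", the freshly created
  # LV then takes over the name "xyz.sda_data", and the "_replaced-..." volume
  # is only deleted in step 6 once the mirror has resynced.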
  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([pri_node, new_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in pri_node, new_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s on %s" % (dev.iv_name, pri_node))
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        raise errors.OpExecError("Can't find device %s on node %s" %
                                 (dev.iv_name, pri_node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      size = dev.size
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in dev.children:
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], new_node))

      iv_names[dev.iv_name] = (dev, dev.children)

    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for dev in instance.disks:
      size = dev.size
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
      # create new devices on new_node
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=(pri_node, new_node,
                                          dev.logical_id[2]),
                              children=dev.children)
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new DRBD on"
                                 " node '%s'" % new_node)

    for dev in instance.disks:
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for %s on old node" % dev.iv_name)
      cfg.SetDiskID(dev, old_node)
      if not rpc.call_blockdev_shutdown(old_node, dev):
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")
    done = 0
    for dev in instance.disks:
      cfg.SetDiskID(dev, pri_node)
      # set the physical (unique in bdev terms) id to None, meaning
      # detach from network
      dev.physical_id = (None,) * len(dev.physical_id)
      # and 'find' the device, which will 'fix' it to match the
      # standalone state
      if rpc.call_blockdev_find(pri_node, dev):
        done += 1
      else:
        warning("Failed to detach drbd %s from network, unusual case" %
                dev.iv_name)

    if not done:
      # no detaches succeeded (very unlikely)
      raise errors.OpExecError("Can't detach at least one DRBD from old node")

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev in instance.disks:
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    failures = []
    for dev in instance.disks:
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
      # since the attach is smart, it's enough to 'find' the device,
      # it will automatically activate the network, if the physical_id
      # is correct
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
                "please do a gnt-instance info to see the status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        if not rpc.call_blockdev_remove(old_node, lv):
          warning("Can't remove LV on old secondary",
                  hint="Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance

    # Activate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      _StartInstanceDisks(self.cfg, instance, True)

    if instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")

    ret = fn(feedback_fn)

    # Deactivate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      _SafeShutdownInstanceDisks(instance, self.cfg)

    return ret


class LUGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk", "amount"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    self.instance = instance

    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing.")

    if instance.FindDisk(self.op.disk) is None:
      raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                 (self.op.disk, instance.name))

    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
    for node in nodenames:
      info = nodeinfo.get(node, None)
      if not info:
        raise errors.OpPrereqError("Cannot get current information"
                                   " from node '%s'" % node)
      vg_free = info.get('vg_free', None)
      if not isinstance(vg_free, int):
        raise errors.OpPrereqError("Can't compute free disk space on"
                                   " node %s" % node)
      if self.op.amount > info['vg_free']:
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
                                   " %d MiB available, %d MiB required" %
                                   (node, info['vg_free'], self.op.amount))

  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    instance = self.instance
    disk = instance.FindDisk(self.op.disk)
    for node in (instance.secondary_nodes + (instance.primary_node,)):
      self.cfg.SetDiskID(disk, node)
      result = rpc.call_blockdev_grow(node, disk, self.op.amount)
      if (not result or not isinstance(result, (list, tuple)) or
          len(result) != 2):
        raise errors.OpExecError("grow request failed to node %s" % node)
      elif not result[0]:
        raise errors.OpExecError("grow request failed to node %s: %s" %
                                 (node, result[1]))
    disk.RecordGrow(self.op.amount)
    self.cfg.Update(instance)
    return


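# Illustrative note on LUGrowDisk above (not from the original source): the
# grow request is issued via call_blockdev_grow on the secondary nodes first
# and then on the primary, and the new size is only recorded in the cluster
# configuration (disk.RecordGrow + cfg.Update) after every node has grown the
# device successfully.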
class LUQueryInstanceData(NoHooksLU):
4144
  """Query runtime instance data.
4145

4146
  """
4147
  _OP_REQP = ["instances"]
4148

    
4149
  def CheckPrereq(self):
4150
    """Check prerequisites.
4151

4152
    This only checks the optional instance list against the existing names.
4153

4154
    """
4155
    if not isinstance(self.op.instances, list):
4156
      raise errors.OpPrereqError("Invalid argument type 'instances'")
4157
    if self.op.instances:
4158
      self.wanted_instances = []
4159
      names = self.op.instances
4160
      for name in names:
4161
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4162
        if instance is None:
4163
          raise errors.OpPrereqError("No such instance name '%s'" % name)
4164
        self.wanted_instances.append(instance)
4165
    else:
4166
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4167
                               in self.cfg.GetInstanceList()]
4168
    return
4169

    
4170

    
4171
  def _ComputeDiskStatus(self, instance, snode, dev):
4172
    """Compute block device status.
4173

4174
    """
4175
    self.cfg.SetDiskID(dev, instance.primary_node)
4176
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4177
    if dev.dev_type in constants.LDS_DRBD:
4178
      # we change the snode then (otherwise we use the one passed in)
4179
      if dev.logical_id[0] == instance.primary_node:
4180
        snode = dev.logical_id[1]
4181
      else:
4182
        snode = dev.logical_id[0]
4183

    
4184
    if snode:
4185
      self.cfg.SetDiskID(dev, snode)
4186
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
4187
    else:
4188
      dev_sstatus = None
4189

    
4190
    if dev.children:
4191
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
4192
                      for child in dev.children]
4193
    else:
4194
      dev_children = []
4195

    
4196
    data = {
4197
      "iv_name": dev.iv_name,
4198
      "dev_type": dev.dev_type,
4199
      "logical_id": dev.logical_id,
4200
      "physical_id": dev.physical_id,
4201
      "pstatus": dev_pstatus,
4202
      "sstatus": dev_sstatus,
4203
      "children": dev_children,
4204
      }
4205

    
4206
    return data
4207
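  # Illustrative example of the structure returned above for a DRBD-backed
  # disk (all values are made up; 'pstatus'/'sstatus' are simply whatever
  # rpc.call_blockdev_find returned on the respective node):
  #
  #   {
  #     "iv_name": "sda",
  #     "dev_type": "drbd8",
  #     "logical_id": ("node1.example.com", "node2.example.com", 11000),
  #     "physical_id": (...),
  #     "pstatus": <find result on the primary node>,
  #     "sstatus": <find result on the secondary node>,
  #     "children": [<same structure, one entry per child device>],
  #   }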

    
4208
  def Exec(self, feedback_fn):
4209
    """Gather and return data"""
4210
    result = {}
4211
    for instance in self.wanted_instances:
4212
      remote_info = rpc.call_instance_info(instance.primary_node,
4213
                                                instance.name)
4214
      if remote_info and "state" in remote_info:
4215
        remote_state = "up"
4216
      else:
4217
        remote_state = "down"
4218
      if instance.status == "down":
4219
        config_state = "down"
4220
      else:
4221
        config_state = "up"
4222

    
4223
      disks = [self._ComputeDiskStatus(instance, None, device)
4224
               for device in instance.disks]
4225

    
4226
      idict = {
4227
        "name": instance.name,
4228
        "config_state": config_state,
4229
        "run_state": remote_state,
4230
        "pnode": instance.primary_node,
4231
        "snodes": instance.secondary_nodes,
4232
        "os": instance.os,
4233
        "memory": instance.memory,
4234
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4235
        "disks": disks,
4236
        "vcpus": instance.vcpus,
4237
        }
4238

    
4239
      htkind = self.sstore.GetHypervisorType()
4240
      if htkind == constants.HT_XEN_PVM30:
4241
        idict["kernel_path"] = instance.kernel_path
4242
        idict["initrd_path"] = instance.initrd_path
4243

    
4244
      if htkind == constants.HT_XEN_HVM31:
4245
        idict["hvm_boot_order"] = instance.hvm_boot_order
4246
        idict["hvm_acpi"] = instance.hvm_acpi
4247
        idict["hvm_pae"] = instance.hvm_pae
4248
        idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4249
        idict["hvm_nic_type"] = instance.hvm_nic_type
4250
        idict["hvm_disk_type"] = instance.hvm_disk_type
4251

    
4252
      if htkind in constants.HTS_REQ_PORT:
4253
        if instance.vnc_bind_address is None:
4254
          vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4255
        else:
4256
          vnc_bind_address = instance.vnc_bind_address
4257
        if instance.network_port is None:
4258
          vnc_console_port = None
4259
        elif vnc_bind_address == constants.BIND_ADDRESS_GLOBAL:
4260
          vnc_console_port = "%s:%s" % (instance.primary_node,
4261
                                       instance.network_port)
4262
        elif vnc_bind_address == constants.LOCALHOST_IP_ADDRESS:
4263
          vnc_console_port = "%s:%s on node %s" % (vnc_bind_address,
4264
                                                   instance.network_port,
4265
                                                   instance.primary_node)
4266
        else:
4267
          vnc_console_port = "%s:%s" % (instance.vnc_bind_address,
4268
                                        instance.network_port)
4269
        idict["vnc_console_port"] = vnc_console_port
4270
        idict["vnc_bind_address"] = vnc_bind_address
4271
        idict["network_port"] = instance.network_port
4272

    
4273
      result[instance.name] = idict
4274

    
4275
    return result
4276
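  # Depending on the bind address, the "vnc_console_port" value built above
  # takes one of three forms (node names and ports are illustrative):
  #
  #   "node1.example.com:11010"                    (bound on all interfaces)
  #   "127.0.0.1:11010 on node node1.example.com"  (bound on localhost)
  #   "192.0.2.10:11010"                           (bound on a specific IP)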

    
4277

    
4278
class LUSetInstanceParams(LogicalUnit):
4279
  """Modifies an instances's parameters.
4280

4281
  """
4282
  HPATH = "instance-modify"
4283
  HTYPE = constants.HTYPE_INSTANCE
4284
  _OP_REQP = ["instance_name"]
4285
  REQ_BGL = False
4286

    
4287
  def ExpandNames(self):
4288
    self._ExpandAndLockInstance()
4289

    
4290
  def BuildHooksEnv(self):
4291
    """Build hooks env.
4292

4293
    This runs on the master, primary and secondaries.
4294

4295
    """
4296
    args = dict()
4297
    if self.mem:
4298
      args['memory'] = self.mem
4299
    if self.vcpus:
4300
      args['vcpus'] = self.vcpus
4301
    if self.do_ip or self.do_bridge or self.mac:
4302
      if self.do_ip:
4303
        ip = self.ip
4304
      else:
4305
        ip = self.instance.nics[0].ip
4306
      if self.bridge:
4307
        bridge = self.bridge
4308
      else:
4309
        bridge = self.instance.nics[0].bridge
4310
      if self.mac:
4311
        mac = self.mac
4312
      else:
4313
        mac = self.instance.nics[0].mac
4314
      args['nics'] = [(ip, bridge, mac)]
4315
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4316
    nl = [self.sstore.GetMasterNode(),
4317
          self.instance.primary_node] + list(self.instance.secondary_nodes)
4318
    return env, nl, nl
4319
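  # Illustrative example: for a modification of memory, vcpus and IP the
  # override dict built above would look roughly like (made-up values,
  # "xen-br0" is just an example bridge name):
  #
  #   args = {'memory': 512, 'vcpus': 2,
  #           'nics': [("198.51.100.5", "xen-br0", "aa:00:00:12:34:56")]}
  #
  # i.e. 'nics' carries one (ip, bridge, mac) tuple for the modified NIC.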

    
4320
  def CheckPrereq(self):
4321
    """Check prerequisites.
4322

4323
    This only checks the instance list against the existing names.
4324

4325
    """
4326
    # FIXME: all the parameters could be checked before, in ExpandNames, or in
4327
    # a separate CheckArguments function, if we implement one, so the operation
4328
    # can be aborted without waiting for any lock, should it have an error...
4329
    self.mem = getattr(self.op, "mem", None)
4330
    self.vcpus = getattr(self.op, "vcpus", None)
4331
    self.ip = getattr(self.op, "ip", None)
4332
    self.mac = getattr(self.op, "mac", None)
4333
    self.bridge = getattr(self.op, "bridge", None)
4334
    self.kernel_path = getattr(self.op, "kernel_path", None)
4335
    self.initrd_path = getattr(self.op, "initrd_path", None)
4336
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4337
    self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
4338
    self.hvm_pae = getattr(self.op, "hvm_pae", None)
4339
    self.hvm_nic_type = getattr(self.op, "hvm_nic_type", None)
4340
    self.hvm_disk_type = getattr(self.op, "hvm_disk_type", None)
4341
    self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
4342
    self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
4343
    self.force = getattr(self.op, "force", None)
4344
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4345
                 self.kernel_path, self.initrd_path, self.hvm_boot_order,
4346
                 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
4347
                 self.vnc_bind_address, self.hvm_nic_type, self.hvm_disk_type]
4348
    if all_parms.count(None) == len(all_parms):
4349
      raise errors.OpPrereqError("No changes submitted")
4350
    if self.mem is not None:
4351
      try:
4352
        self.mem = int(self.mem)
4353
      except ValueError, err:
4354
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4355
    if self.vcpus is not None:
4356
      try:
4357
        self.vcpus = int(self.vcpus)
4358
      except ValueError, err:
4359
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4360
    if self.ip is not None:
4361
      self.do_ip = True
4362
      if self.ip.lower() == "none":
4363
        self.ip = None
4364
      else:
4365
        if not utils.IsValidIP(self.ip):
4366
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4367
    else:
4368
      self.do_ip = False
4369
    self.do_bridge = (self.bridge is not None)
4370
    if self.mac is not None:
4371
      if self.cfg.IsMacInUse(self.mac):
4372
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4373
                                   self.mac)
4374
      if not utils.IsValidMac(self.mac):
4375
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4376

    
4377
    if self.kernel_path is not None:
4378
      self.do_kernel_path = True
4379
      if self.kernel_path == constants.VALUE_NONE:
4380
        raise errors.OpPrereqError("Can't set instance to no kernel")
4381

    
4382
      if self.kernel_path != constants.VALUE_DEFAULT:
4383
        if not os.path.isabs(self.kernel_path):
4384
          raise errors.OpPrereqError("The kernel path must be an absolute"
4385
                                    " filename")
4386
    else:
4387
      self.do_kernel_path = False
4388

    
4389
    if self.initrd_path is not None:
4390
      self.do_initrd_path = True
4391
      if self.initrd_path not in (constants.VALUE_NONE,
4392
                                  constants.VALUE_DEFAULT):
4393
        if not os.path.isabs(self.initrd_path):
4394
          raise errors.OpPrereqError("The initrd path must be an absolute"
4395
                                    " filename")
4396
    else:
4397
      self.do_initrd_path = False
4398

    
4399
    # boot order verification
4400
    if self.hvm_boot_order is not None:
4401
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
4402
        if len(self.hvm_boot_order.strip("acdn")) != 0:
4403
          raise errors.OpPrereqError("invalid boot order specified,"
4404
                                     " must be one or more of [acdn]"
4405
                                     " or 'default'")
4406

    
4407
    # hvm_cdrom_image_path verification
4408
    if self.op.hvm_cdrom_image_path is not None:
4409
      if not (os.path.isabs(self.op.hvm_cdrom_image_path) or
4410
              self.op.hvm_cdrom_image_path.lower() == "none"):
4411
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
4412
                                   " be an absolute path or None, not %s" %
4413
                                   self.op.hvm_cdrom_image_path)
4414
      if not (os.path.isfile(self.op.hvm_cdrom_image_path) or
4415
              self.op.hvm_cdrom_image_path.lower() == "none"):
4416
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
4417
                                   " regular file or a symlink pointing to"
4418
                                   " an existing regular file, not %s" %
4419
                                   self.op.hvm_cdrom_image_path)
4420

    
4421
    # vnc_bind_address verification
4422
    if self.op.vnc_bind_address is not None:
4423
      if not utils.IsValidIP(self.op.vnc_bind_address):
4424
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
4425
                                   " like a valid IP address" %
4426
                                   self.op.vnc_bind_address)
4427

    
4428
    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4429
    assert self.instance is not None, \
4430
      "Cannot retrieve locked instance %s" % self.op.instance_name
4431
    self.warn = []
4432
    if self.mem is not None and not self.force:
4433
      pnode = self.instance.primary_node
4434
      nodelist = [pnode]
4435
      nodelist.extend(instance.secondary_nodes)
4436
      instance_info = rpc.call_instance_info(pnode, instance.name)
4437
      nodeinfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
4438

    
4439
      if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
4440
        # Assume the primary node is unreachable and go ahead
4441
        self.warn.append("Can't get info from primary node %s" % pnode)
4442
      else:
4443
        if instance_info:
4444
          current_mem = instance_info['memory']
4445
        else:
4446
          # Assume instance not running
4447
          # (there is a slight race condition here, but it's not very probable,
4448
          # and we have no other way to check)
4449
          current_mem = 0
4450
        miss_mem = self.mem - current_mem - nodeinfo[pnode]['memory_free']
4451
        if miss_mem > 0:
4452
          raise errors.OpPrereqError("This change will prevent the instance"
4453
                                     " from starting, due to %d MB of memory"
4454
                                     " missing on its primary node" % miss_mem)
4455

    
4456
      for node in instance.secondary_nodes:
4457
        if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
4458
          self.warn.append("Can't get info from secondary node %s" % node)
4459
        elif self.mem > nodeinfo[node]['memory_free']:
4460
          self.warn.append("Not enough memory to failover instance to secondary"
4461
                           " node %s" % node)
4462

    
4463
    # Xen HVM device type checks
4464
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
4465
      if self.op.hvm_nic_type is not None:
4466
        if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
4467
          raise errors.OpPrereqError("Invalid NIC type %s specified for Xen"
4468
                                     " HVM  hypervisor" % self.op.hvm_nic_type)
4469
      if self.op.hvm_disk_type is not None:
4470
        if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
4471
          raise errors.OpPrereqError("Invalid disk type %s specified for Xen"
4472
                                     " HVM hypervisor" % self.op.hvm_disk_type)
4473

    
4474
    return
4475

    
4476
  def Exec(self, feedback_fn):
4477
    """Modifies an instance.
4478

4479
    All parameters take effect only at the next restart of the instance.
4480
    """
4481
    # Process here the warnings from CheckPrereq, as we don't have a
4482
    # feedback_fn there.
4483
    for warn in self.warn:
4484
      feedback_fn("WARNING: %s" % warn)
4485

    
4486
    result = []
4487
    instance = self.instance
4488
    if self.mem:
4489
      instance.memory = self.mem
4490
      result.append(("mem", self.mem))
4491
    if self.vcpus:
4492
      instance.vcpus = self.vcpus
4493
      result.append(("vcpus",  self.vcpus))
4494
    if self.do_ip:
4495
      instance.nics[0].ip = self.ip
4496
      result.append(("ip", self.ip))
4497
    if self.bridge:
4498
      instance.nics[0].bridge = self.bridge
4499
      result.append(("bridge", self.bridge))
4500
    if self.mac:
4501
      instance.nics[0].mac = self.mac
4502
      result.append(("mac", self.mac))
4503
    if self.do_kernel_path:
4504
      instance.kernel_path = self.kernel_path
4505
      result.append(("kernel_path", self.kernel_path))
4506
    if self.do_initrd_path:
4507
      instance.initrd_path = self.initrd_path
4508
      result.append(("initrd_path", self.initrd_path))
4509
    if self.hvm_boot_order:
4510
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
4511
        instance.hvm_boot_order = None
4512
      else:
4513
        instance.hvm_boot_order = self.hvm_boot_order
4514
      result.append(("hvm_boot_order", self.hvm_boot_order))
4515
    if self.hvm_acpi is not None:
4516
      instance.hvm_acpi = self.hvm_acpi
4517
      result.append(("hvm_acpi", self.hvm_acpi))
4518
    if self.hvm_pae is not None:
4519
      instance.hvm_pae = self.hvm_pae
4520
      result.append(("hvm_pae", self.hvm_pae))
4521
    if self.hvm_nic_type is not None:
4522
      instance.hvm_nic_type = self.hvm_nic_type
4523
      result.append(("hvm_nic_type", self.hvm_nic_type))
4524
    if self.hvm_disk_type is not None:
4525
      instance.hvm_disk_type = self.hvm_disk_type
4526
      result.append(("hvm_disk_type", self.hvm_disk_type))
4527
    if self.hvm_cdrom_image_path:
4528
      if self.hvm_cdrom_image_path == constants.VALUE_NONE:
4529
        instance.hvm_cdrom_image_path = None
4530
      else:
4531
        instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
4532
      result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
4533
    if self.vnc_bind_address:
4534
      instance.vnc_bind_address = self.vnc_bind_address
4535
      result.append(("vnc_bind_address", self.vnc_bind_address))
4536

    
4537
    self.cfg.Update(instance)
4538

    
4539
    return result
4540
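  # The value returned above is a list of (parameter, new value) pairs that
  # the caller can present to the user, e.g. (illustrative):
  #
  #   [("mem", 512), ("vcpus", 2), ("ip", "198.51.100.5")]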

    
4541

    
4542
class LUQueryExports(NoHooksLU):
4543
  """Query the exports list
4544

4545
  """
4546
  _OP_REQP = ['nodes']
4547
  REQ_BGL = False
4548

    
4549
  def ExpandNames(self):
4550
    self.needed_locks = {}
4551
    self.share_locks[locking.LEVEL_NODE] = 1
4552
    if not self.op.nodes:
4553
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4554
    else:
4555
      self.needed_locks[locking.LEVEL_NODE] = \
4556
        _GetWantedNodes(self, self.op.nodes)
4557

    
4558
  def CheckPrereq(self):
4559
    """Check prerequisites.
4560

4561
    """
4562
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
4563

    
4564
  def Exec(self, feedback_fn):
4565
    """Compute the list of all the exported system images.
4566

4567
    Returns:
4568
      a dictionary with the structure node->(export-list)
4569
      where export-list is a list of the instances exported on
4570
      that node.
4571

4572
    """
4573
    return rpc.call_export_list(self.nodes)
4574
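  # Illustrative example of the returned structure (names are made up):
  #
  #   {
  #     "node1.example.com": ["instance1.example.com"],
  #     "node2.example.com": [],
  #   }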

    
4575

    
4576
class LUExportInstance(LogicalUnit):
4577
  """Export an instance to an image in the cluster.
4578

4579
  """
4580
  HPATH = "instance-export"
4581
  HTYPE = constants.HTYPE_INSTANCE
4582
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
4583
  REQ_BGL = False
4584

    
4585
  def ExpandNames(self):
4586
    self._ExpandAndLockInstance()
4587
    # FIXME: lock only instance primary and destination node
4588
    #
4589
    # Sad but true, for now we have to lock all nodes, as we don't know where
4590
    # the previous export might be, and in this LU we search for it and
4591
    # remove it from its current node. In the future we could fix this by:
4592
    #  - making a tasklet to search (share-lock all), then create the new one,
4593
    #    then one to remove, after
4594
    #  - removing the removal operation altogether
4595
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4596

    
4597
  def DeclareLocks(self, level):
4598
    """Last minute lock declaration."""
4599
    # All nodes are locked anyway, so nothing to do here.
4600

    
4601
  def BuildHooksEnv(self):
4602
    """Build hooks env.
4603

4604
    This will run on the master, primary node and target node.
4605

4606
    """
4607
    env = {
4608
      "EXPORT_NODE": self.op.target_node,
4609
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4610
      }
4611
    env.update(_BuildInstanceHookEnvByObject(self.instance))
4612
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4613
          self.op.target_node]
4614
    return env, nl, nl
4615
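  # Illustrative example: besides the variables coming from
  # _BuildInstanceHookEnvByObject, the export hooks receive
  #
  #   EXPORT_NODE=node3.example.com
  #   EXPORT_DO_SHUTDOWN=True
  #
  # (values made up; they mirror self.op.target_node and self.op.shutdown).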

    
4616
  def CheckPrereq(self):
4617
    """Check prerequisites.
4618

4619
    This checks that the instance and node names are valid.
4620

4621
    """
4622
    instance_name = self.op.instance_name
4623
    self.instance = self.cfg.GetInstanceInfo(instance_name)
4624
    assert self.instance is not None, \
4625
          "Cannot retrieve locked instance %s" % self.op.instance_name
4626

    
4627
    self.dst_node = self.cfg.GetNodeInfo(
4628
      self.cfg.ExpandNodeName(self.op.target_node))
4629

    
4630
    assert self.dst_node is not None, \
4631
          "Cannot retrieve locked node %s" % self.op.target_node
4632

    
4633
    # instance disk type verification
4634
    for disk in self.instance.disks:
4635
      if disk.dev_type == constants.LD_FILE:
4636
        raise errors.OpPrereqError("Export not supported for instances with"
4637
                                   " file-based disks")
4638

    
4639
  def Exec(self, feedback_fn):
4640
    """Export an instance to an image in the cluster.
4641

4642
    """
4643
    instance = self.instance
4644
    dst_node = self.dst_node
4645
    src_node = instance.primary_node
4646
    if self.op.shutdown:
4647
      # shutdown the instance, but not the disks
4648
      if not rpc.call_instance_shutdown(src_node, instance):
4649
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4650
                                 (instance.name, src_node))
4651

    
4652
    vgname = self.cfg.GetVGName()
4653

    
4654
    snap_disks = []
4655

    
4656
    try:
4657
      for disk in instance.disks:
4658
        if disk.iv_name == "sda":
4659
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4660
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4661

    
4662
          if not new_dev_name:
4663
            logger.Error("could not snapshot block device %s on node %s" %
4664
                         (disk.logical_id[1], src_node))
4665
          else:
4666
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4667
                                      logical_id=(vgname, new_dev_name),
4668
                                      physical_id=(vgname, new_dev_name),
4669
                                      iv_name=disk.iv_name)
4670
            snap_disks.append(new_dev)
4671

    
4672
    finally:
4673
      if self.op.shutdown and instance.status == "up":
4674
        if not rpc.call_instance_start(src_node, instance, None):
4675
          _ShutdownInstanceDisks(instance, self.cfg)
4676
          raise errors.OpExecError("Could not start instance")
4677

    
4678
    # TODO: check for size
4679

    
4680
    for dev in snap_disks:
4681
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
4682
        logger.Error("could not export block device %s from node %s to node %s"
4683
                     % (dev.logical_id[1], src_node, dst_node.name))
4684
      if not rpc.call_blockdev_remove(src_node, dev):
4685
        logger.Error("could not remove snapshot block device %s from node %s" %
4686
                     (dev.logical_id[1], src_node))
4687

    
4688
    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4689
      logger.Error("could not finalize export for instance %s on node %s" %
4690
                   (instance.name, dst_node.name))
4691

    
4692
    nodelist = self.cfg.GetNodeList()
4693
    nodelist.remove(dst_node.name)
4694

    
4695
    # on one-node clusters nodelist will be empty after the removal
4696
    # if we proceed, the backup would be removed because OpQueryExports
4697
    # substitutes an empty list with the full cluster node list.
4698
    if nodelist:
4699
      exportlist = rpc.call_export_list(nodelist)
4700
      for node in exportlist:
4701
        if instance.name in exportlist[node]:
4702
          if not rpc.call_export_remove(node, instance.name):
4703
            logger.Error("could not remove older export for instance %s"
4704
                         " on node %s" % (instance.name, node))
4705

    
4706

    
4707
class LURemoveExport(NoHooksLU):
4708
  """Remove exports related to the named instance.
4709

4710
  """
4711
  _OP_REQP = ["instance_name"]
4712

    
4713
  def CheckPrereq(self):
4714
    """Check prerequisites.
4715
    """
4716
    pass
4717

    
4718
  def Exec(self, feedback_fn):
4719
    """Remove any export.
4720

4721
    """
4722
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4723
    # If the instance was not found we'll try with the name that was passed in.
4724
    # This will only work if it was an FQDN, though.
4725
    fqdn_warn = False
4726
    if not instance_name:
4727
      fqdn_warn = True
4728
      instance_name = self.op.instance_name
4729

    
4730
    exportlist = rpc.call_export_list(self.cfg.GetNodeList())
4731
    found = False
4732
    for node in exportlist:
4733
      if instance_name in exportlist[node]:
4734
        found = True
4735
        if not rpc.call_export_remove(node, instance_name):
4736
          logger.Error("could not remove export for instance %s"
4737
                       " on node %s" % (instance_name, node))
4738

    
4739
    if fqdn_warn and not found:
4740
      feedback_fn("Export not found. If trying to remove an export belonging"
4741
                  " to a deleted instance please use its Fully Qualified"
4742
                  " Domain Name.")
4743

    
4744

    
4745
class TagsLU(NoHooksLU):
4746
  """Generic tags LU.
4747

4748
  This is an abstract class which is the parent of all the other tags LUs.
4749

4750
  """
4751
  def CheckPrereq(self):
4752
    """Check prerequisites.
4753

4754
    """
4755
    if self.op.kind == constants.TAG_CLUSTER:
4756
      self.target = self.cfg.GetClusterInfo()
4757
    elif self.op.kind == constants.TAG_NODE:
4758
      name = self.cfg.ExpandNodeName(self.op.name)
4759
      if name is None:
4760
        raise errors.OpPrereqError("Invalid node name (%s)" %
4761
                                   (self.op.name,))
4762
      self.op.name = name
4763
      self.target = self.cfg.GetNodeInfo(name)
4764
    elif self.op.kind == constants.TAG_INSTANCE:
4765
      name = self.cfg.ExpandInstanceName(self.op.name)
4766
      if name is None:
4767
        raise errors.OpPrereqError("Invalid instance name (%s)" %
4768
                                   (self.op.name,))
4769
      self.op.name = name
4770
      self.target = self.cfg.GetInstanceInfo(name)
4771
    else:
4772
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4773
                                 str(self.op.kind))
4774

    
4775

    
4776
class LUGetTags(TagsLU):
4777
  """Returns the tags of a given object.
4778

4779
  """
4780
  _OP_REQP = ["kind", "name"]
4781

    
4782
  def Exec(self, feedback_fn):
4783
    """Returns the tag list.
4784

4785
    """
4786
    return list(self.target.GetTags())
4787

    
4788

    
4789
class LUSearchTags(NoHooksLU):
4790
  """Searches the tags for a given pattern.
4791

4792
  """
4793
  _OP_REQP = ["pattern"]
4794

    
4795
  def CheckPrereq(self):
4796
    """Check prerequisites.
4797

4798
    This checks the pattern passed for validity by compiling it.
4799

4800
    """
4801
    try:
4802
      self.re = re.compile(self.op.pattern)
4803
    except re.error, err:
4804
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4805
                                 (self.op.pattern, err))
4806

    
4807
  def Exec(self, feedback_fn):
4808
    """Returns the tag list.
4809

4810
    """
4811
    cfg = self.cfg
4812
    tgts = [("/cluster", cfg.GetClusterInfo())]
4813
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4814
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4815
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4816
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4817
    results = []
4818
    for path, target in tgts:
4819
      for tag in target.GetTags():
4820
        if self.re.search(tag):
4821
          results.append((path, tag))
4822
    return results
4823
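  # Illustrative example: for a pattern such as "^rack-" the result is a
  # list of (path, tag) pairs over the targets assembled above, e.g.
  #
  #   [("/cluster", "rack-aware"),
  #    ("/instances/instance1.example.com", "rack-4"),
  #    ("/nodes/node1.example.com", "rack-4")]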

    
4824

    
4825
class LUAddTags(TagsLU):
4826
  """Sets a tag on a given object.
4827

4828
  """
4829
  _OP_REQP = ["kind", "name", "tags"]
4830

    
4831
  def CheckPrereq(self):
4832
    """Check prerequisites.
4833

4834
    This checks the type and length of the tag name and value.
4835

4836
    """
4837
    TagsLU.CheckPrereq(self)
4838
    for tag in self.op.tags:
4839
      objects.TaggableObject.ValidateTag(tag)
4840

    
4841
  def Exec(self, feedback_fn):
4842
    """Sets the tag.
4843

4844
    """
4845
    try:
4846
      for tag in self.op.tags:
4847
        self.target.AddTag(tag)
4848
    except errors.TagError, err:
4849
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
4850
    try:
4851
      self.cfg.Update(self.target)
4852
    except errors.ConfigurationError:
4853
      raise errors.OpRetryError("There has been a modification to the"
4854
                                " config file and the operation has been"
4855
                                " aborted. Please retry.")
4856

    
4857

    
4858
class LUDelTags(TagsLU):
4859
  """Delete a list of tags from a given object.
4860

4861
  """
4862
  _OP_REQP = ["kind", "name", "tags"]
4863

    
4864
  def CheckPrereq(self):
4865
    """Check prerequisites.
4866

4867
    This checks that we have the given tag.
4868

4869
    """
4870
    TagsLU.CheckPrereq(self)
4871
    for tag in self.op.tags:
4872
      objects.TaggableObject.ValidateTag(tag)
4873
    del_tags = frozenset(self.op.tags)
4874
    cur_tags = self.target.GetTags()
4875
    if not del_tags <= cur_tags:
4876
      diff_tags = del_tags - cur_tags
4877
      diff_names = ["'%s'" % tag for tag in diff_tags]
4878
      diff_names.sort()
4879
      raise errors.OpPrereqError("Tag(s) %s not found" %
4880
                                 (",".join(diff_names)))
4881

    
4882
  def Exec(self, feedback_fn):
4883
    """Remove the tag from the object.
4884

4885
    """
4886
    for tag in self.op.tags:
4887
      self.target.RemoveTag(tag)
4888
    try:
4889
      self.cfg.Update(self.target)
4890
    except errors.ConfigurationError:
4891
      raise errors.OpRetryError("There has been a modification to the"
4892
                                " config file and the operation has been"
4893
                                " aborted. Please retry.")
4894

    
4895

    
4896
class LUTestDelay(NoHooksLU):
4897
  """Sleep for a specified amount of time.
4898

4899
  This LU sleeps on the master and/or nodes for a specified amount of
4900
  time.
4901

4902
  """
4903
  _OP_REQP = ["duration", "on_master", "on_nodes"]
4904
  REQ_BGL = False
4905

    
4906
  def ExpandNames(self):
4907
    """Expand names and set required locks.
4908

4909
    This expands the node list, if any.
4910

4911
    """
4912
    self.needed_locks = {}
4913
    if self.op.on_nodes:
4914
      # _GetWantedNodes can be used here, but is not always appropriate to use
4915
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
4916
      # more information.
4917
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
4918
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
4919

    
4920
  def CheckPrereq(self):
4921
    """Check prerequisites.
4922

4923
    """
4924

    
4925
  def Exec(self, feedback_fn):
4926
    """Do the actual sleep.
4927

4928
    """
4929
    if self.op.on_master:
4930
      if not utils.TestDelay(self.op.duration):
4931
        raise errors.OpExecError("Error during master delay test")
4932
    if self.op.on_nodes:
4933
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
4934
      if not result:
4935
        raise errors.OpExecError("Complete failure from rpc call")
4936
      for node, node_result in result.items():
4937
        if not node_result:
4938
          raise errors.OpExecError("Failure during rpc call to node %s,"
4939
                                   " result: %s" % (node, node_result))
4940

    
4941

    
4942
class IAllocator(object):
4943
  """IAllocator framework.
4944

4945
  An IAllocator instance has four sets of attributes:
4946
    - cfg/sstore that are needed to query the cluster
4947
    - input data (all members of the _KEYS class attribute are required)
4948
    - four buffer attributes (in|out_data|text), that represent the
4949
      input (to the external script) in text and data structure format,
4950
      and the output from it, again in two formats
4951
    - the result variables from the script (success, info, nodes) for
4952
      easy usage
4953

4954
  """
4955
  _ALLO_KEYS = [
4956
    "mem_size", "disks", "disk_template",
4957
    "os", "tags", "nics", "vcpus",
4958
    ]
4959
  _RELO_KEYS = [
4960
    "relocate_from",
4961
    ]
4962

    
4963
  def __init__(self, cfg, sstore, mode, name, **kwargs):
4964
    self.cfg = cfg
4965
    self.sstore = sstore
4966
    # init buffer variables
4967
    self.in_text = self.out_text = self.in_data = self.out_data = None
4968
    # init all input fields so that pylint is happy
4969
    self.mode = mode
4970
    self.name = name
4971
    self.mem_size = self.disks = self.disk_template = None
4972
    self.os = self.tags = self.nics = self.vcpus = None
4973
    self.relocate_from = None
4974
    # computed fields
4975
    self.required_nodes = None
4976
    # init result fields
4977
    self.success = self.info = self.nodes = None
4978
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4979
      keyset = self._ALLO_KEYS
4980
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
4981
      keyset = self._RELO_KEYS
4982
    else:
4983
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
4984
                                   " IAllocator" % self.mode)
4985
    for key in kwargs:
4986
      if key not in keyset:
4987
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
4988
                                     " IAllocator" % key)
4989
      setattr(self, key, kwargs[key])
4990
    for key in keyset:
4991
      if key not in kwargs:
4992
        raise errors.ProgrammerError("Missing input parameter '%s' to"
4993
                                     " IAllocator" % key)
4994
    self._BuildInputData()
4995

    
4996
  def _ComputeClusterData(self):
4997
    """Compute the generic allocator input data.
4998

4999
    This is the data that is independent of the actual operation.
5000

5001
    """
5002
    cfg = self.cfg
5003
    # cluster data
5004
    data = {
5005
      "version": 1,
5006
      "cluster_name": self.sstore.GetClusterName(),
5007
      "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
5008
      "hypervisor_type": self.sstore.GetHypervisorType(),
5009
      # we don't have job IDs
5010
      }
5011

    
5012
    i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
5013

    
5014
    # node data
5015
    node_results = {}
5016
    node_list = cfg.GetNodeList()
5017
    node_data = rpc.call_node_info(node_list, cfg.GetVGName())
5018
    for nname in node_list:
5019
      ninfo = cfg.GetNodeInfo(nname)
5020
      if nname not in node_data or not isinstance(node_data[nname], dict):
5021
        raise errors.OpExecError("Can't get data for node %s" % nname)
5022
      remote_info = node_data[nname]
5023
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
5024
                   'vg_size', 'vg_free', 'cpu_total']:
5025
        if attr not in remote_info:
5026
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5027
                                   (nname, attr))
5028
        try:
5029
          remote_info[attr] = int(remote_info[attr])
5030
        except ValueError, err:
5031
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5032
                                   " %s" % (nname, attr, str(err)))
5033
      # compute memory used by primary instances
5034
      i_p_mem = i_p_up_mem = 0
5035
      for iinfo in i_list:
5036
        if iinfo.primary_node == nname:
5037
          i_p_mem += iinfo.memory
5038
          if iinfo.status == "up":
5039
            i_p_up_mem += iinfo.memory
5040

    
5041
      # compute memory used by instances
5042
      pnr = {
5043
        "tags": list(ninfo.GetTags()),
5044
        "total_memory": remote_info['memory_total'],
5045
        "reserved_memory": remote_info['memory_dom0'],
5046
        "free_memory": remote_info['memory_free'],
5047
        "i_pri_memory": i_p_mem,
5048
        "i_pri_up_memory": i_p_up_mem,
5049
        "total_disk": remote_info['vg_size'],
5050
        "free_disk": remote_info['vg_free'],
5051
        "primary_ip": ninfo.primary_ip,
5052
        "secondary_ip": ninfo.secondary_ip,
5053
        "total_cpus": remote_info['cpu_total'],
5054
        }
5055
      node_results[nname] = pnr
5056
    data["nodes"] = node_results
5057

    
5058
    # instance data
5059
    instance_data = {}
5060
    for iinfo in i_list:
5061
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5062
                  for n in iinfo.nics]
5063
      pir = {
5064
        "tags": list(iinfo.GetTags()),
5065
        "should_run": iinfo.status == "up",
5066
        "vcpus": iinfo.vcpus,
5067
        "memory": iinfo.memory,
5068
        "os": iinfo.os,
5069
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5070
        "nics": nic_data,
5071
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5072
        "disk_template": iinfo.disk_template,
5073
        }
5074
      instance_data[iinfo.name] = pir
5075

    
5076
    data["instances"] = instance_data
5077

    
5078
    self.in_data = data
5079
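  # Illustrative example of one entry of data["nodes"] as built above
  # (figures are made up and use the units returned by call_node_info):
  #
  #   "node1.example.com": {
  #     "tags": [],
  #     "total_memory": 4096, "reserved_memory": 512, "free_memory": 1024,
  #     "i_pri_memory": 2048, "i_pri_up_memory": 1536,
  #     "total_disk": 204800, "free_disk": 102400,
  #     "primary_ip": "192.0.2.10", "secondary_ip": "198.51.100.10",
  #     "total_cpus": 4,
  #   }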

    
5080
  def _AddNewInstance(self):
5081
    """Add new instance data to allocator structure.
5082

5083
    This in combination with _AllocatorGetClusterData will create the
5084
    correct structure needed as input for the allocator.
5085

5086
    The checks for the completeness of the opcode must have already been
5087
    done.
5088

5089
    """
5090
    data = self.in_data
5091
    if len(self.disks) != 2:
5092
      raise errors.OpExecError("Only two-disk configurations supported")
5093

    
5094
    disk_space = _ComputeDiskSize(self.disk_template,
5095
                                  self.disks[0]["size"], self.disks[1]["size"])
5096

    
5097
    if self.disk_template in constants.DTS_NET_MIRROR:
5098
      self.required_nodes = 2
5099
    else:
5100
      self.required_nodes = 1
5101
    request = {
5102
      "type": "allocate",
5103
      "name": self.name,
5104
      "disk_template": self.disk_template,
5105
      "tags": self.tags,
5106
      "os": self.os,
5107
      "vcpus": self.vcpus,
5108
      "memory": self.mem_size,
5109
      "disks": self.disks,
5110
      "disk_space_total": disk_space,
5111
      "nics": self.nics,
5112
      "required_nodes": self.required_nodes,
5113
      }
5114
    data["request"] = request
5115
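  # Illustrative example of the "request" section for an allocation (all
  # values are made up; "disk_space_total" is whatever _ComputeDiskSize
  # returned for the chosen template):
  #
  #   {
  #     "type": "allocate",
  #     "name": "instance3.example.com",
  #     "disk_template": "drbd",
  #     "tags": [],
  #     "os": "debian-etch",
  #     "vcpus": 1,
  #     "memory": 512,
  #     "disks": [{"size": 10240, "mode": "w"}, {"size": 4096, "mode": "w"}],
  #     "disk_space_total": 14592,
  #     "nics": [{"mac": "auto", "ip": None, "bridge": "xen-br0"}],
  #     "required_nodes": 2,
  #   }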

    
5116
  def _AddRelocateInstance(self):
5117
    """Add relocate instance data to allocator structure.
5118

5119
    This in combination with _IAllocatorGetClusterData will create the
5120
    correct structure needed as input for the allocator.
5121

5122
    The checks for the completeness of the opcode must have already been
5123
    done.
5124

5125
    """
5126
    instance = self.cfg.GetInstanceInfo(self.name)
5127
    if instance is None:
5128
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
5129
                                   " IAllocator" % self.name)
5130

    
5131
    if instance.disk_template not in constants.DTS_NET_MIRROR:
5132
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5133

    
5134
    if len(instance.secondary_nodes) != 1:
5135
      raise errors.OpPrereqError("Instance has not exactly one secondary node")
5136

    
5137
    self.required_nodes = 1
5138

    
5139
    disk_space = _ComputeDiskSize(instance.disk_template,
5140
                                  instance.disks[0].size,
5141
                                  instance.disks[1].size)
5142

    
5143
    request = {
5144
      "type": "relocate",
5145
      "name": self.name,
5146
      "disk_space_total": disk_space,
5147
      "required_nodes": self.required_nodes,
5148
      "relocate_from": self.relocate_from,
5149
      }
5150
    self.in_data["request"] = request
5151
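  # Illustrative example of the "request" section for a relocation (values
  # are made up):
  #
  #   {
  #     "type": "relocate",
  #     "name": "instance1.example.com",
  #     "disk_space_total": 20736,
  #     "required_nodes": 1,
  #     "relocate_from": ["node2.example.com"],
  #   }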

    
5152
  def _BuildInputData(self):
5153
    """Build input data structures.
5154

5155
    """
5156
    self._ComputeClusterData()
5157

    
5158
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5159
      self._AddNewInstance()
5160
    else:
5161
      self._AddRelocateInstance()
5162

    
5163
    self.in_text = serializer.Dump(self.in_data)
5164
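  # After this point self.in_text holds the serialized form of a structure
  # with the following top-level keys (see _ComputeClusterData and the
  # _Add*Instance methods above):
  #
  #   {"version": 1, "cluster_name": ..., "cluster_tags": [...],
  #    "hypervisor_type": ..., "nodes": {...}, "instances": {...},
  #    "request": {...}}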

    
5165
  def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
5166
    """Run an instance allocator and return the results.
5167

5168
    """
5169
    data = self.in_text
5170

    
5171
    result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)
5172

    
5173
    if not isinstance(result, (list, tuple)) or len(result) != 4:
5174
      raise errors.OpExecError("Invalid result from master iallocator runner")
5175

    
5176
    rcode, stdout, stderr, fail = result
5177

    
5178
    if rcode == constants.IARUN_NOTFOUND:
5179
      raise errors.OpExecError("Can't find allocator '%s'" % name)
5180
    elif rcode == constants.IARUN_FAILURE:
5181
      raise errors.OpExecError("Instance allocator call failed: %s,"
5182
                               " output: %s" % (fail, stdout+stderr))
5183
    self.out_text = stdout
5184
    if validate:
5185
      self._ValidateResult()
5186

    
5187
  def _ValidateResult(self):
5188
    """Process the allocator results.
5189

5190
    This will process and if successful save the result in
5191
    self.out_data and the other parameters.
5192

5193
    """
5194
    try:
5195
      rdict = serializer.Load(self.out_text)
5196
    except Exception, err:
5197
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5198

    
5199
    if not isinstance(rdict, dict):
5200
      raise errors.OpExecError("Can't parse iallocator results: not a dict")
5201

    
5202
    for key in "success", "info", "nodes":
5203
      if key not in rdict:
5204
        raise errors.OpExecError("Can't parse iallocator results:"
5205
                                 " missing key '%s'" % key)
5206
      setattr(self, key, rdict[key])
5207

    
5208
    if not isinstance(rdict["nodes"], list):
5209
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5210
                               " is not a list")
5211
    self.out_data = rdict
5212
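  # Illustrative example of a well-formed reply after serializer.Load (the
  # three keys below are exactly the ones required above; values made up):
  #
  #   {"success": True,
  #    "info": "allocation successful",
  #    "nodes": ["node2.example.com", "node4.example.com"]}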

    
5213

    
5214
class LUTestAllocator(NoHooksLU):
5215
  """Run allocator tests.
5216

5217
  This LU runs the allocator tests
5218

5219
  """
5220
  _OP_REQP = ["direction", "mode", "name"]
5221

    
5222
  def CheckPrereq(self):
5223
    """Check prerequisites.
5224

5225
    This checks the opcode parameters depending on the direction and mode.
5226

5227
    """
5228
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5229
      for attr in ["name", "mem_size", "disks", "disk_template",
5230
                   "os", "tags", "nics", "vcpus"]:
5231
        if not hasattr(self.op, attr):
5232
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5233
                                     attr)
5234
      iname = self.cfg.ExpandInstanceName(self.op.name)
5235
      if iname is not None:
5236
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5237
                                   iname)
5238
      if not isinstance(self.op.nics, list):
5239
        raise errors.OpPrereqError("Invalid parameter 'nics'")
5240
      for row in self.op.nics:
5241
        if (not isinstance(row, dict) or
5242
            "mac" not in row or
5243
            "ip" not in row or
5244
            "bridge" not in row):
5245
          raise errors.OpPrereqError("Invalid contents of the"
5246
                                     " 'nics' parameter")
5247
      if not isinstance(self.op.disks, list):
5248
        raise errors.OpPrereqError("Invalid parameter 'disks'")
5249
      if len(self.op.disks) != 2:
5250
        raise errors.OpPrereqError("Only two-disk configurations supported")
5251
      for row in self.op.disks:
5252
        if (not isinstance(row, dict) or
5253
            "size" not in row or
5254
            not isinstance(row["size"], int) or
5255
            "mode" not in row or
5256
            row["mode"] not in ['r', 'w']):
5257
          raise errors.OpPrereqError("Invalid contents of the"
5258
                                     " 'disks' parameter")
5259
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5260
      if not hasattr(self.op, "name"):
5261
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5262
      fname = self.cfg.ExpandInstanceName(self.op.name)
5263
      if fname is None:
5264
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5265
                                   self.op.name)
5266
      self.op.name = fname
5267
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5268
    else:
5269
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5270
                                 self.op.mode)
5271

    
5272
    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5273
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
5274
        raise errors.OpPrereqError("Missing allocator name")
5275
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5276
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
5277
                                 self.op.direction)
5278

    
5279
  def Exec(self, feedback_fn):
5280
    """Run the allocator test.
5281

5282
    """
5283
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5284
      ial = IAllocator(self.cfg, self.sstore,
5285
                       mode=self.op.mode,
5286
                       name=self.op.name,
5287
                       mem_size=self.op.mem_size,
5288
                       disks=self.op.disks,
5289
                       disk_template=self.op.disk_template,
5290
                       os=self.op.os,
5291
                       tags=self.op.tags,
5292
                       nics=self.op.nics,
5293
                       vcpus=self.op.vcpus,
5294
                       )
5295
    else:
5296
      ial = IAllocator(self.cfg, self.sstore,
5297
                       mode=self.op.mode,
5298
                       name=self.op.name,
5299
                       relocate_from=list(self.relocate_from),
5300
                       )
5301

    
5302
    if self.op.direction == constants.IALLOCATOR_DIR_IN:
5303
      result = ial.in_text
5304
    else:
5305
      ial.Run(self.op.allocator, validate=False)
5306
      result = ial.out_text
5307
    return result