#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform
import logging
import copy

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import serializer


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_MASTER: the LU needs to run on the master node
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_MASTER = True
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    if not self.cfg.IsCluster():
      raise errors.OpPrereqError("Cluster not initialized yet,"
                                 " use 'gnt-cluster init' first.")
    if self.REQ_MASTER:
      master = self.cfg.GetMasterNode()
      if master != utils.HostInfo().name:
        raise errors.OpPrereqError("Commands must be run on the master"
                                   " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hook results

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we really have been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]


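# Illustrative sketch only (following the ExpandNames/DeclareLocks docstrings
# above) of how a concurrent LU combines these helpers; "LUExampleInstanceOp"
# is a made-up name, not part of this module:
#
#   class LUExampleInstanceOp(LogicalUnit):
#     _OP_REQP = ["instance_name"]
#     REQ_BGL = False
#
#     def ExpandNames(self):
#       self._ExpandAndLockInstance()
#       # node locks are computed once the instance lock is held
#       self.needed_locks[locking.LEVEL_NODE] = []
#       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
#
#     def DeclareLocks(self, level):
#       if level == locking.LEVEL_NODE:
#         self._LockInstancesNodes()
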
class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.OpPrereqError: if the nodes parameter is wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta))


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: string
  @param status: the desired status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, bridge, mac) representing
      the NICs the instance has
  @rtype: dict
  @return: the hook environment for this instance

  """
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env


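# For illustration only (all values below are made up): for an instance with a
# single NIC, the environment built by _BuildInstanceHookEnv above, and hence
# by _BuildInstanceHookEnvByObject below, looks roughly like:
#
#   {
#     "OP_TARGET": "inst1.example.com",
#     "INSTANCE_NAME": "inst1.example.com",
#     "INSTANCE_PRIMARY": "node1.example.com",
#     "INSTANCE_SECONDARIES": "node2.example.com",
#     "INSTANCE_OS_TYPE": "debian-etch",
#     "INSTANCE_STATUS": "up",
#     "INSTANCE_MEMORY": 512,
#     "INSTANCE_VCPUS": 1,
#     "INSTANCE_NIC0_IP": "192.0.2.10",
#     "INSTANCE_NIC0_BRIDGE": "xen-br0",
#     "INSTANCE_NIC0_HWADDR": "aa:00:00:11:22:33",
#     "INSTANCE_NIC_COUNT": 1,
#   }
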
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _CheckInstanceBridgesExist(lu, instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("one or more target bridges %s does not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    if not self.rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list::

      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    @type node: string
    @param node: the name of the node to check
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @type vglist: dict
    @param vglist: dictionary of volume group names and their size
    @param node_result: the results from the node
    @param remote_version: the RPC version from the remote node
    @param feedback_fn: function used to accumulate results

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    if not node_result:
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
      return True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    if 'node-net-test' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result['node-net-test']:
        bad = True
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result['node-net-test'][node]))

    hyp_result = node_result.get('hypervisor', None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if not instanceconfig.status == 'down':
      if (node_current not in node_instance or
          not instance in node_instance[node_current]):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    """
731
    bad = False
732

    
733
    for node, nodeinfo in node_info.iteritems():
734
      # This code checks that every node which is now listed as secondary has
735
      # enough memory to host all instances it is supposed to should a single
736
      # other node in the cluster fail.
737
      # FIXME: not ready for failover to an arbitrary node
738
      # FIXME: does not support file-backed instances
739
      # WARNING: we currently take into account down instances as well as up
740
      # ones, considering that even if they're down someone might want to start
741
      # them even in the event of a node failure.
742
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
743
        needed_mem = 0
744
        for instance in instances:
745
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
746
          if bep[constants.BE_AUTO_BALANCE]:
747
            needed_mem += bep[constants.BE_MEMORY]
748
        if nodeinfo['mfree'] < needed_mem:
749
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
750
                      " failovers should node %s fail" % (node, prinode))
751
          bad = True
752
    return bad
753

    
754
  def CheckPrereq(self):
755
    """Check prerequisites.
756

757
    Transform the list of checks we're going to skip into a set and check that
758
    all its members are valid.
759

760
    """
761
    self.skip_set = frozenset(self.op.skip_checks)
762
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
763
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
764

    
765
  def BuildHooksEnv(self):
766
    """Build hooks env.
767

768
    Cluster-Verify hooks just rone in the post phase and their failure makes
769
    the output be logged in the verify output and the verification to fail.
770

771
    """
772
    all_nodes = self.cfg.GetNodeList()
773
    # TODO: populate the environment with useful information for verify hooks
774
    env = {}
775
    return env, [], all_nodes
776

    
777
  def Exec(self, feedback_fn):
778
    """Verify integrity of cluster, performing various test on nodes.
779

780
    """
781
    bad = False
782
    feedback_fn("* Verifying global settings")
783
    for msg in self.cfg.VerifyConfig():
784
      feedback_fn("  - ERROR: %s" % msg)
785

    
786
    vg_name = self.cfg.GetVGName()
787
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
788
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
789
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
790
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
791
    i_non_redundant = [] # Non redundant instances
792
    i_non_a_balanced = [] # Non auto-balanced instances
793
    node_volume = {}
794
    node_instance = {}
795
    node_info = {}
796
    instance_cfg = {}
797

    
798
    # FIXME: verify OS list
799
    # do local checksums
800
    file_names = []
801
    file_names.append(constants.SSL_CERT_FILE)
802
    file_names.append(constants.CLUSTER_CONF_FILE)
803
    local_checksums = utils.FingerprintFiles(file_names)
804

    
805
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
806
    all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
807
    all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
808
    all_vglist = self.rpc.call_vg_list(nodelist)
809
    node_verify_param = {
810
      'filelist': file_names,
811
      'nodelist': nodelist,
812
      'hypervisor': hypervisors,
813
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
814
                        for node in nodeinfo]
815
      }
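    # Each key in node_verify_param selects a group of checks that
    # call_node_verify runs remotely and _VerifyNode interprets above:
    #   'filelist'      -> checksums of the listed configuration files
    #   'nodelist'      -> ssh connectivity to the listed nodes
    #   'node-net-test' -> tcp connectivity to the listed (name, pip, sip)
    #   'hypervisor'    -> per-hypervisor verification results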
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())
    all_rversion = self.rpc.call_version(nodelist)
    all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
                                        self.cfg.GetHypervisorType())

    cluster = self.cfg.GetClusterInfo()
    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  This is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result =  self._VerifyInstance(instance, inst_config, node_volume,
                                     node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res is False or not isinstance(res, list):
            feedback_fn("    Communication failure")
            lu_result = 1
            continue
          for script, hkr, output in res:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
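    # The four components of the result are:
    #   res_nodes     - nodes that could not be contacted or returned bad data
    #   res_nlvm      - per-node LVM error messages
    #   res_instances - instances with at least one inactive logical volume
    #   res_missing   - per-instance lists of (node, volume) pairs not found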

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_volume_list(nodes, vg_name)

    to_act = set()
    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logging.warning("Connection to node %s failed or invalid data"
                        " returned", node)
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    if not self.rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      # TODO: sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logging.debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = self.rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            self.LogWarning("Copy of file %s to node %s failed",
                            fname, to_node)
    finally:
      if not self.rpc.call_node_start_master(master, False):
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  @type disk: L{objects.Disk}
  @param disk: the disk to check
  @rtype: boolean
  @return: boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    # FIXME: This only works because there is only one parameter that can be
    # changed or removed.
    if self.op.vg_name is not None and not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    node_list = self.acquired_locks[locking.LEVEL_NODE]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # beparams changes do not need validation (we can't validate?),
    # but we still process here
    if self.op.beparams:
      self.new_beparams = cluster.FillDict(
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)

    # hypervisor list/parameters
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      if not isinstance(self.op.hvparams, dict):
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      if self.op.vg_name != self.cfg.GetVGName():
        self.cfg.SetVGName(self.op.vg_name)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.enabled_hypervisors is not None:
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
    self.cfg.Update(self.cluster)


def _WaitForSync(lu, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    lu.cfg.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      lu.LogWarning("Can't get any data from node %s", node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                           node, instance.disks[i].iv_name)
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


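# _WaitForSync returns False when some mirror is still degraded after the
# wait; a typical (purely illustrative) caller would abort in that case:
#
#   if not _WaitForSync(self, instance):
#     raise errors.OpExecError("Disks are degraded, aborting")
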
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    if not rstats:
      logging.warning("Node %s: disk degraded, not found or node down", node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet()
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")

  def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

    @param node_list: a list with the names of all nodes
    @param rlist: a map with node names as keys and OS objects as values

    @rtype: dict
    @return: a dictionary with osnames as keys and as value another map, with
        nodes as keys and list of OS objects as values, eg::

          {"debian-etch": {"node1": [<object>,...],
                           "node2": [<object>,]}
          }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if not nr:
        continue
      for os_obj in nr:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in node_list:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.acquired_locks[locking.LEVEL_NODE]
    node_data = self.rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    self.context.RemoveNode(node.name)

    self.rpc.call_node_leave_cluster(node.name)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet(
    "dtotal", "dfree",
    "mtotal", "mnode", "mfree",
    "bootid",
    "ctotal",
    )

  _FIELDS_STATIC = utils.FieldSet(
    "name", "pinst_cnt", "sinst_cnt",
    "pinst_list", "sinst_list",
    "pip", "sip", "tags",
    "serial_no",
    )

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted


  def CheckPrereq(self):
    """Check prerequisites.

    """
    # The validation of the node list is done in the _GetWantedNodes,
    # if non empty, and if empty, there's no validation to do
    pass

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
1579
      nodenames = self.wanted
1580
      missing = set(nodenames).difference(all_info.keys())
1581
      if missing:
1582
        raise errors.OpExecError(
1583
          "Some nodes were removed before retrieving their data: %s" % missing)
1584
    else:
1585
      nodenames = all_info.keys()
1586

    
1587
    nodenames = utils.NiceSort(nodenames)
1588
    nodelist = [all_info[name] for name in nodenames]
1589

    
1590
    # begin data gathering
1591

    
1592
    if self.do_locking:
1593
      live_data = {}
1594
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1595
                                          self.cfg.GetHypervisorType())
1596
      for name in nodenames:
1597
        nodeinfo = node_data.get(name, None)
1598
        if nodeinfo:
1599
          live_data[name] = {
1600
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1601
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1602
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1603
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1604
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1605
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1606
            "bootid": nodeinfo['bootid'],
1607
            }
1608
        else:
1609
          live_data[name] = {}
1610
    else:
1611
      live_data = dict.fromkeys(nodenames, {})
1612

    
1613
    node_to_primary = dict([(name, set()) for name in nodenames])
1614
    node_to_secondary = dict([(name, set()) for name in nodenames])
1615

    
1616
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1617
                             "sinst_cnt", "sinst_list"))
1618
    if inst_fields & frozenset(self.op.output_fields):
1619
      instancelist = self.cfg.GetInstanceList()
1620

    
1621
      for instance_name in instancelist:
1622
        inst = self.cfg.GetInstanceInfo(instance_name)
1623
        if inst.primary_node in node_to_primary:
1624
          node_to_primary[inst.primary_node].add(inst.name)
1625
        for secnode in inst.secondary_nodes:
1626
          if secnode in node_to_secondary:
1627
            node_to_secondary[secnode].add(inst.name)
1628

    
1629
    # end data gathering
1630

    
1631
    output = []
1632
    for node in nodelist:
1633
      node_output = []
1634
      for field in self.op.output_fields:
1635
        if field == "name":
1636
          val = node.name
1637
        elif field == "pinst_list":
1638
          val = list(node_to_primary[node.name])
1639
        elif field == "sinst_list":
1640
          val = list(node_to_secondary[node.name])
1641
        elif field == "pinst_cnt":
1642
          val = len(node_to_primary[node.name])
1643
        elif field == "sinst_cnt":
1644
          val = len(node_to_secondary[node.name])
1645
        elif field == "pip":
1646
          val = node.primary_ip
1647
        elif field == "sip":
1648
          val = node.secondary_ip
1649
        elif field == "tags":
1650
          val = list(node.GetTags())
1651
        elif field == "serial_no":
1652
          val = node.serial_no
1653
        elif self._FIELDS_DYNAMIC.Matches(field):
1654
          val = live_data[node.name].get(field, None)
1655
        else:
1656
          raise errors.ParameterError(field)
1657
        node_output.append(val)
1658
      output.append(node_output)
1659

    
1660
    return output
1661

    
1662

    
1663
class LUQueryNodeVolumes(NoHooksLU):
1664
  """Logical unit for getting volumes on node(s).
1665

1666
  """
1667
  _OP_REQP = ["nodes", "output_fields"]
1668
  REQ_BGL = False
1669
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1670
  _FIELDS_STATIC = utils.FieldSet("node")
1671

    
1672
  def ExpandNames(self):
1673
    _CheckOutputFields(static=self._FIELDS_STATIC,
1674
                       dynamic=self._FIELDS_DYNAMIC,
1675
                       selected=self.op.output_fields)
1676

    
1677
    self.needed_locks = {}
1678
    self.share_locks[locking.LEVEL_NODE] = 1
1679
    if not self.op.nodes:
1680
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1681
    else:
1682
      self.needed_locks[locking.LEVEL_NODE] = \
1683
        _GetWantedNodes(self, self.op.nodes)
1684

    
1685
  def CheckPrereq(self):
1686
    """Check prerequisites.
1687

1688
    This checks that the fields required are valid output fields.
1689

1690
    """
1691
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1692

    
1693
  def Exec(self, feedback_fn):
1694
    """Computes the list of nodes and their attributes.
1695

1696
    """
1697
    nodenames = self.nodes
1698
    volumes = self.rpc.call_node_volumes(nodenames)
1699

    
1700
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1701
             in self.cfg.GetInstanceList()]
1702

    
1703
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1704

    
1705
    output = []
1706
    for node in nodenames:
1707
      if node not in volumes or not volumes[node]:
1708
        continue
1709

    
1710
      node_vols = volumes[node][:]
1711
      node_vols.sort(key=lambda vol: vol['dev'])
1712

    
1713
      for vol in node_vols:
1714
        node_output = []
1715
        for field in self.op.output_fields:
1716
          if field == "node":
1717
            val = node
1718
          elif field == "phys":
1719
            val = vol['dev']
1720
          elif field == "vg":
1721
            val = vol['vg']
1722
          elif field == "name":
1723
            val = vol['name']
1724
          elif field == "size":
1725
            val = int(float(vol['size']))
1726
          elif field == "instance":
1727
            for inst in ilist:
1728
              if node not in lv_by_node[inst]:
1729
                continue
1730
              if vol['name'] in lv_by_node[inst][node]:
1731
                val = inst.name
1732
                break
1733
            else:
1734
              val = '-'
1735
          else:
1736
            raise errors.ParameterError(field)
1737
          node_output.append(str(val))
1738

    
1739
        output.append(node_output)
1740

    
1741
    return output
1742

    
1743

    
1744
class LUAddNode(LogicalUnit):
1745
  """Logical unit for adding node to the cluster.
1746

1747
  """
1748
  HPATH = "node-add"
1749
  HTYPE = constants.HTYPE_NODE
1750
  _OP_REQP = ["node_name"]
1751

    
1752
  def BuildHooksEnv(self):
1753
    """Build hooks env.
1754

1755
    This will run on all nodes before, and on all nodes + the new node after.
1756

1757
    """
1758
    env = {
1759
      "OP_TARGET": self.op.node_name,
1760
      "NODE_NAME": self.op.node_name,
1761
      "NODE_PIP": self.op.primary_ip,
1762
      "NODE_SIP": self.op.secondary_ip,
1763
      }
1764
    nodes_0 = self.cfg.GetNodeList()
1765
    nodes_1 = nodes_0 + [self.op.node_name, ]
1766
    return env, nodes_0, nodes_1
1767

    
1768
  def CheckPrereq(self):
1769
    """Check prerequisites.
1770

1771
    This checks:
1772
     - the new node is not already in the config
1773
     - it is resolvable
1774
     - its parameters (single/dual homed) matches the cluster
1775

1776
    Any errors are signalled by raising errors.OpPrereqError.
1777

1778
    """
1779
    node_name = self.op.node_name
1780
    cfg = self.cfg
1781

    
1782
    dns_data = utils.HostInfo(node_name)
1783

    
1784
    node = dns_data.name
1785
    primary_ip = self.op.primary_ip = dns_data.ip
1786
    secondary_ip = getattr(self.op, "secondary_ip", None)
1787
    if secondary_ip is None:
1788
      secondary_ip = primary_ip
1789
    if not utils.IsValidIP(secondary_ip):
1790
      raise errors.OpPrereqError("Invalid secondary IP given")
1791
    self.op.secondary_ip = secondary_ip
1792

    
1793
    node_list = cfg.GetNodeList()
1794
    if not self.op.readd and node in node_list:
1795
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1796
                                 node)
1797
    elif self.op.readd and node not in node_list:
1798
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1799

    
1800
    for existing_node_name in node_list:
1801
      existing_node = cfg.GetNodeInfo(existing_node_name)
1802

    
1803
      if self.op.readd and node == existing_node_name:
1804
        if (existing_node.primary_ip != primary_ip or
1805
            existing_node.secondary_ip != secondary_ip):
1806
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1807
                                     " address configuration as before")
1808
        continue
1809

    
1810
      if (existing_node.primary_ip == primary_ip or
1811
          existing_node.secondary_ip == primary_ip or
1812
          existing_node.primary_ip == secondary_ip or
1813
          existing_node.secondary_ip == secondary_ip):
1814
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1815
                                   " existing node %s" % existing_node.name)
1816

    
1817
    # check that the type of the node (single versus dual homed) is the
1818
    # same as for the master
1819
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1820
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1821
    newbie_singlehomed = secondary_ip == primary_ip
1822
    if master_singlehomed != newbie_singlehomed:
1823
      if master_singlehomed:
1824
        raise errors.OpPrereqError("The master has no private ip but the"
1825
                                   " new node has one")
1826
      else:
1827
        raise errors.OpPrereqError("The master has a private ip but the"
1828
                                   " new node doesn't have one")
1829

    
1830
    # checks reachablity
1831
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1832
      raise errors.OpPrereqError("Node not reachable by ping")
1833

    
1834
    if not newbie_singlehomed:
1835
      # check reachability from my secondary ip to newbie's secondary ip
1836
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1837
                           source=myself.secondary_ip):
1838
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1839
                                   " based ping to noded port")
1840

    
1841
    self.new_node = objects.Node(name=node,
1842
                                 primary_ip=primary_ip,
1843
                                 secondary_ip=secondary_ip)
1844

    
1845
  def Exec(self, feedback_fn):
1846
    """Adds the new node to the cluster.
1847

1848
    """
1849
    new_node = self.new_node
1850
    node = new_node.name
1851

    
1852
    # check connectivity
1853
    result = self.rpc.call_version([node])[node]
1854
    if result:
1855
      if constants.PROTOCOL_VERSION == result:
1856
        logging.info("Communication to node %s fine, sw version %s match",
1857
                     node, result)
1858
      else:
1859
        raise errors.OpExecError("Version mismatch master version %s,"
1860
                                 " node version %s" %
1861
                                 (constants.PROTOCOL_VERSION, result))
1862
    else:
1863
      raise errors.OpExecError("Cannot get version from the new node")
1864

    
1865
    # setup ssh on node
1866
    logging.info("Copy ssh key to node %s", node)
1867
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1868
    keyarray = []
1869
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1870
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1871
                priv_key, pub_key]
1872

    
1873
    for i in keyfiles:
1874
      f = open(i, 'r')
1875
      try:
1876
        keyarray.append(f.read())
1877
      finally:
1878
        f.close()
1879

    
1880
    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
1881
                                    keyarray[2],
1882
                                    keyarray[3], keyarray[4], keyarray[5])
1883

    
1884
    if not result:
1885
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1886

    
1887
    # Add node to our /etc/hosts, and add key to known_hosts
1888
    utils.AddHostToEtcHosts(new_node.name)
1889

    
1890
    if new_node.secondary_ip != new_node.primary_ip:
1891
      if not self.rpc.call_node_has_ip_address(new_node.name,
1892
                                               new_node.secondary_ip):
1893
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1894
                                 " you gave (%s). Please fix and re-run this"
1895
                                 " command." % new_node.secondary_ip)
1896

    
1897
    node_verify_list = [self.cfg.GetMasterNode()]
1898
    node_verify_param = {
1899
      'nodelist': [node],
1900
      # TODO: do a node-net-test as well?
1901
    }
1902

    
1903
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
1904
                                       self.cfg.GetClusterName())
1905
    for verifier in node_verify_list:
1906
      if not result[verifier]:
1907
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
1908
                                 " for remote verification" % verifier)
1909
      if result[verifier]['nodelist']:
1910
        for failed in result[verifier]['nodelist']:
1911
          feedback_fn("ssh/hostname verification failed %s -> %s" %
1912
                      (verifier, result[verifier]['nodelist'][failed]))
1913
        raise errors.OpExecError("ssh/hostname verification failed.")
1914

    
1915
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1916
    # including the node just added
1917
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
1918
    dist_nodes = self.cfg.GetNodeList()
1919
    if not self.op.readd:
1920
      dist_nodes.append(node)
1921
    if myself.name in dist_nodes:
1922
      dist_nodes.remove(myself.name)
1923

    
1924
    logging.debug("Copying hosts and known_hosts to all nodes")
1925
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1926
      result = self.rpc.call_upload_file(dist_nodes, fname)
1927
      for to_node in dist_nodes:
1928
        if not result[to_node]:
1929
          logging.error("Copy of file %s to node %s failed", fname, to_node)
1930

    
1931
    to_copy = []
1932
    if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
1933
      to_copy.append(constants.VNC_PASSWORD_FILE)
1934
    for fname in to_copy:
1935
      result = self.rpc.call_upload_file([node], fname)
1936
      if not result[node]:
1937
        logging.error("Could not copy file %s to node %s", fname, node)
1938

    
1939
    if self.op.readd:
1940
      self.context.ReaddNode(new_node)
1941
    else:
1942
      self.context.AddNode(new_node)
1943

    
1944

    
1945
class LUQueryClusterInfo(NoHooksLU):
1946
  """Query cluster configuration.
1947

1948
  """
1949
  _OP_REQP = []
1950
  REQ_MASTER = False
1951
  REQ_BGL = False
1952

    
1953
  def ExpandNames(self):
1954
    self.needed_locks = {}
1955

    
1956
  def CheckPrereq(self):
1957
    """No prerequsites needed for this LU.
1958

1959
    """
1960
    pass
1961

    
1962
  def Exec(self, feedback_fn):
1963
    """Return cluster config.
1964

1965
    """
1966
    cluster = self.cfg.GetClusterInfo()
1967
    result = {
1968
      "software_version": constants.RELEASE_VERSION,
1969
      "protocol_version": constants.PROTOCOL_VERSION,
1970
      "config_version": constants.CONFIG_VERSION,
1971
      "os_api_version": constants.OS_API_VERSION,
1972
      "export_version": constants.EXPORT_VERSION,
1973
      "architecture": (platform.architecture()[0], platform.machine()),
1974
      "name": cluster.cluster_name,
1975
      "master": cluster.master_node,
1976
      "default_hypervisor": cluster.default_hypervisor,
1977
      "enabled_hypervisors": cluster.enabled_hypervisors,
1978
      "hvparams": cluster.hvparams,
1979
      "beparams": cluster.beparams,
1980
      }
1981

    
1982
    return result
1983

    
1984

    
1985
class LUQueryConfigValues(NoHooksLU):
1986
  """Return configuration values.
1987

1988
  """
1989
  _OP_REQP = []
1990
  REQ_BGL = False
1991
  _FIELDS_DYNAMIC = utils.FieldSet()
1992
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
1993

    
1994
  def ExpandNames(self):
1995
    self.needed_locks = {}
1996

    
1997
    _CheckOutputFields(static=self._FIELDS_STATIC,
1998
                       dynamic=self._FIELDS_DYNAMIC,
1999
                       selected=self.op.output_fields)
2000

    
2001
  def CheckPrereq(self):
2002
    """No prerequisites.
2003

2004
    """
2005
    pass
2006

    
2007
  def Exec(self, feedback_fn):
2008
    """Dump a representation of the cluster config to the standard output.
2009

2010
    """
2011
    values = []
2012
    for field in self.op.output_fields:
2013
      if field == "cluster_name":
2014
        entry = self.cfg.GetClusterName()
2015
      elif field == "master_node":
2016
        entry = self.cfg.GetMasterNode()
2017
      elif field == "drain_flag":
2018
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2019
      else:
2020
        raise errors.ParameterError(field)
2021
      values.append(entry)
2022
    return values
2023

    
2024

    
2025
class LUActivateInstanceDisks(NoHooksLU):
2026
  """Bring up an instance's disks.
2027

2028
  """
2029
  _OP_REQP = ["instance_name"]
2030
  REQ_BGL = False
2031

    
2032
  def ExpandNames(self):
2033
    self._ExpandAndLockInstance()
2034
    self.needed_locks[locking.LEVEL_NODE] = []
2035
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2036

    
2037
  def DeclareLocks(self, level):
2038
    if level == locking.LEVEL_NODE:
2039
      self._LockInstancesNodes()
2040

    
2041
  def CheckPrereq(self):
2042
    """Check prerequisites.
2043

2044
    This checks that the instance is in the cluster.
2045

2046
    """
2047
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2048
    assert self.instance is not None, \
2049
      "Cannot retrieve locked instance %s" % self.op.instance_name
2050

    
2051
  def Exec(self, feedback_fn):
2052
    """Activate the disks.
2053

2054
    """
2055
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2056
    if not disks_ok:
2057
      raise errors.OpExecError("Cannot activate block devices")
2058

    
2059
    return disks_info
2060

    
2061

    
2062
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2063
  """Prepare the block devices for an instance.
2064

2065
  This sets up the block devices on all nodes.
2066

2067
  @type lu: L{LogicalUnit}
2068
  @param lu: the logical unit on whose behalf we execute
2069
  @type instance: L{objects.Instance}
2070
  @param instance: the instance for whose disks we assemble
2071
  @type ignore_secondaries: boolean
2072
  @param ignore_secondaries: if true, errors on secondary nodes
2073
      won't result in an error return from the function
2074
  @return: False if the operation failed, otherwise a list of
2075
      (host, instance_visible_name, node_visible_name)
2076
      with the mapping from node devices to instance devices
2077

2078
  """
2079
  device_info = []
2080
  disks_ok = True
2081
  iname = instance.name
2082
  # With the two passes mechanism we try to reduce the window of
2083
  # opportunity for the race condition of switching DRBD to primary
2084
  # before handshaking occured, but we do not eliminate it
2085

    
2086
  # The proper fix would be to wait (with some limits) until the
2087
  # connection has been made and drbd transitions from WFConnection
2088
  # into any other network-connected state (Connected, SyncTarget,
2089
  # SyncSource, etc.)
2090

    
2091
  # 1st pass, assemble on all nodes in secondary mode
2092
  for inst_disk in instance.disks:
2093
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2094
      lu.cfg.SetDiskID(node_disk, node)
2095
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2096
      if not result:
2097
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2098
                           " (is_primary=False, pass=1)",
2099
                           inst_disk.iv_name, node)
2100
        if not ignore_secondaries:
2101
          disks_ok = False
2102

    
2103
  # FIXME: race condition on drbd migration to primary
2104

    
2105
  # 2nd pass, do only the primary node
2106
  for inst_disk in instance.disks:
2107
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2108
      if node != instance.primary_node:
2109
        continue
2110
      lu.cfg.SetDiskID(node_disk, node)
2111
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2112
      if not result:
2113
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2114
                           " (is_primary=True, pass=2)",
2115
                           inst_disk.iv_name, node)
2116
        disks_ok = False
2117
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
2118

    
2119
  # leave the disks configured for the primary node
2120
  # this is a workaround that would be fixed better by
2121
  # improving the logical/physical id handling
2122
  for disk in instance.disks:
2123
    lu.cfg.SetDiskID(disk, instance.primary_node)
2124

    
2125
  return disks_ok, device_info
2126

    
2127

    
2128
def _StartInstanceDisks(lu, instance, force):
2129
  """Start the disks of an instance.
2130

2131
  """
2132
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2133
                                           ignore_secondaries=force)
2134
  if not disks_ok:
2135
    _ShutdownInstanceDisks(lu, instance)
2136
    if force is not None and not force:
2137
      lu.proc.LogWarning("", hint="If the message above refers to a"
2138
                         " secondary node,"
2139
                         " you can retry the operation using '--force'.")
2140
    raise errors.OpExecError("Disk consistency error")
2141

    
2142

    
2143
class LUDeactivateInstanceDisks(NoHooksLU):
2144
  """Shutdown an instance's disks.
2145

2146
  """
2147
  _OP_REQP = ["instance_name"]
2148
  REQ_BGL = False
2149

    
2150
  def ExpandNames(self):
2151
    self._ExpandAndLockInstance()
2152
    self.needed_locks[locking.LEVEL_NODE] = []
2153
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2154

    
2155
  def DeclareLocks(self, level):
2156
    if level == locking.LEVEL_NODE:
2157
      self._LockInstancesNodes()
2158

    
2159
  def CheckPrereq(self):
2160
    """Check prerequisites.
2161

2162
    This checks that the instance is in the cluster.
2163

2164
    """
2165
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2166
    assert self.instance is not None, \
2167
      "Cannot retrieve locked instance %s" % self.op.instance_name
2168

    
2169
  def Exec(self, feedback_fn):
2170
    """Deactivate the disks
2171

2172
    """
2173
    instance = self.instance
2174
    _SafeShutdownInstanceDisks(self, instance)
2175

    
2176

    
2177
def _SafeShutdownInstanceDisks(lu, instance):
2178
  """Shutdown block devices of an instance.
2179

2180
  This function checks if an instance is running, before calling
2181
  _ShutdownInstanceDisks.
2182

2183
  """
2184
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
2185
                                      [instance.hypervisor])
2186
  ins_l = ins_l[instance.primary_node]
2187
  if not type(ins_l) is list:
2188
    raise errors.OpExecError("Can't contact node '%s'" %
2189
                             instance.primary_node)
2190

    
2191
  if instance.name in ins_l:
2192
    raise errors.OpExecError("Instance is running, can't shutdown"
2193
                             " block devices.")
2194

    
2195
  _ShutdownInstanceDisks(lu, instance)
2196

    
2197

    
2198
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2199
  """Shutdown block devices of an instance.
2200

2201
  This does the shutdown on all nodes of the instance.
2202

2203
  If the ignore_primary is false, errors on the primary node are
2204
  ignored.
2205

2206
  """
2207
  result = True
2208
  for disk in instance.disks:
2209
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2210
      lu.cfg.SetDiskID(top_disk, node)
2211
      if not lu.rpc.call_blockdev_shutdown(node, top_disk):
2212
        logging.error("Could not shutdown block device %s on node %s",
2213
                      disk.iv_name, node)
2214
        if not ignore_primary or node != instance.primary_node:
2215
          result = False
2216
  return result
2217

    
2218

    
2219
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2220
  """Checks if a node has enough free memory.
2221

2222
  This function check if a given node has the needed amount of free
2223
  memory. In case the node has less memory or we cannot get the
2224
  information from the node, this function raise an OpPrereqError
2225
  exception.
2226

2227
  @type lu: C{LogicalUnit}
2228
  @param lu: a logical unit from which we get configuration data
2229
  @type node: C{str}
2230
  @param node: the node to check
2231
  @type reason: C{str}
2232
  @param reason: string to use in the error message
2233
  @type requested: C{int}
2234
  @param requested: the amount of memory in MiB to check for
2235
  @type hypervisor: C{str}
2236
  @param hypervisor: the hypervisor to ask for memory stats
2237
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2238
      we cannot check the node
2239

2240
  """
2241
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2242
  if not nodeinfo or not isinstance(nodeinfo, dict):
2243
    raise errors.OpPrereqError("Could not contact node %s for resource"
2244
                             " information" % (node,))
2245

    
2246
  free_mem = nodeinfo[node].get('memory_free')
2247
  if not isinstance(free_mem, int):
2248
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2249
                             " was '%s'" % (node, free_mem))
2250
  if requested > free_mem:
2251
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2252
                             " needed %s MiB, available %s MiB" %
2253
                             (node, reason, requested, free_mem))
2254

    
2255

    
2256
class LUStartupInstance(LogicalUnit):
2257
  """Starts an instance.
2258

2259
  """
2260
  HPATH = "instance-start"
2261
  HTYPE = constants.HTYPE_INSTANCE
2262
  _OP_REQP = ["instance_name", "force"]
2263
  REQ_BGL = False
2264

    
2265
  def ExpandNames(self):
2266
    self._ExpandAndLockInstance()
2267

    
2268
  def BuildHooksEnv(self):
2269
    """Build hooks env.
2270

2271
    This runs on master, primary and secondary nodes of the instance.
2272

2273
    """
2274
    env = {
2275
      "FORCE": self.op.force,
2276
      }
2277
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2278
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2279
          list(self.instance.secondary_nodes))
2280
    return env, nl, nl
2281

    
2282
  def CheckPrereq(self):
2283
    """Check prerequisites.
2284

2285
    This checks that the instance is in the cluster.
2286

2287
    """
2288
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2289
    assert self.instance is not None, \
2290
      "Cannot retrieve locked instance %s" % self.op.instance_name
2291

    
2292
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2293
    # check bridges existance
2294
    _CheckInstanceBridgesExist(self, instance)
2295

    
2296
    _CheckNodeFreeMemory(self, instance.primary_node,
2297
                         "starting instance %s" % instance.name,
2298
                         bep[constants.BE_MEMORY], instance.hypervisor)
2299

    
2300
  def Exec(self, feedback_fn):
2301
    """Start the instance.
2302

2303
    """
2304
    instance = self.instance
2305
    force = self.op.force
2306
    extra_args = getattr(self.op, "extra_args", "")
2307

    
2308
    self.cfg.MarkInstanceUp(instance.name)
2309

    
2310
    node_current = instance.primary_node
2311

    
2312
    _StartInstanceDisks(self, instance, force)
2313

    
2314
    if not self.rpc.call_instance_start(node_current, instance, extra_args):
2315
      _ShutdownInstanceDisks(self, instance)
2316
      raise errors.OpExecError("Could not start instance")
2317

    
2318

    
2319
class LURebootInstance(LogicalUnit):
2320
  """Reboot an instance.
2321

2322
  """
2323
  HPATH = "instance-reboot"
2324
  HTYPE = constants.HTYPE_INSTANCE
2325
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2326
  REQ_BGL = False
2327

    
2328
  def ExpandNames(self):
2329
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2330
                                   constants.INSTANCE_REBOOT_HARD,
2331
                                   constants.INSTANCE_REBOOT_FULL]:
2332
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2333
                                  (constants.INSTANCE_REBOOT_SOFT,
2334
                                   constants.INSTANCE_REBOOT_HARD,
2335
                                   constants.INSTANCE_REBOOT_FULL))
2336
    self._ExpandAndLockInstance()
2337

    
2338
  def BuildHooksEnv(self):
2339
    """Build hooks env.
2340

2341
    This runs on master, primary and secondary nodes of the instance.
2342

2343
    """
2344
    env = {
2345
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2346
      }
2347
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2348
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2349
          list(self.instance.secondary_nodes))
2350
    return env, nl, nl
2351

    
2352
  def CheckPrereq(self):
2353
    """Check prerequisites.
2354

2355
    This checks that the instance is in the cluster.
2356

2357
    """
2358
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2359
    assert self.instance is not None, \
2360
      "Cannot retrieve locked instance %s" % self.op.instance_name
2361

    
2362
    # check bridges existance
2363
    _CheckInstanceBridgesExist(self, instance)
2364

    
2365
  def Exec(self, feedback_fn):
2366
    """Reboot the instance.
2367

2368
    """
2369
    instance = self.instance
2370
    ignore_secondaries = self.op.ignore_secondaries
2371
    reboot_type = self.op.reboot_type
2372
    extra_args = getattr(self.op, "extra_args", "")
2373

    
2374
    node_current = instance.primary_node
2375

    
2376
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2377
                       constants.INSTANCE_REBOOT_HARD]:
2378
      if not self.rpc.call_instance_reboot(node_current, instance,
2379
                                           reboot_type, extra_args):
2380
        raise errors.OpExecError("Could not reboot instance")
2381
    else:
2382
      if not self.rpc.call_instance_shutdown(node_current, instance):
2383
        raise errors.OpExecError("could not shutdown instance for full reboot")
2384
      _ShutdownInstanceDisks(self, instance)
2385
      _StartInstanceDisks(self, instance, ignore_secondaries)
2386
      if not self.rpc.call_instance_start(node_current, instance, extra_args):
2387
        _ShutdownInstanceDisks(self, instance)
2388
        raise errors.OpExecError("Could not start instance for full reboot")
2389

    
2390
    self.cfg.MarkInstanceUp(instance.name)
2391

    
2392

    
2393
class LUShutdownInstance(LogicalUnit):
2394
  """Shutdown an instance.
2395

2396
  """
2397
  HPATH = "instance-stop"
2398
  HTYPE = constants.HTYPE_INSTANCE
2399
  _OP_REQP = ["instance_name"]
2400
  REQ_BGL = False
2401

    
2402
  def ExpandNames(self):
2403
    self._ExpandAndLockInstance()
2404

    
2405
  def BuildHooksEnv(self):
2406
    """Build hooks env.
2407

2408
    This runs on master, primary and secondary nodes of the instance.
2409

2410
    """
2411
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2412
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2413
          list(self.instance.secondary_nodes))
2414
    return env, nl, nl
2415

    
2416
  def CheckPrereq(self):
2417
    """Check prerequisites.
2418

2419
    This checks that the instance is in the cluster.
2420

2421
    """
2422
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2423
    assert self.instance is not None, \
2424
      "Cannot retrieve locked instance %s" % self.op.instance_name
2425

    
2426
  def Exec(self, feedback_fn):
2427
    """Shutdown the instance.
2428

2429
    """
2430
    instance = self.instance
2431
    node_current = instance.primary_node
2432
    self.cfg.MarkInstanceDown(instance.name)
2433
    if not self.rpc.call_instance_shutdown(node_current, instance):
2434
      self.proc.LogWarning("Could not shutdown instance")
2435

    
2436
    _ShutdownInstanceDisks(self, instance)
2437

    
2438

    
2439
class LUReinstallInstance(LogicalUnit):
2440
  """Reinstall an instance.
2441

2442
  """
2443
  HPATH = "instance-reinstall"
2444
  HTYPE = constants.HTYPE_INSTANCE
2445
  _OP_REQP = ["instance_name"]
2446
  REQ_BGL = False
2447

    
2448
  def ExpandNames(self):
2449
    self._ExpandAndLockInstance()
2450

    
2451
  def BuildHooksEnv(self):
2452
    """Build hooks env.
2453

2454
    This runs on master, primary and secondary nodes of the instance.
2455

2456
    """
2457
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2458
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2459
          list(self.instance.secondary_nodes))
2460
    return env, nl, nl
2461

    
2462
  def CheckPrereq(self):
2463
    """Check prerequisites.
2464

2465
    This checks that the instance is in the cluster and is not running.
2466

2467
    """
2468
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2469
    assert instance is not None, \
2470
      "Cannot retrieve locked instance %s" % self.op.instance_name
2471

    
2472
    if instance.disk_template == constants.DT_DISKLESS:
2473
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2474
                                 self.op.instance_name)
2475
    if instance.status != "down":
2476
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2477
                                 self.op.instance_name)
2478
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2479
                                              instance.name,
2480
                                              instance.hypervisor)
2481
    if remote_info:
2482
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2483
                                 (self.op.instance_name,
2484
                                  instance.primary_node))
2485

    
2486
    self.op.os_type = getattr(self.op, "os_type", None)
2487
    if self.op.os_type is not None:
2488
      # OS verification
2489
      pnode = self.cfg.GetNodeInfo(
2490
        self.cfg.ExpandNodeName(instance.primary_node))
2491
      if pnode is None:
2492
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2493
                                   self.op.pnode)
2494
      os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
2495
      if not os_obj:
2496
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2497
                                   " primary node"  % self.op.os_type)
2498

    
2499
    self.instance = instance
2500

    
2501
  def Exec(self, feedback_fn):
2502
    """Reinstall the instance.
2503

2504
    """
2505
    inst = self.instance
2506

    
2507
    if self.op.os_type is not None:
2508
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2509
      inst.os = self.op.os_type
2510
      self.cfg.Update(inst)
2511

    
2512
    _StartInstanceDisks(self, inst, None)
2513
    try:
2514
      feedback_fn("Running the instance OS create scripts...")
2515
      if not self.rpc.call_instance_os_add(inst.primary_node, inst):
2516
        raise errors.OpExecError("Could not install OS for instance %s"
2517
                                 " on node %s" %
2518
                                 (inst.name, inst.primary_node))
2519
    finally:
2520
      _ShutdownInstanceDisks(self, inst)
2521

    
2522

    
2523
class LURenameInstance(LogicalUnit):
2524
  """Rename an instance.
2525

2526
  """
2527
  HPATH = "instance-rename"
2528
  HTYPE = constants.HTYPE_INSTANCE
2529
  _OP_REQP = ["instance_name", "new_name"]
2530

    
2531
  def BuildHooksEnv(self):
2532
    """Build hooks env.
2533

2534
    This runs on master, primary and secondary nodes of the instance.
2535

2536
    """
2537
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2538
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2539
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2540
          list(self.instance.secondary_nodes))
2541
    return env, nl, nl
2542

    
2543
  def CheckPrereq(self):
2544
    """Check prerequisites.
2545

2546
    This checks that the instance is in the cluster and is not running.
2547

2548
    """
2549
    instance = self.cfg.GetInstanceInfo(
2550
      self.cfg.ExpandInstanceName(self.op.instance_name))
2551
    if instance is None:
2552
      raise errors.OpPrereqError("Instance '%s' not known" %
2553
                                 self.op.instance_name)
2554
    if instance.status != "down":
2555
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2556
                                 self.op.instance_name)
2557
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2558
                                              instance.name,
2559
                                              instance.hypervisor)
2560
    if remote_info:
2561
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2562
                                 (self.op.instance_name,
2563
                                  instance.primary_node))
2564
    self.instance = instance
2565

    
2566
    # new name verification
2567
    name_info = utils.HostInfo(self.op.new_name)
2568

    
2569
    self.op.new_name = new_name = name_info.name
2570
    instance_list = self.cfg.GetInstanceList()
2571
    if new_name in instance_list:
2572
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2573
                                 new_name)
2574

    
2575
    if not getattr(self.op, "ignore_ip", False):
2576
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2577
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2578
                                   (name_info.ip, new_name))
2579

    
2580

    
2581
  def Exec(self, feedback_fn):
2582
    """Reinstall the instance.
2583

2584
    """
2585
    inst = self.instance
2586
    old_name = inst.name
2587

    
2588
    if inst.disk_template == constants.DT_FILE:
2589
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2590

    
2591
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2592
    # Change the instance lock. This is definitely safe while we hold the BGL
2593
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2594
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2595

    
2596
    # re-read the instance from the configuration after rename
2597
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2598

    
2599
    if inst.disk_template == constants.DT_FILE:
2600
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2601
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2602
                                                     old_file_storage_dir,
2603
                                                     new_file_storage_dir)
2604

    
2605
      if not result:
2606
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2607
                                 " directory '%s' to '%s' (but the instance"
2608
                                 " has been renamed in Ganeti)" % (
2609
                                 inst.primary_node, old_file_storage_dir,
2610
                                 new_file_storage_dir))
2611

    
2612
      if not result[0]:
2613
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2614
                                 " (but the instance has been renamed in"
2615
                                 " Ganeti)" % (old_file_storage_dir,
2616
                                               new_file_storage_dir))
2617

    
2618
    _StartInstanceDisks(self, inst, None)
2619
    try:
2620
      if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
2621
                                               old_name):
2622
        msg = ("Could not run OS rename script for instance %s on node %s"
2623
               " (but the instance has been renamed in Ganeti)" %
2624
               (inst.name, inst.primary_node))
2625
        self.proc.LogWarning(msg)
2626
    finally:
2627
      _ShutdownInstanceDisks(self, inst)
2628

    
2629

    
2630
class LURemoveInstance(LogicalUnit):
2631
  """Remove an instance.
2632

2633
  """
2634
  HPATH = "instance-remove"
2635
  HTYPE = constants.HTYPE_INSTANCE
2636
  _OP_REQP = ["instance_name", "ignore_failures"]
2637
  REQ_BGL = False
2638

    
2639
  def ExpandNames(self):
2640
    self._ExpandAndLockInstance()
2641
    self.needed_locks[locking.LEVEL_NODE] = []
2642
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2643

    
2644
  def DeclareLocks(self, level):
2645
    if level == locking.LEVEL_NODE:
2646
      self._LockInstancesNodes()
2647

    
2648
  def BuildHooksEnv(self):
2649
    """Build hooks env.
2650

2651
    This runs on master, primary and secondary nodes of the instance.
2652

2653
    """
2654
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2655
    nl = [self.cfg.GetMasterNode()]
2656
    return env, nl, nl
2657

    
2658
  def CheckPrereq(self):
2659
    """Check prerequisites.
2660

2661
    This checks that the instance is in the cluster.
2662

2663
    """
2664
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2665
    assert self.instance is not None, \
2666
      "Cannot retrieve locked instance %s" % self.op.instance_name
2667

    
2668
  def Exec(self, feedback_fn):
2669
    """Remove the instance.
2670

2671
    """
2672
    instance = self.instance
2673
    logging.info("Shutting down instance %s on node %s",
2674
                 instance.name, instance.primary_node)
2675

    
2676
    if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
2677
      if self.op.ignore_failures:
2678
        feedback_fn("Warning: can't shutdown instance")
2679
      else:
2680
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2681
                                 (instance.name, instance.primary_node))
2682

    
2683
    logging.info("Removing block devices for instance %s", instance.name)
2684

    
2685
    if not _RemoveDisks(self, instance):
2686
      if self.op.ignore_failures:
2687
        feedback_fn("Warning: can't remove instance's disks")
2688
      else:
2689
        raise errors.OpExecError("Can't remove instance's disks")
2690

    
2691
    logging.info("Removing instance %s out of cluster config", instance.name)
2692

    
2693
    self.cfg.RemoveInstance(instance.name)
2694
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2695

    
2696

    
2697
class LUQueryInstances(NoHooksLU):
2698
  """Logical unit for querying instances.
2699

2700
  """
2701
  _OP_REQP = ["output_fields", "names"]
2702
  REQ_BGL = False
2703
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
2704
                                    "admin_state", "admin_ram",
2705
                                    "disk_template", "ip", "mac", "bridge",
2706
                                    "sda_size", "sdb_size", "vcpus", "tags",
2707
                                    "network_port", "beparams",
2708
                                    "(disk).(size)/([0-9]+)",
2709
                                    "(disk).(sizes)",
2710
                                    "(nic).(mac|ip|bridge)/([0-9]+)",
2711
                                    "(nic).(macs|ips|bridges)",
2712
                                    "(disk|nic).(count)",
2713
                                    "serial_no", "hypervisor", "hvparams",] +
2714
                                  ["hv/%s" % name
2715
                                   for name in constants.HVS_PARAMETERS] +
2716
                                  ["be/%s" % name
2717
                                   for name in constants.BES_PARAMETERS])
2718
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
2719

    
2720

    
2721
  def ExpandNames(self):
2722
    _CheckOutputFields(static=self._FIELDS_STATIC,
2723
                       dynamic=self._FIELDS_DYNAMIC,
2724
                       selected=self.op.output_fields)
2725

    
2726
    self.needed_locks = {}
2727
    self.share_locks[locking.LEVEL_INSTANCE] = 1
2728
    self.share_locks[locking.LEVEL_NODE] = 1
2729

    
2730
    if self.op.names:
2731
      self.wanted = _GetWantedInstances(self, self.op.names)
2732
    else:
2733
      self.wanted = locking.ALL_SET
2734

    
2735
    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2736
    if self.do_locking:
2737
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2738
      self.needed_locks[locking.LEVEL_NODE] = []
2739
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2740

    
2741
  def DeclareLocks(self, level):
2742
    if level == locking.LEVEL_NODE and self.do_locking:
2743
      self._LockInstancesNodes()
2744

    
2745
  def CheckPrereq(self):
2746
    """Check prerequisites.
2747

2748
    """
2749
    pass
2750

    
2751
  def Exec(self, feedback_fn):
2752
    """Computes the list of nodes and their attributes.
2753

2754
    """
2755
    all_info = self.cfg.GetAllInstancesInfo()
2756
    if self.do_locking:
2757
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2758
    elif self.wanted != locking.ALL_SET:
2759
      instance_names = self.wanted
2760
      missing = set(instance_names).difference(all_info.keys())
2761
      if missing:
2762
        raise errors.OpExecError(
2763
          "Some instances were removed before retrieving their data: %s"
2764
          % missing)
2765
    else:
2766
      instance_names = all_info.keys()
2767

    
2768
    instance_names = utils.NiceSort(instance_names)
2769
    instance_list = [all_info[iname] for iname in instance_names]
2770

    
2771
    # begin data gathering
2772

    
2773
    nodes = frozenset([inst.primary_node for inst in instance_list])
2774
    hv_list = list(set([inst.hypervisor for inst in instance_list]))
2775

    
2776
    bad_nodes = []
2777
    if self.do_locking:
2778
      live_data = {}
2779
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
2780
      for name in nodes:
2781
        result = node_data[name]
2782
        if result:
2783
          live_data.update(result)
2784
        elif result == False:
2785
          bad_nodes.append(name)
2786
        # else no instance is alive
2787
    else:
2788
      live_data = dict([(name, {}) for name in instance_names])
2789

    
2790
    # end data gathering
2791

    
2792
    HVPREFIX = "hv/"
2793
    BEPREFIX = "be/"
2794
    output = []
2795
    for instance in instance_list:
2796
      iout = []
2797
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
2798
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
2799
      for field in self.op.output_fields:
2800
        st_match = self._FIELDS_STATIC.Matches(field)
2801
        if field == "name":
2802
          val = instance.name
2803
        elif field == "os":
2804
          val = instance.os
2805
        elif field == "pnode":
2806
          val = instance.primary_node
2807
        elif field == "snodes":
2808
          val = list(instance.secondary_nodes)
2809
        elif field == "admin_state":
2810
          val = (instance.status != "down")
2811
        elif field == "oper_state":
2812
          if instance.primary_node in bad_nodes:
2813
            val = None
2814
          else:
2815
            val = bool(live_data.get(instance.name))
2816
        elif field == "status":
2817
          if instance.primary_node in bad_nodes:
2818
            val = "ERROR_nodedown"
2819
          else:
2820
            running = bool(live_data.get(instance.name))
2821
            if running:
2822
              if instance.status != "down":
2823
                val = "running"
2824
              else:
2825
                val = "ERROR_up"
2826
            else:
2827
              if instance.status != "down":
2828
                val = "ERROR_down"
2829
              else:
2830
                val = "ADMIN_down"
2831
        elif field == "oper_ram":
2832
          if instance.primary_node in bad_nodes:
2833
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          idx = ord(field[2]) - ord('a')
          try:
            val = instance.FindDisk(idx).size
          except errors.OpPrereqError:
            val = None
        elif field == "tags":
          val = list(instance.GetTags())
        elif field == "serial_no":
          val = instance.serial_no
        elif field == "network_port":
          val = instance.network_port
        elif field == "hypervisor":
          val = instance.hypervisor
        elif field == "hvparams":
          val = i_hv
        elif (field.startswith(HVPREFIX) and
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
          val = i_hv.get(field[len(HVPREFIX):], None)
        elif field == "beparams":
          val = i_be
        elif (field.startswith(BEPREFIX) and
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
          val = i_be.get(field[len(BEPREFIX):], None)
        elif st_match and st_match.groups():
          # matches a variable list
          st_groups = st_match.groups()
          if st_groups and st_groups[0] == "disk":
            if st_groups[1] == "count":
              val = len(instance.disks)
            elif st_groups[1] == "sizes":
              val = [disk.size for disk in instance.disks]
            elif st_groups[1] == "size":
              try:
                val = instance.FindDisk(st_groups[2]).size
              except errors.OpPrereqError:
                val = None
            else:
              assert False, "Unhandled disk parameter"
          elif st_groups[0] == "nic":
            if st_groups[1] == "count":
              val = len(instance.nics)
            elif st_groups[1] == "macs":
              val = [nic.mac for nic in instance.nics]
            elif st_groups[1] == "ips":
              val = [nic.ip for nic in instance.nics]
            elif st_groups[1] == "bridges":
              val = [nic.bridge for nic in instance.nics]
            else:
              # index-based item
              nic_idx = int(st_groups[2])
              if nic_idx >= len(instance.nics):
                val = None
              else:
                if st_groups[1] == "mac":
                  val = instance.nics[nic_idx].mac
                elif st_groups[1] == "ip":
                  val = instance.nics[nic_idx].ip
                elif st_groups[1] == "bridge":
                  val = instance.nics[nic_idx].bridge
                else:
                  assert False, "Unhandled NIC parameter"
          else:
            assert False, "Unhandled variable parameter"
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output


class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
                         instance.name, bep[constants.BE_MEMORY],
                         instance.hypervisor)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not self.rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s do not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self, dev, target_node, False):
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logging.info("Shutting down instance %s on node %s",
                 instance.name, source_node)

    if not self.rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
                             " Proceeding"
                             " anyway. Please make sure node %s is down",
                             instance.name, source_node, source_node)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logging.info("Starting instance %s on node %s",
                   instance.name, target_node)

      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not self.rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))


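# The failover above is normally requested by submitting an opcode rather
# than by instantiating the LU directly. A minimal sketch is given below; the
# opcode class name is assumed to match the one declared for this LU in
# opcodes.py, and the instance name is a placeholder.
def _ExampleFailoverOpcode(instance_name="instance1.example.com"):
  """Illustrative sketch only; build a failover opcode for submission."""
  return opcodes.OpFailoverInstance(instance_name=instance_name,
                                    ignore_consistency=False)

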
def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
        return False

  lu.cfg.SetDiskID(device, node)
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, True, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(lu, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  lu.cfg.SetDiskID(device, node)
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, False, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


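# Both helpers above walk the disk tree bottom-up: the children of a device
# (e.g. the data and metadata LVs backing a DRBD8 disk) are created before
# the device itself is assembled, and the 'force' flag only takes effect once
# a device type reports CreateOnSecondary(). The sketch below mirrors the
# same traversal for illustration only; it simply counts leaf devices.
def _CountLeafBlockDevs(device):
  """Illustrative sketch only; count the leaf devices of a disk tree."""
  if not device.children:
    return 1
  return sum(_CountLeafBlockDevs(child) for child in device.children)

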
def _GenerateUniqueNames(lu, exts):
  """Generate a suitable LV name.

  This will generate a logical volume name for the given instance.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results


def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
                         p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  port = lu.cfg.AllocatePort()
  vgname = lu.cfg.GetVGName()
  shared_secret = lu.cfg.GenerateDRBDSecret()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev


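# For reference, the object returned above is an LD_DRBD8 disk whose
# logical_id packs the two node names, the allocated network port, the two
# DRBD minors and the shared secret, and whose children are the data LV
# (full size) and the 128 MB metadata LV. The helper below only unpacks such
# an object; it is an illustrative sketch and not used by any LU.
def _DescribeDRBD8Disk(drbd_dev):
  """Illustrative sketch only; summarise a DRBD8 disk built above."""
  primary, secondary, port, p_minor, s_minor, _secret = drbd_dev.logical_id
  dev_data, dev_meta = drbd_dev.children
  return ("%s: %s/%d <-> %s/%d on port %d, data LV %s (%d MB),"
          " meta LV %s (%d MB)" %
          (drbd_dev.iv_name, primary, p_minor, secondary, s_minor, port,
           dev_data.logical_id[1], dev_data.size,
           dev_meta.logical_id[1], dev_meta.size))

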
def _GenerateDiskTemplate(lu, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_info,
                          file_storage_dir, file_driver,
                          base_index):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []
  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(lu, [".disk%d" % i
                                      for i in range(disk_count)])
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
                              logical_id=(vgname, names[idx]),
                              iv_name="disk/%d" % disk_index)
      disks.append(disk_dev)
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node, remote_node] * len(disk_info), instance_name)

    names = _GenerateUniqueNames(lu,
                                 [".disk%d_%s" % (i, s)
                                  for i in range(disk_count)
                                  for s in ("data", "meta")
                                  ])
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                      disk["size"], names[idx*2:idx*2+2],
                                      "disk/%d" % disk_index,
                                      minors[idx*2], minors[idx*2+1])
      disks.append(disk_dev)
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
                              iv_name="disk/%d" % disk_index,
                              logical_id=(file_driver,
                                          "%s/disk%d" % (file_storage_dir,
                                                         idx)))
      disks.append(disk_dev)
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks


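# A hedged usage sketch for the generator above: disk_info is the same list
# of {"size": ..., "mode": ...} dicts that LUCreateInstance builds in
# ExpandNames, and base_index offsets the "disk/N" iv_names when disks are
# added to an already existing instance. The node and instance names below
# are placeholders; the sketch is not called by any LU.
def _ExampleGenerateDrbdDisks(lu):
  """Illustrative sketch only; generate one 1 GB drbd8 disk."""
  return _GenerateDiskTemplate(lu, constants.DT_DRBD8,
                               "instance1.example.com",
                               "node1.example.com", ["node2.example.com"],
                               [{"size": 1024, "mode": constants.DISK_RDWR}],
                               None, None, 0)

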
def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


def _CreateDisks(lu, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @rtype: boolean
  @return: the success of the creation

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
                                                 file_storage_dir)

    if not result:
      logging.error("Could not connect to node '%s'", instance.primary_node)
      return False

    if not result[0]:
      logging.error("Failed to create directory '%s'", file_storage_dir)
      return False

  for device in instance.disks:
    logging.info("Creating volume %s for instance %s",
                 device.iv_name, instance.name)
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
                                        device, False, info):
        logging.error("Failed to create volume %s (%s) on secondary node %s!",
                      device.iv_name, device, secondary_node)
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
                                    instance, device, info):
      logging.error("Failed to create volume %s on primary!", device.iv_name)
      return False

  return True


def _RemoveDisks(lu, instance):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should remove
  @rtype: boolean
  @return: the success of the removal

  """
  logging.info("Removing block devices for instance %s", instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(disk, node)
      if not lu.rpc.call_blockdev_remove(node, disk):
        lu.proc.LogWarning("Could not remove block device %s on node %s,"
                           " continuing anyway", device.iv_name, node)
        result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
                                               file_storage_dir):
      logging.error("Could not remove directory '%s'", file_storage_dir)
      result = False

  return result


def _ComputeDiskSize(disk_template, disks):
  """Compute disk size requirements in the volume group

  """
  # Required free disk space as a function of the requested disk sizes
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: sum(d["size"] for d in disks),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" %  disk_template)

  return req_size_dict[disk_template]


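# A worked example of the computation above, kept in a helper so it has no
# effect at import time: two disks of 1024 MB and 2048 MB need 3072 MB of
# free space in the volume group for the plain template and 3072 + 2 * 128 =
# 3328 MB for drbd8 (128 MB of metadata per disk), while diskless and
# file-based instances need no space in the volume group at all.
def _ExampleComputeDiskSize():
  """Illustrative sketch only; not called by any LU."""
  disks = [{"size": 1024}, {"size": 2048}]
  assert _ComputeDiskSize(constants.DT_PLAIN, disks) == 3072
  assert _ComputeDiskSize(constants.DT_DRBD8, disks) == 3072 + 2 * 128
  assert _ComputeDiskSize(constants.DT_DISKLESS, disks) is None
  assert _ComputeDiskSize(constants.DT_FILE, disks) is None

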
def _CheckHVParams(lu, nodenames, hvname, hvparams):
  """Hypervisor parameter validation.

  This function abstracts the hypervisor parameter validation to be
  used in both instance create and instance modify.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type nodenames: list
  @param nodenames: the list of nodes on which we should check
  @type hvname: string
  @param hvname: the name of the hypervisor we should use
  @type hvparams: dict
  @param hvparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
                                                  hvname,
                                                  hvparams)
  for node in nodenames:
    info = hvinfo.get(node, None)
    if not info or not isinstance(info, (tuple, list)):
      raise errors.OpPrereqError("Cannot get current information"
                                 " from node '%s' (%s)" % (node, info))
    if not info[0]:
      raise errors.OpPrereqError("Hypervisor parameter validation failed:"
                                 " %s" % info[1])


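# The RPC above is expected to return, per node, a (status, message) pair
# coming from the hypervisor's own parameter validation; _CheckHVParams only
# looks at the truth value of the first element and reports the second. A
# minimal sketch of a well-formed result for two hypothetical nodes:
def _ExampleHvValidationResult():
  """Illustrative sketch only; the result shape _CheckHVParams expects."""
  return {
    "node1.example.com": (True, "parameters ok"),
    "node2.example.com": (False, "kernel path does not exist"),
    }

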
class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disks", "disk_template",
              "mode", "start",
              "wait_for_sync", "ip_check", "nics",
              "hvparams", "beparams"]
  REQ_BGL = False

  def _ExpandNode(self, node):
    """Expands and checks one node name.

    """
    node_full = self.cfg.ExpandNodeName(node)
    if node_full is None:
      raise errors.OpPrereqError("Unknown node %s" % node)
    return node_full

  def ExpandNames(self):
    """ExpandNames for CreateInstance.

    Figure out the right locks for instance creation.

    """
    self.needed_locks = {}

    # set optional parameters to none if they don't exist
    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    # cheap checks, mostly valid constants given

    # verify creation mode
    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    if self.op.hypervisor is None:
      self.op.hypervisor = self.cfg.GetHypervisorType()

    cluster = self.cfg.GetClusterInfo()
    enabled_hvs = cluster.enabled_hypervisors
    if self.op.hypervisor not in enabled_hvs:
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
                                 " cluster (%s)" % (self.op.hypervisor,
                                  ",".join(enabled_hvs)))

    # check hypervisor parameter syntax (locally)

    filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
                                  self.op.hvparams)
    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
    hv_type.CheckParameterSyntax(filled_hvp)

    # fill and remember the beparams dict
    self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
                                    self.op.beparams)

    #### instance parameters check

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)
    self.op.instance_name = instance_name = hostname1.name

    # this is just a preventive check, but someone might still add this
    # instance in the meantime, and creation will fail at lock-add time
    if instance_name in self.cfg.GetInstanceList():
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    self.add_locks[locking.LEVEL_INSTANCE] = instance_name

    # NIC buildup
    self.nics = []
    for nic in self.op.nics:
      # ip validity checks
      ip = nic.get("ip", None)
      if ip is None or ip.lower() == "none":
        nic_ip = None
      elif ip.lower() == constants.VALUE_AUTO:
        nic_ip = hostname1.ip
      else:
        if not utils.IsValidIP(ip):
          raise errors.OpPrereqError("Given IP address '%s' doesn't look"
                                     " like a valid IP" % ip)
        nic_ip = ip

      # MAC address verification
      mac = nic.get("mac", constants.VALUE_AUTO)
      if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
        if not utils.IsValidMac(mac.lower()):
          raise errors.OpPrereqError("Invalid MAC address specified: %s" %
                                     mac)
      # bridge verification
      bridge = nic.get("bridge", self.cfg.GetDefBridge())
      self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))

    # disk checks/pre-build
    self.disks = []
    for disk in self.op.disks:
      mode = disk.get("mode", constants.DISK_RDWR)
      if mode not in constants.DISK_ACCESS_SET:
        raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                   mode)
      size = disk.get("size", None)
      if size is None:
        raise errors.OpPrereqError("Missing disk size")
      try:
        size = int(size)
      except ValueError:
        raise errors.OpPrereqError("Invalid disk size '%s'" % size)
      self.disks.append({"size": size, "mode": mode})

    # used in CheckPrereq for ip ping check
    self.check_ip = hostname1.ip

    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory path not absolute")

    ### Node/iallocator related checks
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.op.pnode = self._ExpandNode(self.op.pnode)
      nodelist = [self.op.pnode]
      if self.op.snode is not None:
        self.op.snode = self._ExpandNode(self.op.snode)
        nodelist.append(self.op.snode)
      self.needed_locks[locking.LEVEL_NODE] = nodelist

    # in case of import lock the source node too
    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)

      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      self.op.src_node = src_node = self._ExpandNode(src_node)
      if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
        self.needed_locks[locking.LEVEL_NODE].append(src_node)

    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    nics = [n.ToDict() for n in self.nics]
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.be_full[constants.BE_VCPUS],
                     mem_size=self.be_full[constants.BE_MEMORY],
                     disks=self.disks,
                     nics=nics,
                     hypervisor=self.op.hypervisor,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 ", ".join(ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGES"] = self.src_images

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.be_full[constants.BE_MEMORY],
      vcpus=self.be_full[constants.BE_VCPUS],
      nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
    ))

    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl


  def CheckPrereq(self):
    """Check prerequisites.

    """
    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")


    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = self.op.src_node
      src_path = self.op.src_path

      export_info = self.rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      # Check that the new instance doesn't have less disks than the export
      instance_disks = len(self.disks)
      export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
      if instance_disks < export_disks:
        raise errors.OpPrereqError("Not enough disks to import."
                                   " (instance: %d, export: %d)" %
                                   (instance_disks, export_disks))

      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      disk_images = []
      for idx in range(export_disks):
        option = 'disk%d_dump' % idx
        if export_info.has_option(constants.INISECT_INS, option):
          # FIXME: are the old os-es, disk sizes, etc. useful?
          export_name = export_info.get(constants.INISECT_INS, option)
          image = os.path.join(src_path, export_name)
          disk_images.append(image)
        else:
          disk_images.append(False)

      self.src_images = disk_images

      old_name = export_info.get(constants.INISECT_INS, 'name')
      # FIXME: int() here could throw a ValueError on broken exports
      exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
      if self.op.instance_name == old_name:
        for idx, nic in enumerate(self.nics):
          if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
            nic_mac_ini = 'nic%d_mac' % idx
            nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)

    # ip ping checks (we use the same ip that was resolved in ExpandNames)
    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (self.check_ip, self.op.instance_name))

    #### allocator run

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
    assert self.pnode is not None, \
      "Cannot retrieve locked node %s" % self.op.pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if self.op.snode is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")
      if self.op.snode == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(self.op.snode)

    nodenames = [pnode.name] + self.secondaries

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.disks)

    # Check lv size requirements
    if req_size is not None:
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                         self.op.hypervisor)
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)

    # os verification
    os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node"  % self.op.os_type)

    # bridge check on primary node
    bridges = [n.bridge for n in self.nics]
    if not self.rpc.call_bridges_exist(self.pnode.name, bridges):
      raise errors.OpPrereqError("one of the target bridges '%s' does not"
                                 " exist on"
                                 " destination node '%s'" %
                                 (",".join(bridges), pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.be_full[constants.BE_MEMORY],
                           self.op.hypervisor)

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    for nic in self.nics:
      if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
        nic.mac = self.cfg.GenerateMAC()

    ht_kind = self.op.hypervisor
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    ##if self.op.vnc_bind_address is None:
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.cfg.GetFileStorageDir(),
                                        string_file_storage_dir, instance))


    disks = _GenerateDiskTemplate(self,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries,
                                  self.disks,
                                  file_storage_dir,
                                  self.op.file_driver,
                                  0)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            nics=self.nics, disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            beparams=self.op.beparams,
                            hvparams=self.op.hvparams,
                            hypervisor=self.op.hypervisor,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self, iobj):
      _RemoveDisks(self, iobj)
      self.cfg.ReleaseDRBDMinors(instance)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Declare that we don't want to remove the instance lock anymore, as we've
    # added the instance to the config
    del self.remove_locks[locking.LEVEL_INSTANCE]
    # Remove the temp. assignments for the instance's drbds
    self.cfg.ReleaseDRBDMinors(instance)
    # Unlock all the nodes
    self.context.glm.release(locking.LEVEL_NODE)
    del self.acquired_locks[locking.LEVEL_NODE]

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, iobj)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(self, iobj)
      self.cfg.RemoveInstance(iobj.name)
      # Make sure the instance lock gets removed
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not self.rpc.call_instance_os_add(pnode_name, iobj):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_images = self.src_images
        cluster_name = self.cfg.GetClusterName()
        import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
                                                         src_node, src_images,
                                                         cluster_name)
        for idx, result in enumerate(import_result):
          if not result:
            self.LogWarning("Could not import image %s for instance %s,"
                            " disk %d, on node %s" %
                            (src_images[idx], instance, idx, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logging.info("Starting instance %s on node %s", instance, pnode_name)
      feedback_fn("* starting instance...")
      if not self.rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")


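# For reference, the "nics" and "disks" opcode parameters parsed in
# ExpandNames above are plain lists of dicts. A minimal sketch of a valid
# combination is shown below; the bridge name is a placeholder and the
# values would normally come from the command-line tools.
def _ExampleCreateInstanceArgs():
  """Illustrative sketch only; example nics/disks parameters."""
  nics = [{"ip": constants.VALUE_AUTO, "mac": constants.VALUE_AUTO,
           "bridge": "xen-br0"}]
  disks = [{"size": 10240, "mode": constants.DISK_RDWR}]
  return nics, disks

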
class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = self.rpc.call_instance_list([node],
                                             [instance.hypervisor])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logging.debug("Connecting to console of %s on %s", instance.name, node)

    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    console_cmd = hyper.GetShellCommandForConsole(instance)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    elif self.op.remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.op.remote_node = remote_node
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    # If we're not already locking all nodes in the set we have to declare the
    # instance's primary/secondary nodes.
    if (level == locking.LEVEL_NODE and
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
      self._LockInstancesNodes()

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    self.LogInfo("Selected new secondary for the instance: %s",
                 self.op.remote_node)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = instance

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    if not self.op.disks:
      self.op.disks = range(len(instance.disks))

    for disk_idx in self.op.disks:
      instance.FindDisk(disk_idx)

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:

      1. for each disk to be replaced:

        1. create new LVs on the target node with unique names
        1. detach old LVs from the drbd device
        1. rename old LVs to name_replaced.<time_t>
        1. rename new LVs to old LVs
        1. attach the new LVs (with the old names now) to the drbd device

      1. wait for sync across all devices

      1. for each modified disk:

        1. remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = self.rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking disk/%d on %s" % (idx, node))
        cfg.SetDiskID(dev, node)
        if not self.rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find disk/%d on node %s" %
                                   (idx, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      info("checking disk/%d consistency on %s" % (idx, oth_node))
      if not _CheckDiskConsistency(self, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".disk%d_%s" % (idx, suf)
                  for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(self, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not self.rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not self.rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not self.rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(self, instance, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = self.rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not self.rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = self.rpc.call_vg_list([pri_node, new_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in pri_node, new_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      info("checking disk/%d on %s" % (idx, pri_node))
      cfg.SetDiskID(dev, pri_node)
      if not self.rpc.call_blockdev_find(pri_node, dev):
        raise errors.OpExecError("Can't find disk/%d on node %s" %
                                 (idx, pri_node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      info("checking disk/%d consistency on %s" % (idx, pri_node))
      if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for idx, dev in enumerate(instance.disks):
      size = dev.size
      info("adding new local storage on %s for disk/%d" %
           (new_node, idx))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in dev.children:
        if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], new_node))

    # Step 4: drbd minors and drbd setup changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
                                   instance.name)
    logging.debug("Allocated minors %s" % (minors,))
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
      size = dev.size
      info("activating a new drbd on %s for disk/%d" % (new_node, idx))
      # create new devices on new_node
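      # the drbd8 logical_id is (node_a, node_b, port, minor_a, minor_b,
      # secret); keep the primary's side unchanged and substitute only the
      # new node and minor on the secondary's side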
      if pri_node == dev.logical_id[0]:
        new_logical_id = (pri_node, new_node,
                          dev.logical_id[2], dev.logical_id[3], new_minor,
                          dev.logical_id[5])
      else:
        new_logical_id = (new_node, pri_node,
                          dev.logical_id[2], new_minor, dev.logical_id[4],
                          dev.logical_id[5])
      iv_names[idx] = (dev, dev.children, new_logical_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_logical_id)
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=new_logical_id,
                              children=dev.children)
      if not _CreateBlockDevOnSecondary(self, new_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        self.cfg.ReleaseDRBDMinors(instance.name)
        raise errors.OpExecError("Failed to create new DRBD on"
                                 " node '%s'" % new_node)

    for idx, dev in enumerate(instance.disks):
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for disk/%d on old node" % idx)
      cfg.SetDiskID(dev, old_node)
      if not self.rpc.call_blockdev_shutdown(old_node, dev):
        warning("Failed to shutdown drbd for disk/%d on old node" % idx,
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")
    done = 0
    for idx, dev in enumerate(instance.disks):
      cfg.SetDiskID(dev, pri_node)
      # set the network part of the physical (unique in bdev terms) id
      # to None, meaning detach from network
      dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
      # and 'find' the device, which will 'fix' it to match the
      # standalone state
      if self.rpc.call_blockdev_find(pri_node, dev):
        done += 1
      else:
        warning("Failed to detach drbd disk/%d from network, unusual case" %
                idx)

    if not done:
      # no detaches succeeded (very unlikely)
      self.cfg.ReleaseDRBDMinors(instance.name)
      raise errors.OpExecError("Can't detach at least one DRBD from old node")

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev, _, new_logical_id in iv_names.itervalues():
      dev.logical_id = new_logical_id
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)
    # we can remove now the temp minors as now the new values are
    # written to the config file (and therefore stable)
    self.cfg.ReleaseDRBDMinors(instance.name)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    failures = []
    for idx, dev in enumerate(instance.disks):
      info("attaching primary drbd for disk/%d to new secondary node" % idx)
      # since the attach is smart, it's enough to 'find' the device,
      # it will automatically activate the network, if the physical_id
      # is correct
      cfg.SetDiskID(dev, pri_node)
      logging.debug("Disk to attach: %s", dev)
      if not self.rpc.call_blockdev_find(pri_node, dev):
        warning("can't attach drbd disk/%d to new secondary!" % idx,
                "please do a gnt-instance info to see the status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(self, instance, unlock=True)

    # so check manually all the devices
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      is_degr = self.rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
      info("remove logical volumes for disk/%d" % idx)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        if not self.rpc.call_blockdev_remove(old_node, lv):
          warning("Can't remove LV on old secondary",
                  hint="Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance

    # Activate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      _StartInstanceDisks(self, instance, True)

    if instance.disk_template == constants.DT_DRBD8:
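      # without a remote node we only replace the disks in place; with one,
      # the secondary is moved to the given node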
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")

    ret = fn(feedback_fn)

    # Deactivate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      _SafeShutdownInstanceDisks(self, instance)

    return ret


class LUGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    self.instance = instance

    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing.")

    self.disk = instance.FindDisk(self.op.disk)

    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                       instance.hypervisor)
    for node in nodenames:
      info = nodeinfo.get(node, None)
      if not info:
        raise errors.OpPrereqError("Cannot get current information"
                                   " from node '%s'" % node)
      vg_free = info.get('vg_free', None)
      if not isinstance(vg_free, int):
        raise errors.OpPrereqError("Can't compute free disk space on"
                                   " node %s" % node)
      if self.op.amount > info['vg_free']:
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
                                   " %d MiB available, %d MiB required" %
                                   (node, info['vg_free'], self.op.amount))

  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    instance = self.instance
    disk = self.disk
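    # note the order: all secondary nodes are grown before the primary,
    # which is relevant for mirrored (DRBD) disk templates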
    for node in (instance.secondary_nodes + (instance.primary_node,)):
      self.cfg.SetDiskID(disk, node)
      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
      if (not result or not isinstance(result, (list, tuple)) or
          len(result) != 2):
        raise errors.OpExecError("grow request failed to node %s" % node)
      elif not result[0]:
        raise errors.OpExecError("grow request failed to node %s: %s" %
                                 (node, result[1]))
    disk.RecordGrow(self.op.amount)
    self.cfg.Update(instance)
    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, instance)
      if disk_abort:
        self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
                             " status.\nPlease check the instance.")


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances", "static"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")

    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = self.cfg.ExpandInstanceName(name)
        if full_name is None:
          raise errors.OpPrereqError("Instance '%s' not known" % name)
        self.wanted_names.append(full_name)
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
    else:
      self.wanted_names = None
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]
    return

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    static = self.op.static
    if not static:
      self.cfg.SetDiskID(dev, instance.primary_node)
      dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
    else:
      dev_pstatus = None

    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode and not static:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      "mode": dev.mode,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}

    cluster = self.cfg.GetClusterInfo()

    for instance in self.wanted_instances:
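      # runtime state is only queried when not in static mode; an instance
      # is reported "up" if the hypervisor returns info for it, else "down"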
      if not self.op.static:
        remote_info = self.rpc.call_instance_info(instance.primary_node,
                                                  instance.name,
                                                  instance.hypervisor)
        if remote_info and "state" in remote_info:
          remote_state = "up"
        else:
          remote_state = "down"
      else:
        remote_state = None
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "hypervisor": instance.hypervisor,
        "network_port": instance.network_port,
        "hv_instance": instance.hvparams,
        "hv_actual": cluster.FillHV(instance),
        "be_instance": instance.beparams,
        "be_actual": cluster.FillBE(instance),
        }

      result[instance.name] = idict

    return result


class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
4661
  HPATH = "instance-modify"
4662
  HTYPE = constants.HTYPE_INSTANCE
4663
  _OP_REQP = ["instance_name", "hvparams"]
4664
  REQ_BGL = False
4665

    
4666
  def ExpandNames(self):
4667
    self._ExpandAndLockInstance()
4668
    self.needed_locks[locking.LEVEL_NODE] = []
4669
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4670

    
4671

    
4672
  def DeclareLocks(self, level):
4673
    if level == locking.LEVEL_NODE:
4674
      self._LockInstancesNodes()
4675

    
4676
  def BuildHooksEnv(self):
4677
    """Build hooks env.
4678

4679
    This runs on the master, primary and secondaries.
4680

4681
    """
4682
    args = dict()
4683
    if constants.BE_MEMORY in self.be_new:
4684
      args['memory'] = self.be_new[constants.BE_MEMORY]
4685
    if constants.BE_VCPUS in self.be_new:
4686
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
4687
    if self.do_ip or self.do_bridge or self.mac:
4688
      if self.do_ip:
4689
        ip = self.ip
4690
      else:
4691
        ip = self.instance.nics[0].ip
4692
      if self.bridge:
4693
        bridge = self.bridge
4694
      else:
4695
        bridge = self.instance.nics[0].bridge
4696
      if self.mac:
4697
        mac = self.mac
4698
      else:
4699
        mac = self.instance.nics[0].mac
4700
      args['nics'] = [(ip, bridge, mac)]
4701
    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
4702
    nl = [self.cfg.GetMasterNode(),
4703
          self.instance.primary_node] + list(self.instance.secondary_nodes)
4704
    return env, nl, nl
4705

    
4706
  def CheckPrereq(self):
4707
    """Check prerequisites.
4708

4709
    This only checks the instance list against the existing names.
4710

4711
    """
4712
    # FIXME: all the parameters could be checked before, in ExpandNames, or in
4713
    # a separate CheckArguments function, if we implement one, so the operation
4714
    # can be aborted without waiting for any lock, should it have an error...
4715
    self.ip = getattr(self.op, "ip", None)
4716
    self.mac = getattr(self.op, "mac", None)
4717
    self.bridge = getattr(self.op, "bridge", None)
4718
    self.kernel_path = getattr(self.op, "kernel_path", None)
4719
    self.initrd_path = getattr(self.op, "initrd_path", None)
4720
    self.force = getattr(self.op, "force", None)
4721
    all_parms = [self.ip, self.bridge, self.mac]
4722
    if (all_parms.count(None) == len(all_parms) and
4723
        not self.op.hvparams and
4724
        not self.op.beparams):
4725
      raise errors.OpPrereqError("No changes submitted")
4726
    for item in (constants.BE_MEMORY, constants.BE_VCPUS):
4727
      val = self.op.beparams.get(item, None)
4728
      if val is not None:
4729
        try:
4730
          val = int(val)
4731
        except ValueError, err:
4732
          raise errors.OpPrereqError("Invalid %s size: %s" % (item, str(err)))
4733
        self.op.beparams[item] = val
4734
    if self.ip is not None:
4735
      self.do_ip = True
4736
      if self.ip.lower() == "none":
4737
        self.ip = None
4738
      else:
4739
        if not utils.IsValidIP(self.ip):
4740
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4741
    else:
4742
      self.do_ip = False
4743
    self.do_bridge = (self.bridge is not None)
4744
    if self.mac is not None:
4745
      if self.cfg.IsMacInUse(self.mac):
4746
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4747
                                   self.mac)
4748
      if not utils.IsValidMac(self.mac):
4749
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4750

    
4751
    # checking the new params on the primary/secondary nodes
4752

    
4753
    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4754
    assert self.instance is not None, \
4755
      "Cannot retrieve locked instance %s" % self.op.instance_name
4756
    pnode = self.instance.primary_node
4757
    nodelist = [pnode]
4758
    nodelist.extend(instance.secondary_nodes)
4759

    
4760
    # hvparams processing
4761
    if self.op.hvparams:
4762
      i_hvdict = copy.deepcopy(instance.hvparams)
4763
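      # a parameter explicitly set to None is removed from the per-instance
      # overrides, so the cluster-level default applies again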
      for key, val in self.op.hvparams.iteritems():
        if val is None:
          try:
            del i_hvdict[key]
          except KeyError:
            pass
        else:
          i_hvdict[key] = val
      cluster = self.cfg.GetClusterInfo()
      hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
                                i_hvdict)
      # local check
      hypervisor.GetHypervisor(
        instance.hypervisor).CheckParameterSyntax(hv_new)
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
      self.hv_new = hv_new # the new actual values
      self.hv_inst = i_hvdict # the new dict (without defaults)
    else:
      self.hv_new = self.hv_inst = {}

    # beparams processing
    if self.op.beparams:
      i_bedict = copy.deepcopy(instance.beparams)
      for key, val in self.op.beparams.iteritems():
        if val is None:
          try:
            del i_bedict[key]
          except KeyError:
            pass
        else:
          i_bedict[key] = val
      cluster = self.cfg.GetClusterInfo()
      be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
                                i_bedict)
      self.be_new = be_new # the new actual values
      self.be_inst = i_bedict # the new dict (without defaults)
    else:
      self.be_new = self.be_inst = {}

    self.warn = []

    if constants.BE_MEMORY in self.op.beparams and not self.force:
      mem_check_list = [pnode]
      if be_new[constants.BE_AUTO_BALANCE]:
        # either we changed auto_balance to yes or it was from before
        mem_check_list.extend(instance.secondary_nodes)
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
                                                  instance.hypervisor)
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
                                         instance.hypervisor)

      if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
        # Assume the primary node is unreachable and go ahead
        self.warn.append("Can't get info from primary node %s" % pnode)
      else:
        if instance_info:
          current_mem = instance_info['memory']
        else:
          # Assume instance not running
          # (there is a slight race condition here, but it's not very probable,
          # and we have no other way to check)
          current_mem = 0
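        # memory still missing on the primary node: the requested amount,
        # minus what the instance already uses there and what the node
        # currently has free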
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
                    nodeinfo[pnode]['memory_free'])
        if miss_mem > 0:
          raise errors.OpPrereqError("This change will prevent the instance"
                                     " from starting, due to %d MB of memory"
                                     " missing on its primary node" % miss_mem)

      if be_new[constants.BE_AUTO_BALANCE]:
        for node in instance.secondary_nodes:
          if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
            self.warn.append("Can't get info from secondary node %s" % node)
          elif be_new[constants.BE_MEMORY] > nodeinfo[node]['memory_free']:
            self.warn.append("Not enough memory to failover instance to"
                             " secondary node %s" % node)

    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.
    """
    # Process here the warnings from CheckPrereq, as we don't have a
    # feedback_fn there.
    for warn in self.warn:
      feedback_fn("WARNING: %s" % warn)

    result = []
    instance = self.instance
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.op.hvparams:
      instance.hvparams = self.hv_new
      for key, val in self.op.hvparams.iteritems():
        result.append(("hv/%s" % key, val))
    if self.op.beparams:
      instance.beparams = self.be_inst
      for key, val in self.op.beparams.iteritems():
        result.append(("be/%s" % key, val))

    self.cfg.Update(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = ['nodes']
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    @rtype: dict
    @return: a dictionary with the structure node->(export-list)
        where export-list is a list of the instances exported on
        that node.

    """
    return self.rpc.call_export_list(self.nodes)


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # FIXME: lock only instance primary and destination node
    #
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.op.instance_name
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
          "Cannot retrieve locked instance %s" % self.op.instance_name

    self.dst_node = self.cfg.GetNodeInfo(
      self.cfg.ExpandNodeName(self.op.target_node))

    assert self.dst_node is not None, \
          "Cannot retrieve locked node %s" % self.op.target_node

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not self.rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

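    # snap_disks collects one entry per instance disk: the snapshot Disk
    # object on success, or False when the snapshot could not be taken
    # (the export loop below skips such entries)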
    try:
      for disk in instance.disks:
        # new_dev_name will be a snapshot of an lvm leaf of the one we passed
        new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)

        if not new_dev_name:
          self.LogWarning("Could not snapshot block device %s on node %s",
                          disk.logical_id[1], src_node)
          snap_disks.append(False)
        else:
          new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                 logical_id=(vgname, new_dev_name),
                                 physical_id=(vgname, new_dev_name),
                                 iv_name=disk.iv_name)
          snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        if not self.rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(self, instance)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    cluster_name = self.cfg.GetClusterName()
    for idx, dev in enumerate(snap_disks):
      if dev:
        if not self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                             instance, cluster_name, idx):
          self.LogWarning("Could not export block device %s from node %s to"
                          " node %s", dev.logical_id[1], src_node,
                          dst_node.name)
        if not self.rpc.call_blockdev_remove(src_node, dev):
          self.LogWarning("Could not remove snapshot block device %s from node"
                          " %s", dev.logical_id[1], src_node)

    if not self.rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      self.LogWarning("Could not finalize export for instance %s on node %s",
                      instance.name, dst_node.name)

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal; if we
    # proceeded, the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      exportlist = self.rpc.call_export_list(nodelist)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not self.rpc.call_export_remove(node, instance.name):
            self.LogWarning("Could not remove older export for instance %s"
                            " on node %s", instance.name, node)


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.
    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    exportlist = self.rpc.call_export_list(self.acquired_locks[
      locking.LEVEL_NODE])
    found = False
    for node in exportlist:
      if instance_name in exportlist[node]:
        found = True
        if not self.rpc.call_export_remove(node, instance_name):
          logging.error("Could not remove export for instance %s"
                        " on node %s", instance_name, node)

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the matching (path, tag) pairs.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has several sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus", "hypervisor",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, lu, mode, name, **kwargs):
    self.lu = lu
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.lu.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": 1,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enable_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }
    iinfo = cfg.GetAllInstancesInfo().values()
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      hypervisor = self.hypervisor
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      hypervisor = cfg.GetInstanceInfo(self.name).hypervisor

    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
                                           hypervisor)
    node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
                       cluster_info.enabled_hypervisors)
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      if nname not in node_data or not isinstance(node_data[nname], dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname]
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free', 'cpu_total']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
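      # on top of what the hypervisor reports as in use, also reserve the
      # memory that stopped or under-committed primary instances are still
      # entitled to, so the allocator sees a conservative free-memory figure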
      i_p_mem = i_p_up_mem = 0
      for iinfo, beinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += beinfo[constants.BE_MEMORY]
          if iinfo.name not in node_iinfo[nname]:
            i_used_mem = 0
          else:
            i_used_mem = int(node_iinfo[nname][iinfo.name]['memory'])
          i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
          remote_info['memory_free'] -= max(0, i_mem_diff)

          if iinfo.status == "up":
            i_p_up_mem += beinfo[constants.BE_MEMORY]

      # compute memory used by instances
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MEMORY],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template, self.disks)

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.lu.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.lu.rpc.call_iallocator_runner
    data = self.in_text

    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)

    if not isinstance(result, (list, tuple)) or len(result) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" % (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode.

    """
5607
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
      if self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
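    # Relocation mode: the named instance must already exist; its current
    # secondary nodes become the list of nodes to relocate away from.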
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

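    # An "out" test actually runs the allocator, so its name is required;
    # an "in" test only builds the input and needs no allocator.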
    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       hypervisor=self.op.hypervisor,
                       )
    else:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

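    # Direction "in" returns the generated input text; "out" runs the named
    # allocator and returns its raw, unvalidated output.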
    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result