lib/cmdlib.py @ 8cc7e742
#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform
import logging
import copy

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import serializer


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_MASTER: the LU needs to run on the master node
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_MASTER = True
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    if not self.cfg.IsCluster():
      raise errors.OpPrereqError("Cluster not initialized yet,"
                                 " use 'gnt-cluster init' first.")
    if self.REQ_MASTER:
      master = self.cfg.GetMasterNode()
      if master != utils.HostInfo().name:
        raise errors.OpPrereqError("Commands must be run on the master"
                                   " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object.

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level.

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not be prefixed with 'GANETI_', as this
    will be handled in the hooks runner. Also note additional keys will
    be added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    If there are no nodes for a hook to run on, an empty list (and not
    None) should be returned.

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hook results

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]


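# Illustrative sketch only: a hypothetical, minimal concurrent LU showing the
# workflow documented in LogicalUnit above (it is not used anywhere in this
# module). It assumes an opcode carrying an "instance_name" attribute, as
# expected by _ExpandAndLockInstance.
class _LUExampleNoop(LogicalUnit):
  """Example LU: locks one instance and its nodes, then does nothing."""
  HPATH = None
  HTYPE = None
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    # Lock the instance now; the node locks are computed later in DeclareLocks
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    # The name was expanded in ExpandNames, so the instance must exist
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)

  def Exec(self, feedback_fn):
    feedback_fn("Would operate on instance %s" % self.instance.name)

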
class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.OpPrereqError: if the nodes parameter is wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta))


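def _ExampleCheckOutputFields():
  """Illustrative sketch (hypothetical helper, unused): how the field
  validation above is meant to be used from an LU's ExpandNames.

  """
  static = utils.FieldSet("name", "pinst_cnt")
  dynamic = utils.FieldSet("mfree", "dfree")
  # Every selected field is either static or dynamic, so this returns silently
  _CheckOutputFields(static=static, dynamic=dynamic,
                     selected=["name", "mfree"])
  # An unknown field raises errors.OpPrereqError("Unknown output fields ...")
  _CheckOutputFields(static=static, dynamic=dynamic, selected=["bogus"])

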
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks.

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: string
  @param status: the desired status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, bridge, mac) representing
      the NICs the instance has
  @rtype: dict
  @return: the hook environment for this instance

  """
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env


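# Illustrative sketch (hypothetical values): what _BuildInstanceHookEnv above
# produces for a two-NIC instance. The keys are later prefixed with 'GANETI_'
# by the hooks runner.
#
#   _BuildInstanceHookEnv(name="inst1.example.com",
#                         primary_node="node1.example.com",
#                         secondary_nodes=["node2.example.com"],
#                         os_type="debian-etch", status="up",
#                         memory=128, vcpus=1,
#                         nics=[("192.0.2.10", "xen-br0", "aa:00:00:11:22:33"),
#                               (None, "xen-br1", "aa:00:00:44:55:66")])
#   ==> {"OP_TARGET": "inst1.example.com",
#        "INSTANCE_NAME": "inst1.example.com",
#        "INSTANCE_PRIMARY": "node1.example.com",
#        "INSTANCE_SECONDARIES": "node2.example.com",
#        "INSTANCE_OS_TYPE": "debian-etch",
#        "INSTANCE_STATUS": "up",
#        "INSTANCE_MEMORY": 128,
#        "INSTANCE_VCPUS": 1,
#        "INSTANCE_NIC0_IP": "192.0.2.10",
#        "INSTANCE_NIC0_BRIDGE": "xen-br0",
#        "INSTANCE_NIC0_HWADDR": "aa:00:00:11:22:33",
#        "INSTANCE_NIC1_IP": "",
#        "INSTANCE_NIC1_BRIDGE": "xen-br1",
#        "INSTANCE_NIC1_HWADDR": "aa:00:00:44:55:66",
#        "INSTANCE_NIC_COUNT": 2}

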
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _CheckInstanceBridgesExist(lu, instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("one or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    if not self.rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master


class LUVerifyCluster(LogicalUnit):
544
  """Verifies the cluster status.
545

546
  """
547
  HPATH = "cluster-verify"
548
  HTYPE = constants.HTYPE_CLUSTER
549
  _OP_REQP = ["skip_checks"]
550
  REQ_BGL = False
551

    
552
  def ExpandNames(self):
553
    self.needed_locks = {
554
      locking.LEVEL_NODE: locking.ALL_SET,
555
      locking.LEVEL_INSTANCE: locking.ALL_SET,
556
    }
557
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
558

    
559
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
560
                  remote_version, feedback_fn):
561
    """Run multiple tests against a node.
562

563
    Test list::
564

565
      - compares ganeti version
566
      - checks vg existance and size > 20G
567
      - checks config file checksum
568
      - checks ssh to other nodes
569

570
    @type node: string
571
    @param node: the name of the node to check
572
    @param file_list: required list of files
573
    @param local_cksum: dictionary of local files and their checksums
574
    @type vglist: dict
575
    @param vglist: dictionary of volume group names and their size
576
    @param node_result: the results from the node
577
    @param remote_version: the RPC version from the remote node
578
    @param feedback_fn: function used to accumulate results
579

580
    """
581
    # compares ganeti version
582
    local_version = constants.PROTOCOL_VERSION
583
    if not remote_version:
584
      feedback_fn("  - ERROR: connection to %s failed" % (node))
585
      return True
586

    
587
    if local_version != remote_version:
588
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
589
                      (local_version, node, remote_version))
590
      return True
591

    
592
    # checks vg existance and size > 20G
593

    
594
    bad = False
595
    if not vglist:
596
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
597
                      (node,))
598
      bad = True
599
    else:
600
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
601
                                            constants.MIN_VG_SIZE)
602
      if vgstatus:
603
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
604
        bad = True
605

    
606
    if not node_result:
607
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
608
      return True
609

    
610
    # checks config file checksum
611
    # checks ssh to any
612

    
613
    if 'filelist' not in node_result:
614
      bad = True
615
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
616
    else:
617
      remote_cksum = node_result['filelist']
618
      for file_name in file_list:
619
        if file_name not in remote_cksum:
620
          bad = True
621
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
622
        elif remote_cksum[file_name] != local_cksum[file_name]:
623
          bad = True
624
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
625

    
626
    if 'nodelist' not in node_result:
627
      bad = True
628
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
629
    else:
630
      if node_result['nodelist']:
631
        bad = True
632
        for node in node_result['nodelist']:
633
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
634
                          (node, node_result['nodelist'][node]))
635
    if 'node-net-test' not in node_result:
636
      bad = True
637
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
638
    else:
639
      if node_result['node-net-test']:
640
        bad = True
641
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
642
        for node in nlist:
643
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
644
                          (node, node_result['node-net-test'][node]))
645

    
646
    hyp_result = node_result.get('hypervisor', None)
647
    if isinstance(hyp_result, dict):
648
      for hv_name, hv_result in hyp_result.iteritems():
649
        if hv_result is not None:
650
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
651
                      (hv_name, hv_result))
652
    return bad
653

    
654
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
655
                      node_instance, feedback_fn):
656
    """Verify an instance.
657

658
    This function checks to see if the required block devices are
659
    available on the instance's node.
660

661
    """
662
    bad = False
663

    
664
    node_current = instanceconfig.primary_node
665

    
666
    node_vol_should = {}
667
    instanceconfig.MapLVsByNode(node_vol_should)
668

    
669
    for node in node_vol_should:
670
      for volume in node_vol_should[node]:
671
        if node not in node_vol_is or volume not in node_vol_is[node]:
672
          feedback_fn("  - ERROR: volume %s missing on node %s" %
673
                          (volume, node))
674
          bad = True
675

    
676
    if not instanceconfig.status == 'down':
677
      if (node_current not in node_instance or
678
          not instance in node_instance[node_current]):
679
        feedback_fn("  - ERROR: instance %s not running on node %s" %
680
                        (instance, node_current))
681
        bad = True
682

    
683
    for node in node_instance:
684
      if (not node == node_current):
685
        if instance in node_instance[node]:
686
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
687
                          (instance, node))
688
          bad = True
689

    
690
    return bad
691

    
692
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
693
    """Verify if there are any unknown volumes in the cluster.
694

695
    The .os, .swap and backup volumes are ignored. All other volumes are
696
    reported as unknown.
697

698
    """
699
    bad = False
700

    
701
    for node in node_vol_is:
702
      for volume in node_vol_is[node]:
703
        if node not in node_vol_should or volume not in node_vol_should[node]:
704
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
705
                      (volume, node))
706
          bad = True
707
    return bad
708

    
709
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
710
    """Verify the list of running instances.
711

712
    This checks what instances are running but unknown to the cluster.
713

714
    """
715
    bad = False
716
    for node in node_instance:
717
      for runninginstance in node_instance[node]:
718
        if runninginstance not in instancelist:
719
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
720
                          (runninginstance, node))
721
          bad = True
722
    return bad
723

    
724
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
725
    """Verify N+1 Memory Resilience.
726

727
    Check that if one single node dies we can still start all the instances it
728
    was primary for.
729

730
    """
731
    bad = False
732

    
733
    for node, nodeinfo in node_info.iteritems():
734
      # This code checks that every node which is now listed as secondary has
735
      # enough memory to host all instances it is supposed to should a single
736
      # other node in the cluster fail.
737
      # FIXME: not ready for failover to an arbitrary node
738
      # FIXME: does not support file-backed instances
739
      # WARNING: we currently take into account down instances as well as up
740
      # ones, considering that even if they're down someone might want to start
741
      # them even in the event of a node failure.
742
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
743
        needed_mem = 0
744
        for instance in instances:
745
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
746
          if bep[constants.BE_AUTO_BALANCE]:
747
            needed_mem += bep[constants.BE_MEMORY]
748
        if nodeinfo['mfree'] < needed_mem:
749
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
750
                      " failovers should node %s fail" % (node, prinode))
751
          bad = True
752
    return bad
753

    
  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase and their failure makes
    the output be logged in the verify output and the verification fail.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = []
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
    all_vglist = self.rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': hypervisors,
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
                        for node in nodeinfo]
      }
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())
    all_rversion = self.rpc.call_version(nodelist)
    all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
                                        self.cfg.GetHypervisorType())

    cluster = self.cfg.GetClusterInfo()
    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res is False or not isinstance(res, list):
            feedback_fn("    Communication failure")
            lu_result = 1
            continue
          for script, hkr, output in res:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


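# Illustrative sketch (hypothetical numbers) of the N+1 check done in
# _VerifyNPlusOneMemory above: for every node N and every primary node P of
# the instances that have N as secondary, the memory of the auto-balanced
# instances is summed; if the sum exceeds N's free memory, N could not
# absorb a failover of P.
#
#   node_info["node2"]["mfree"] = 1024
#   node_info["node2"]["sinst-by-pnode"] = {
#     "node1": ["inst1", "inst2"],   # BE_MEMORY 512 and 768, auto-balanced
#   }
#   needed_mem = 512 + 768 = 1280 > 1024  ==> error reported for node2/node1

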
class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_volume_list(nodes, vg_name)

    to_act = set()
    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logging.warning("Connection to node %s failed or invalid data"
                        " returned", node)
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


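# Illustrative sketch (hypothetical data) of the mapping built by
# LUVerifyDisks.Exec above: MapLVsByNode yields {node: [volume, ...]} per
# instance, which is flattened into {(node, volume): instance} so the
# per-node volume_list RPC results can be matched back to instances.
#
#   inst_lvs = {"node1": ["xenvg/lv1", "xenvg/lv2"],
#               "node2": ["xenvg/lv1", "xenvg/lv2"]}
#   ==> nv_dict = {("node1", "xenvg/lv1"): inst, ("node1", "xenvg/lv2"): inst,
#                  ("node2", "xenvg/lv1"): inst, ("node2", "xenvg/lv2"): inst}

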
class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    if not self.rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      # TODO: sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logging.debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = self.rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            self.LogWarning("Copy of file %s to node %s failed",
                            fname, to_node)
    finally:
      if not self.rpc.call_node_start_master(master, False):
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
1157
  """Check if the given disk or its children are lvm-based.
1158

1159
  @type disk: L{objects.Disk}
1160
  @param disk: the disk to check
1161
  @rtype: booleean
1162
  @return: boolean indicating whether a LD_LV dev_type was found or not
1163

1164
  """
1165
  if disk.children:
1166
    for chdisk in disk.children:
1167
      if _RecursiveCheckIfLVMBased(chdisk):
1168
        return True
1169
  return disk.dev_type == constants.LD_LV
1170

    
1171

    
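# Illustrative sketch (hypothetical disk layout) of _RecursiveCheckIfLVMBased
# above: a mirrored disk whose children are logical volumes is reported as
# lvm-based because the recursion reaches a child whose dev_type is
# constants.LD_LV; a disk with no children and a non-LV dev_type is not.
#
#   drbd_disk                       -> True (through its children)
#     +- data_lv (dev_type=LD_LV)   -> True
#     +- meta_lv (dev_type=LD_LV)   -> True
#   file-based disk, no children    -> False

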
class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    # FIXME: This only works because there is only one parameter that can be
    # changed or removed.
    if self.op.vg_name is not None and not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    node_list = self.acquired_locks[locking.LEVEL_NODE]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # beparams changes do not need validation (we can't validate?),
    # but we still process here
    if self.op.beparams:
      self.new_beparams = cluster.FillDict(
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)

    # hypervisor list/parameters
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      if not isinstance(self.op.hvparams, dict):
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      if self.op.vg_name != self.cfg.GetVGName():
        self.cfg.SetVGName(self.op.vg_name)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.enabled_hypervisors is not None:
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
    self.cfg.Update(self.cluster)


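# Illustrative sketch (hypothetical values) of how LUSetClusterParams merges
# hypervisor parameters in CheckPrereq above: the cluster-level dict is
# copied with FillDict and then updated per hypervisor, so keys that are not
# specified in the opcode keep their old values.
#
#   cluster.hvparams = {"xen-pvm": {"kernel_path": "/boot/vmlinuz-2.6-xenU",
#                                   "initrd_path": ""}}
#   op.hvparams     = {"xen-pvm": {"initrd_path": "/boot/initrd-2.6-xenU"}}
#   ==> new_hvparams["xen-pvm"] == {"kernel_path": "/boot/vmlinuz-2.6-xenU",
#                                   "initrd_path": "/boot/initrd-2.6-xenU"}

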
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1283
  """Sleep and poll for an instance's disk to sync.
1284

1285
  """
1286
  if not instance.disks:
1287
    return True
1288

    
1289
  if not oneshot:
1290
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1291

    
1292
  node = instance.primary_node
1293

    
1294
  for dev in instance.disks:
1295
    lu.cfg.SetDiskID(dev, node)
1296

    
1297
  retries = 0
1298
  while True:
1299
    max_time = 0
1300
    done = True
1301
    cumul_degraded = False
1302
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1303
    if not rstats:
1304
      lu.LogWarning("Can't get any data from node %s", node)
1305
      retries += 1
1306
      if retries >= 10:
1307
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1308
                                 " aborting." % node)
1309
      time.sleep(6)
1310
      continue
1311
    retries = 0
1312
    for i in range(len(rstats)):
1313
      mstat = rstats[i]
1314
      if mstat is None:
1315
        lu.LogWarning("Can't compute data for node %s/%s",
1316
                           node, instance.disks[i].iv_name)
1317
        continue
1318
      # we ignore the ldisk parameter
1319
      perc_done, est_time, is_degraded, _ = mstat
1320
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1321
      if perc_done is not None:
1322
        done = False
1323
        if est_time is not None:
1324
          rem_time = "%d estimated seconds remaining" % est_time
1325
          max_time = est_time
1326
        else:
1327
          rem_time = "no time estimate"
1328
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1329
                        (instance.disks[i].iv_name, perc_done, rem_time))
1330
    if done or oneshot:
1331
      break
1332

    
1333
    time.sleep(min(60, max_time))
1334

    
1335
  if done:
1336
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1337
  return not cumul_degraded
1338

    
1339

    
1340
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1341
  """Check that mirrors are not degraded.
1342

1343
  The ldisk parameter, if True, will change the test from the
1344
  is_degraded attribute (which represents overall non-ok status for
1345
  the device(s)) to the ldisk (representing the local storage status).
1346

1347
  """
1348
  lu.cfg.SetDiskID(dev, node)
1349
  if ldisk:
1350
    idx = 6
1351
  else:
1352
    idx = 5
1353

    
1354
  result = True
1355
  if on_primary or dev.AssembleOnSecondary():
1356
    rstats = lu.rpc.call_blockdev_find(node, dev)
1357
    if not rstats:
1358
      logging.warning("Node %s: disk degraded, not found or node down", node)
1359
      result = False
1360
    else:
1361
      result = result and (not rstats[idx])
1362
  if dev.children:
1363
    for child in dev.children:
1364
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1365

    
1366
  return result
1367

    
1368

    
1369
class LUDiagnoseOS(NoHooksLU):
1370
  """Logical unit for OS diagnose/query.
1371

1372
  """
1373
  _OP_REQP = ["output_fields", "names"]
1374
  REQ_BGL = False
1375
  _FIELDS_STATIC = utils.FieldSet()
1376
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1377

    
1378
  def ExpandNames(self):
1379
    if self.op.names:
1380
      raise errors.OpPrereqError("Selective OS query not supported")
1381

    
1382
    _CheckOutputFields(static=self._FIELDS_STATIC,
1383
                       dynamic=self._FIELDS_DYNAMIC,
1384
                       selected=self.op.output_fields)
1385

    
1386
    # Lock all nodes, in shared mode
1387
    self.needed_locks = {}
1388
    self.share_locks[locking.LEVEL_NODE] = 1
1389
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1390

    
1391
  def CheckPrereq(self):
1392
    """Check prerequisites.
1393

1394
    """
1395

    
1396
  @staticmethod
1397
  def _DiagnoseByOS(node_list, rlist):
1398
    """Remaps a per-node return list into an a per-os per-node dictionary
1399

1400
    @param node_list: a list with the names of all nodes
1401
    @param rlist: a map with node names as keys and OS objects as values
1402

1403
    @rtype: dict
1404
    @returns: a dictionary with osnames as keys and as value another map, with
1405
        nodes as keys and list of OS objects as values, eg::
1406

1407
          {"debian-etch": {"node1": [<object>,...],
1408
                           "node2": [<object>,]}
1409
          }
1410

1411
    """
1412
    all_os = {}
1413
    for node_name, nr in rlist.iteritems():
1414
      if not nr:
1415
        continue
1416
      for os_obj in nr:
1417
        if os_obj.name not in all_os:
1418
          # build a list of nodes for this os containing empty lists
1419
          # for each node in node_list
1420
          all_os[os_obj.name] = {}
1421
          for nname in node_list:
1422
            all_os[os_obj.name][nname] = []
1423
        all_os[os_obj.name][node_name].append(os_obj)
1424
    return all_os
1425

    
1426
  def Exec(self, feedback_fn):
1427
    """Compute the list of OSes.
1428

1429
    """
1430
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1431
    node_data = self.rpc.call_os_diagnose(node_list)
1432
    if node_data == False:
1433
      raise errors.OpExecError("Can't gather the list of OSes")
1434
    pol = self._DiagnoseByOS(node_list, node_data)
1435
    output = []
1436
    for os_name, os_data in pol.iteritems():
1437
      row = []
1438
      for field in self.op.output_fields:
1439
        if field == "name":
1440
          val = os_name
1441
        elif field == "valid":
1442
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1443
        elif field == "node_status":
1444
          val = {}
1445
          for node_name, nos_list in os_data.iteritems():
1446
            val[node_name] = [(v.status, v.path) for v in nos_list]
1447
        else:
1448
          raise errors.ParameterError(field)
1449
        row.append(val)
1450
      output.append(row)
1451

    
1452
    return output
1453

    
1454

    
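# Illustrative sketch (hypothetical data) of LUDiagnoseOS._DiagnoseByOS
# above: the per-node RPC result is inverted into a per-OS, per-node map,
# with an empty list left for nodes that did not report a given OS.
#
#   node_list = ["node1", "node2"]
#   rlist = {"node1": [<OS debian-etch>], "node2": []}
#   ==> {"debian-etch": {"node1": [<OS debian-etch>], "node2": []}}

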
1455
class LURemoveNode(LogicalUnit):
1456
  """Logical unit for removing a node.
1457

1458
  """
1459
  HPATH = "node-remove"
1460
  HTYPE = constants.HTYPE_NODE
1461
  _OP_REQP = ["node_name"]
1462

    
1463
  def BuildHooksEnv(self):
1464
    """Build hooks env.
1465

1466
    This doesn't run on the target node in the pre phase as a failed
1467
    node would then be impossible to remove.
1468

1469
    """
1470
    env = {
1471
      "OP_TARGET": self.op.node_name,
1472
      "NODE_NAME": self.op.node_name,
1473
      }
1474
    all_nodes = self.cfg.GetNodeList()
1475
    all_nodes.remove(self.op.node_name)
1476
    return env, all_nodes, all_nodes
1477

    
1478
  def CheckPrereq(self):
1479
    """Check prerequisites.
1480

1481
    This checks:
1482
     - the node exists in the configuration
1483
     - it does not have primary or secondary instances
1484
     - it's not the master
1485

1486
    Any errors are signalled by raising errors.OpPrereqError.
1487

1488
    """
1489
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1490
    if node is None:
1491
      raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1492

    
1493
    instance_list = self.cfg.GetInstanceList()
1494

    
1495
    masternode = self.cfg.GetMasterNode()
1496
    if node.name == masternode:
1497
      raise errors.OpPrereqError("Node is the master node,"
1498
                                 " you need to failover first.")
1499

    
1500
    for instance_name in instance_list:
1501
      instance = self.cfg.GetInstanceInfo(instance_name)
1502
      if node.name == instance.primary_node:
1503
        raise errors.OpPrereqError("Instance %s still running on the node,"
1504
                                   " please remove first." % instance_name)
1505
      if node.name in instance.secondary_nodes:
1506
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1507
                                   " please remove first." % instance_name)
1508
    self.op.node_name = node.name
1509
    self.node = node
1510

    
1511
  def Exec(self, feedback_fn):
1512
    """Removes the node from the cluster.
1513

1514
    """
1515
    node = self.node
1516
    logging.info("Stopping the node daemon and removing configs from node %s",
1517
                 node.name)
1518

    
1519
    self.context.RemoveNode(node.name)
1520

    
1521
    self.rpc.call_node_leave_cluster(node.name)
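
  # Note: the node is removed from the cluster configuration and locking
  # context first and only then asked, via RPC, to leave the cluster (stop
  # its daemons and clean up); a failure of that last call does not bring
  # the node back into the configuration.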


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet(
    "dtotal", "dfree",
    "mtotal", "mnode", "mfree",
    "bootid",
    "ctotal",
    )

  _FIELDS_STATIC = utils.FieldSet(
    "name", "pinst_cnt", "sinst_cnt",
    "pinst_list", "sinst_list",
    "pip", "sip", "tags",
    "serial_no",
    )

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted


  def CheckPrereq(self):
    """Check prerequisites.

    """
    # The validation of the node list is done in _GetWantedNodes if the
    # list is non-empty; an empty list needs no validation
    pass

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      nodenames = self.wanted
      missing = set(nodenames).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some nodes were removed before retrieving their data: %s" % missing)
    else:
      nodenames = all_info.keys()

    nodenames = utils.NiceSort(nodenames)
    nodelist = [all_info[name] for name in nodenames]

    # begin data gathering

    if self.do_locking:
      live_data = {}
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                          self.cfg.GetHypervisorType())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field == "serial_no":
          val = node.serial_no
        elif self._FIELDS_DYNAMIC.Matches(field):
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output
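
  # Locking note: do_locking is true only when at least one requested field
  # is not purely static; in that case the node locks are acquired (shared)
  # and a node_info RPC fills live_data, otherwise the query is answered
  # from the configuration alone with empty live data.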


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
  _FIELDS_STATIC = utils.FieldSet("node")

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = self.rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output
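
  # The for/else above relies on Python's loop-else: the else branch runs
  # only when no break occurred, i.e. when no instance maps this logical
  # volume on this node, in which case the "instance" column is set to '-'.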


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # check reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # check connectivity
    result = self.rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logging.info("Communication to node %s fine, sw version %s match",
                     node, result)
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logging.info("Copy ssh key to node %s", node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
                                    keyarray[2],
                                    keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      if not self.rpc.call_node_has_ip_address(new_node.name,
                                               new_node.secondary_ip):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    node_verify_list = [self.cfg.GetMasterNode()]
    node_verify_param = {
      'nodelist': [node],
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
                                       self.cfg.GetClusterName())
    for verifier in node_verify_list:
      if not result[verifier]:
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                 " for remote verification" % verifier)
      if result[verifier]['nodelist']:
        for failed in result[verifier]['nodelist']:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, result[verifier]['nodelist'][failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList()
    if not self.op.readd:
      dist_nodes.append(node)
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logging.debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = self.rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logging.error("Copy of file %s to node %s failed", fname, to_node)

    to_copy = []
    if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      result = self.rpc.call_upload_file([node], fname)
      if not result[node]:
        logging.error("Could not copy file %s to node %s", fname, node)

    if self.op.readd:
      self.context.ReaddNode(new_node)
    else:
      self.context.AddNode(new_node)
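
  # Overall flow of Exec above: protocol version check, SSH host/user key
  # transfer via node_add, /etc/hosts and known_hosts update, secondary-IP
  # sanity check, remote ssh/hostname verification from the master, file
  # distribution to the existing nodes, and finally registration of the new
  # (or readded) node in the cluster context.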


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "architecture": (platform.architecture()[0], platform.machine()),
      "name": cluster.cluster_name,
      "master": cluster.master_node,
      "default_hypervisor": cluster.default_hypervisor,
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": cluster.hvparams,
      "beparams": cluster.beparams,
      }

    return result


class LUQueryConfigValues(NoHooksLU):
  """Return configuration values.

  """
  _OP_REQP = []
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet()
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")

  def ExpandNames(self):
    self.needed_locks = {}

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    values = []
    for field in self.op.output_fields:
      if field == "cluster_name":
        entry = self.cfg.GetClusterName()
      elif field == "master_node":
        entry = self.cfg.GetMasterNode()
      elif field == "drain_flag":
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
      else:
        raise errors.ParameterError(field)
      values.append(entry)
    return values


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info
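
  # The value returned to the caller is the device_info list built by
  # _AssembleInstanceDisks below: (node, instance-visible name, node-visible
  # device) tuples for the primary node.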


def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @return: a tuple of (disks_ok, device_info), where device_info is a list
      of (host, instance_visible_name, node_visible_name) tuples with the
      mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two-pass mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=False, pass=1)",
                           inst_disk.iv_name, node)
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=True, pass=2)",
                           inst_disk.iv_name, node)
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info
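
# Illustrative example (not from the original source): for an instance with a
# single DRBD disk the call above might return
#   disks_ok == True
#   device_info == [("node1.example.com", "sda", "/dev/drbd0")]
# where the last element is whatever call_blockdev_assemble returned for the
# primary node in the second pass.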


def _StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      lu.proc.LogWarning("", hint="If the message above refers to a"
                         " secondary node,"
                         " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")
2141

    
2142

    
2143
class LUDeactivateInstanceDisks(NoHooksLU):
2144
  """Shutdown an instance's disks.
2145

2146
  """
2147
  _OP_REQP = ["instance_name"]
2148
  REQ_BGL = False
2149

    
2150
  def ExpandNames(self):
2151
    self._ExpandAndLockInstance()
2152
    self.needed_locks[locking.LEVEL_NODE] = []
2153
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2154

    
2155
  def DeclareLocks(self, level):
2156
    if level == locking.LEVEL_NODE:
2157
      self._LockInstancesNodes()
2158

    
2159
  def CheckPrereq(self):
2160
    """Check prerequisites.
2161

2162
    This checks that the instance is in the cluster.
2163

2164
    """
2165
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2166
    assert self.instance is not None, \
2167
      "Cannot retrieve locked instance %s" % self.op.instance_name
2168

    
2169
  def Exec(self, feedback_fn):
2170
    """Deactivate the disks
2171

2172
    """
2173
    instance = self.instance
2174
    _SafeShutdownInstanceDisks(self, instance)
2175

    
2176

    
2177
def _SafeShutdownInstanceDisks(lu, instance):
2178
  """Shutdown block devices of an instance.
2179

2180
  This function checks if an instance is running, before calling
2181
  _ShutdownInstanceDisks.
2182

2183
  """
2184
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
2185
                                      [instance.hypervisor])
2186
  ins_l = ins_l[instance.primary_node]
2187
  if not type(ins_l) is list:
2188
    raise errors.OpExecError("Can't contact node '%s'" %
2189
                             instance.primary_node)
2190

    
2191
  if instance.name in ins_l:
2192
    raise errors.OpExecError("Instance is running, can't shutdown"
2193
                             " block devices.")
2194

    
2195
  _ShutdownInstanceDisks(lu, instance)


def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are ignored.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      if not lu.rpc.call_blockdev_shutdown(node, top_disk):
        logging.error("Could not shutdown block device %s on node %s",
                      disk.iv_name, node)
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result


def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node: C{str}
  @param node: the node to check
  @type reason: C{str}
  @param reason: string to use in the error message
  @type requested: C{int}
  @param requested: the amount of memory in MiB to check for
  @type hypervisor: C{str}
  @param hypervisor: the hypervisor to ask for memory stats
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
      we cannot check the node

  """
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                               " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem))
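
# Typical use, as in LUStartupInstance below: verify that the primary node can
# hold the instance's configured memory before issuing the start RPC, e.g.
#   _CheckNodeFreeMemory(self, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        bep[constants.BE_MEMORY], instance.hypervisor)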


class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

    _CheckNodeFreeMemory(self, instance.primary_node,
                         "starting instance %s" % instance.name,
                         bep[constants.BE_MEMORY], instance.hypervisor)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self, instance, force)

    if not self.rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(self, instance)
      raise errors.OpExecError("Could not start instance")


class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      if not self.rpc.call_instance_reboot(node_current, instance,
                                           reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      if not self.rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("Could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(self, instance)
      _StartInstanceDisks(self, instance, ignore_secondaries)
      if not self.rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)
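
  # Summary of the reboot types handled above: soft and hard reboots are
  # delegated to the node daemon via call_instance_reboot, while a full
  # reboot shuts the instance down, cycles its disks and starts it again
  # from the master side; in all cases the instance ends up marked "up".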


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    if not self.rpc.call_instance_shutdown(node_current, instance):
      self.proc.LogWarning("Could not shutdown instance")

    _ShutdownInstanceDisks(self, instance)
2437

    
2438

    
2439
class LUReinstallInstance(LogicalUnit):
2440
  """Reinstall an instance.
2441

2442
  """
2443
  HPATH = "instance-reinstall"
2444
  HTYPE = constants.HTYPE_INSTANCE
2445
  _OP_REQP = ["instance_name"]
2446
  REQ_BGL = False
2447

    
2448
  def ExpandNames(self):
2449
    self._ExpandAndLockInstance()
2450

    
2451
  def BuildHooksEnv(self):
2452
    """Build hooks env.
2453

2454
    This runs on master, primary and secondary nodes of the instance.
2455

2456
    """
2457
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2458
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2459
          list(self.instance.secondary_nodes))
2460
    return env, nl, nl
2461

    
2462
  def CheckPrereq(self):
2463
    """Check prerequisites.
2464

2465
    This checks that the instance is in the cluster and is not running.
2466

2467
    """
2468
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2469
    assert instance is not None, \
2470
      "Cannot retrieve locked instance %s" % self.op.instance_name
2471

    
2472
    if instance.disk_template == constants.DT_DISKLESS:
2473
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2474
                                 self.op.instance_name)
2475
    if instance.status != "down":
2476
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2477
                                 self.op.instance_name)
2478
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2479
                                              instance.name,
2480
                                              instance.hypervisor)
2481
    if remote_info:
2482
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2483
                                 (self.op.instance_name,
2484
                                  instance.primary_node))
2485

    
2486
    self.op.os_type = getattr(self.op, "os_type", None)
2487
    if self.op.os_type is not None:
2488
      # OS verification
2489
      pnode = self.cfg.GetNodeInfo(
2490
        self.cfg.ExpandNodeName(instance.primary_node))
2491
      if pnode is None:
2492
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2493
                                   self.op.pnode)
2494
      os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
2495
      if not os_obj:
2496
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2497
                                   " primary node"  % self.op.os_type)
2498

    
2499
    self.instance = instance
2500

    
2501
  def Exec(self, feedback_fn):
2502
    """Reinstall the instance.
2503

2504
    """
2505
    inst = self.instance
2506

    
2507
    if self.op.os_type is not None:
2508
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2509
      inst.os = self.op.os_type
2510
      self.cfg.Update(inst)
2511

    
2512
    _StartInstanceDisks(self, inst, None)
2513
    try:
2514
      feedback_fn("Running the instance OS create scripts...")
2515
      if not self.rpc.call_instance_os_add(inst.primary_node, inst):
2516
        raise errors.OpExecError("Could not install OS for instance %s"
2517
                                 " on node %s" %
2518
                                 (inst.name, inst.primary_node))
2519
    finally:
2520
      _ShutdownInstanceDisks(self, inst)
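
  # The try/finally above guarantees that the instance's disks are
  # deactivated again even if the OS create scripts fail.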


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))


  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
                                                     old_file_storage_dir,
                                                     new_file_storage_dir)

      if not result:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self, inst, None)
    try:
      if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
                                               old_name):
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        self.proc.LogWarning(msg)
    finally:
      _ShutdownInstanceDisks(self, inst)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logging.info("Shutting down instance %s on node %s",
                 instance.name, instance.primary_node)

    if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logging.info("Removing block devices for instance %s", instance.name)

    if not _RemoveDisks(self, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logging.info("Removing instance %s out of cluster config", instance.name)

    self.cfg.RemoveInstance(instance.name)
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
                                    "admin_state", "admin_ram",
                                    "disk_template", "ip", "mac", "bridge",
                                    "sda_size", "sdb_size", "vcpus", "tags",
                                    "network_port", "beparams",
                                    "(disk).(size)/([0-9]+)",
                                    "(disk).(sizes)",
                                    "(nic).(mac|ip|bridge)/([0-9]+)",
                                    "(nic).(macs|ips|bridges)",
                                    "(disk|nic).(count)",
                                    "serial_no", "hypervisor", "hvparams",] +
                                  ["hv/%s" % name
                                   for name in constants.HVS_PARAMETERS] +
                                  ["be/%s" % name
                                   for name in constants.BES_PARAMETERS])
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")


  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedInstances(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    if self.do_locking:
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.do_locking:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    all_info = self.cfg.GetAllInstancesInfo()
    if self.do_locking:
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
    elif self.wanted != locking.ALL_SET:
      instance_names = self.wanted
      missing = set(instance_names).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some instances were removed before retrieving their data: %s"
          % missing)
    else:
      instance_names = all_info.keys()

    instance_names = utils.NiceSort(instance_names)
    instance_list = [all_info[iname] for iname in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])
    hv_list = list(set([inst.hypervisor for inst in instance_list]))

    bad_nodes = []
    if self.do_locking:
      live_data = {}
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    HVPREFIX = "hv/"
    BEPREFIX = "be/"
    output = []
    for instance in instance_list:
      iout = []
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
      for field in self.op.output_fields:
        st_match = self._FIELDS_STATIC.Matches(field)
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.status != "down":
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.status != "down":
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          idx = ord(field[2]) - ord('a')
          try:
            val = instance.FindDisk(idx).size
          except errors.OpPrereqError:
            val = None
        elif field == "tags":
          val = list(instance.GetTags())
        elif field == "serial_no":
          val = instance.serial_no
        elif field == "network_port":
          val = instance.network_port
        elif field == "hypervisor":
          val = instance.hypervisor
        elif field == "hvparams":
          val = i_hv
        elif (field.startswith(HVPREFIX) and
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
          val = i_hv.get(field[len(HVPREFIX):], None)
        elif field == "beparams":
          val = i_be
        elif (field.startswith(BEPREFIX) and
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
          val = i_be.get(field[len(BEPREFIX):], None)
        elif st_match and st_match.groups():
          # matches a variable list
          st_groups = st_match.groups()
          if st_groups and st_groups[0] == "disk":
            if st_groups[1] == "count":
              val = len(instance.disks)
            elif st_groups[1] == "sizes":
              val = [disk.size for disk in instance.disks]
            elif st_groups[1] == "size":
              try:
                val = instance.FindDisk(st_groups[2]).size
              except errors.OpPrereqError:
                val = None
            else:
              assert False, "Unhandled disk parameter"
          elif st_groups[0] == "nic":
            if st_groups[1] == "count":
              val = len(instance.nics)
            elif st_groups[1] == "macs":
              val = [nic.mac for nic in instance.nics]
            elif st_groups[1] == "ips":
              val = [nic.ip for nic in instance.nics]
            elif st_groups[1] == "bridges":
              val = [nic.bridge for nic in instance.nics]
            else:
              # index-based item
              nic_idx = int(st_groups[2])
              if nic_idx >= len(instance.nics):
                val = None
              else:
                if st_groups[1] == "mac":
                  val = instance.nics[nic_idx].mac
                elif st_groups[1] == "ip":
                  val = instance.nics[nic_idx].ip
                elif st_groups[1] == "bridge":
                  val = instance.nics[nic_idx].bridge
                else:
                  assert False, "Unhandled NIC parameter"
          else:
            assert False, "Unhandled variable parameter"
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output
2916

    
2917

    
2918
class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    bep = self.cfg.GetClusterInfo().FillBE(instance)
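    # the filled beparams tell us how much memory the instance needs on the
    # target node (checked below via _CheckNodeFreeMemory)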
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
                         instance.name, bep[constants.BE_MEMORY],
                         instance.hypervisor)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not self.rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self, dev, target_node, False):
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logging.info("Shutting down instance %s on node %s",
                 instance.name, source_node)

    if not self.rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
                             " Proceeding"
                             " anyway. Please make sure node %s is down",
                             instance.name, source_node, source_node)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logging.info("Starting instance %s on node %s",
                   instance.name, target_node)

      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not self.rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))


def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
        return False

  lu.cfg.SetDiskID(device, node)
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, True, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(lu, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  lu.cfg.SetDiskID(device, node)
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, False, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _GenerateUniqueNames(lu, exts):
  """Generate suitable LV names.

  This will generate unique logical volume names, one for each of the
  given suffixes.

  """
  results = []
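  # each resulting name is a freshly generated unique ID with the given
  # suffix appended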
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results


def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
                         p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  port = lu.cfg.AllocatePort()
  vgname = lu.cfg.GetVGName()
  shared_secret = lu.cfg.GenerateDRBDSecret()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
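  # the 128 MB LV holds the DRBD metadata; the DRBD8 logical_id below packs
  # both node names, the network port, the two minors and the shared secret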
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev


def _GenerateDiskTemplate(lu, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_info,
                          file_storage_dir, file_driver):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []
  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(lu, [".disk%d" % i
                                      for i in range(disk_count)])
    for idx, disk in enumerate(disk_info):
      disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
                              logical_id=(vgname, names[idx]),
                              iv_name = "disk/%d" % idx)
      disks.append(disk_dev)
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node, remote_node] * len(disk_info), instance_name)

    names = _GenerateUniqueNames(lu,
                                 [".disk%d_%s" % (i, s)
                                  for i in range(disk_count)
                                  for s in ("data", "meta")
                                  ])
    for idx, disk in enumerate(disk_info):
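      # minors were requested as (primary, secondary) pairs per disk, so disk
      # idx uses minors[idx*2] and minors[idx*2+1]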
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                      disk["size"], names[idx*2:idx*2+2],
                                      "disk/%d" % idx,
                                      minors[idx*2], minors[idx*2+1])
      disks.append(disk_dev)
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    for idx, disk in enumerate(disk_info):

      disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
                              iv_name="disk/%d" % idx,
                              logical_id=(file_driver,
                                          "%s/disk%d" % (file_storage_dir,
                                                         idx)))
      disks.append(disk_dev)
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks


def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


def _CreateDisks(lu, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @rtype: boolean
  @return: the success of the creation

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
                                                 file_storage_dir)

    if not result:
      logging.error("Could not connect to node '%s'", instance.primary_node)
      return False

    if not result[0]:
      logging.error("Failed to create directory '%s'", file_storage_dir)
      return False

  for device in instance.disks:
    logging.info("Creating volume %s for instance %s",
                 device.iv_name, instance.name)
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
                                        device, False, info):
        logging.error("Failed to create volume %s (%s) on secondary node %s!",
                      device.iv_name, device, secondary_node)
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
                                    instance, device, info):
      logging.error("Failed to create volume %s on primary!", device.iv_name)
      return False

  return True


def _RemoveDisks(lu, instance):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should remove
  @rtype: boolean
  @return: the success of the removal

  """
  logging.info("Removing block devices for instance %s", instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(disk, node)
      if not lu.rpc.call_blockdev_remove(node, disk):
        lu.proc.LogWarning("Could not remove block device %s on node %s,"
                           " continuing anyway", device.iv_name, node)
        result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
                                               file_storage_dir):
      logging.error("Could not remove directory '%s'", file_storage_dir)
      result = False

  return result


def _ComputeDiskSize(disk_template, disks):
  """Compute disk size requirements in the volume group

  """
  # Required free disk space as a function of disk and swap space
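  # (DRBD8 needs an extra 128 MB per disk for the metadata LV created in
  # _GenerateDRBD8Branch; diskless and file-backed templates use no VG space)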
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: sum(d["size"] for d in disks),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" %  disk_template)

  return req_size_dict[disk_template]


def _CheckHVParams(lu, nodenames, hvname, hvparams):
  """Hypervisor parameter validation.

  This function abstracts the hypervisor parameter validation to be
  used in both instance create and instance modify.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type nodenames: list
  @param nodenames: the list of nodes on which we should check
  @type hvname: string
  @param hvname: the name of the hypervisor we should use
  @type hvparams: dict
  @param hvparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
                                                  hvname,
                                                  hvparams)
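  # each per-node result is expected to be a (success, message) pair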
  for node in nodenames:
    info = hvinfo.get(node, None)
    if not info or not isinstance(info, (tuple, list)):
      raise errors.OpPrereqError("Cannot get current information"
                                 " from node '%s' (%s)" % (node, info))
    if not info[0]:
      raise errors.OpPrereqError("Hypervisor parameter validation failed:"
                                 " %s" % info[1])


class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disks", "disk_template",
              "mode", "start",
              "wait_for_sync", "ip_check", "nics",
              "hvparams", "beparams"]
  REQ_BGL = False

  def _ExpandNode(self, node):
    """Expands and checks one node name.

    """
    node_full = self.cfg.ExpandNodeName(node)
    if node_full is None:
      raise errors.OpPrereqError("Unknown node %s" % node)
    return node_full

  def ExpandNames(self):
    """ExpandNames for CreateInstance.

    Figure out the right locks for instance creation.

    """
    self.needed_locks = {}

    # set optional parameters to none if they don't exist
    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    # cheap checks, mostly valid constants given

    # verify creation mode
    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    if self.op.hypervisor is None:
      self.op.hypervisor = self.cfg.GetHypervisorType()

    cluster = self.cfg.GetClusterInfo()
    enabled_hvs = cluster.enabled_hypervisors
    if self.op.hypervisor not in enabled_hvs:
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
                                 " cluster (%s)" % (self.op.hypervisor,
                                  ",".join(enabled_hvs)))

    # check hypervisor parameter syntax (locally)

    filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
                                  self.op.hvparams)
    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
    hv_type.CheckParameterSyntax(filled_hvp)

    # fill and remember the beparams dict
    self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
                                    self.op.beparams)

    #### instance parameters check

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)
    self.op.instance_name = instance_name = hostname1.name

    # this is just a preventive check, but someone might still add this
    # instance in the meantime, and creation will fail at lock-add time
    if instance_name in self.cfg.GetInstanceList():
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    self.add_locks[locking.LEVEL_INSTANCE] = instance_name

    # NIC buildup
    self.nics = []
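    # each NIC spec is a dict which may carry "ip" (None, "none", "auto" or a
    # literal address), "mac" and "bridge"; missing values get defaults below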
    for nic in self.op.nics:
      # ip validity checks
      ip = nic.get("ip", None)
      if ip is None or ip.lower() == "none":
        nic_ip = None
      elif ip.lower() == constants.VALUE_AUTO:
        nic_ip = hostname1.ip
      else:
        if not utils.IsValidIP(ip):
          raise errors.OpPrereqError("Given IP address '%s' doesn't look"
                                     " like a valid IP" % ip)
        nic_ip = ip

      # MAC address verification
      mac = nic.get("mac", constants.VALUE_AUTO)
      if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
        if not utils.IsValidMac(mac.lower()):
          raise errors.OpPrereqError("Invalid MAC address specified: %s" %
                                     mac)
      # bridge verification
      bridge = nic.get("bridge", self.cfg.GetDefBridge())
      self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))

    # disk checks/pre-build
    self.disks = []
    for disk in self.op.disks:
      mode = disk.get("mode", constants.DISK_RDWR)
      if mode not in constants.DISK_ACCESS_SET:
        raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                   mode)
      size = disk.get("size", None)
      if size is None:
        raise errors.OpPrereqError("Missing disk size")
      try:
        size = int(size)
      except ValueError:
        raise errors.OpPrereqError("Invalid disk size '%s'" % size)
      self.disks.append({"size": size, "mode": mode})

    # used in CheckPrereq for ip ping check
    self.check_ip = hostname1.ip

    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory path not absolute")

    ### Node/iallocator related checks
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator:
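      # the allocator may pick any node, so we need to lock them all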
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.op.pnode = self._ExpandNode(self.op.pnode)
      nodelist = [self.op.pnode]
      if self.op.snode is not None:
        self.op.snode = self._ExpandNode(self.op.snode)
        nodelist.append(self.op.snode)
      self.needed_locks[locking.LEVEL_NODE] = nodelist

    # in case of import lock the source node too
    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)

      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      self.op.src_node = src_node = self._ExpandNode(src_node)
      if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
        self.needed_locks[locking.LEVEL_NODE].append(src_node)

    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    nics = [n.ToDict() for n in self.nics]
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.be_full[constants.BE_VCPUS],
                     mem_size=self.be_full[constants.BE_MEMORY],
                     disks=self.disks,
                     nics=nics,
                     hypervisor=self.op.hypervisor,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 ", ".join(ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGES"] = self.src_images

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.be_full[constants.BE_MEMORY],
      vcpus=self.be_full[constants.BE_VCPUS],
      nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
    ))

    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl


  def CheckPrereq(self):
    """Check prerequisites.

    """
    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")


    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = self.op.src_node
      src_path = self.op.src_path

      export_info = self.rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      # Check that the new instance doesn't have fewer disks than the export
      instance_disks = len(self.disks)
      export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
      if instance_disks < export_disks:
        raise errors.OpPrereqError("Not enough disks to import."
                                   " (instance: %d, export: %d)" %
                                   (instance_disks, export_disks))

      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      disk_images = []
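      # one entry per export disk: the dump file path, or False if that disk
      # has no dump in the export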
      for idx in range(export_disks):
        option = 'disk%d_dump' % idx
        if export_info.has_option(constants.INISECT_INS, option):
          # FIXME: are the old os-es, disk sizes, etc. useful?
          export_name = export_info.get(constants.INISECT_INS, option)
          image = os.path.join(src_path, export_name)
          disk_images.append(image)
        else:
          disk_images.append(False)

      self.src_images = disk_images

      if self.op.mac == constants.VALUE_AUTO:
        old_name = export_info.get(constants.INISECT_INS, 'name')
        if self.op.instance_name == old_name:
          # FIXME: adjust every nic, when we'll be able to create instances
          # with more than one
          if int(export_info.get(constants.INISECT_INS, 'nic_count')) >= 1:
            self.op.mac = export_info.get(constants.INISECT_INS, 'nic_0_mac')

    # ip ping checks (we use the same ip that was resolved in ExpandNames)

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (self.check_ip, self.op.instance_name))

    #### allocator run

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
    assert self.pnode is not None, \
      "Cannot retrieve locked node %s" % self.op.pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if self.op.snode is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")
      if self.op.snode == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(self.op.snode)

    nodenames = [pnode.name] + self.secondaries

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.disks)

    # Check lv size requirements
    if req_size is not None:
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                         self.op.hypervisor)
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)

    # os verification
    os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node"  % self.op.os_type)

    # bridge check on primary node
    bridges = [n.bridge for n in self.nics]
    if not self.rpc.call_bridges_exist(self.pnode.name, bridges):
      raise errors.OpPrereqError("one of the target bridges '%s' does not"
                                 " exist on"
                                 " destination node '%s'" %
                                 (",".join(bridges), pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.be_full[constants.BE_MEMORY],
                           self.op.hypervisor)

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    for nic in self.nics:
      if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
        nic.mac = self.cfg.GenerateMAC()

    ht_kind = self.op.hypervisor
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    ##if self.op.vnc_bind_address is None:
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.cfg.GetFileStorageDir(),
                                        string_file_storage_dir, instance))


    disks = _GenerateDiskTemplate(self,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries,
                                  self.disks,
                                  file_storage_dir,
                                  self.op.file_driver)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            nics=self.nics, disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            beparams=self.op.beparams,
                            hvparams=self.op.hvparams,
                            hypervisor=self.op.hypervisor,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self, iobj):
      _RemoveDisks(self, iobj)
      self.cfg.ReleaseDRBDMinors(instance)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Declare that we don't want to remove the instance lock anymore, as we've
    # added the instance to the config
    del self.remove_locks[locking.LEVEL_INSTANCE]
    # Remove the temp. assignments for the instance's drbds
    self.cfg.ReleaseDRBDMinors(instance)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, iobj)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(self, iobj)
      self.cfg.RemoveInstance(iobj.name)
      # Make sure the instance lock gets removed
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not self.rpc.call_instance_os_add(pnode_name, iobj):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_images = self.src_images
        cluster_name = self.cfg.GetClusterName()
        import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
                                                         src_node, src_images,
                                                         cluster_name)
        for idx, result in enumerate(import_result):
          if not result:
            self.LogWarning("Could not import image %s for instance %s,"
                            " disk %d, on node %s" %
                            (src_images[idx], instance, idx, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logging.info("Starting instance %s on node %s", instance, pnode_name)
      feedback_fn("* starting instance...")
      if not self.rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = self.rpc.call_instance_list([node],
                                             [instance.hypervisor])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logging.debug("Connecting to console of %s on %s", instance.name, node)

    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    console_cmd = hyper.GetShellCommandForConsole(instance)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None

    ia_name = getattr(self.op, "iallocator", None)
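    # three locking strategies follow: an iallocator needs all node locks, an
    # explicit new secondary adds just that node, otherwise only the
    # instance's own nodes are locked (added later in DeclareLocks)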
    if ia_name is not None:
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    elif self.op.remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.op.remote_node = remote_node
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    # If we're not already locking all nodes in the set we have to declare the
    # instance's primary/secondary nodes.
    if (level == locking.LEVEL_NODE and
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
      self._LockInstancesNodes()

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    self.LogInfo("Selected new secondary for the instance: %s",
                 self.op.remote_node)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = instance

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    if not self.op.disks:
      self.op.disks = range(len(instance.disks))

    for disk_idx in self.op.disks:
      instance.FindDisk(disk_idx)

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:

      1. for each disk to be replaced:

        1. create new LVs on the target node with unique names
        1. detach old LVs from the drbd device
        1. rename old LVs to name_replaced.<time_t>
        1. rename new LVs to old LVs
        1. attach the new LVs (with the old names now) to the drbd device

      1. wait for sync across all devices

      1. for each modified disk:

        1. remove old LVs (which have the name name_replaces.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = self.rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking disk/%d on %s" % (idx, node))
        cfg.SetDiskID(dev, node)
        if not self.rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find disk/%d on node %s" %
                                   (idx, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      info("checking disk/%d consistency on %s" % (idx, oth_node))
      if not _CheckDiskConsistency(self, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".disk%d_%s" % (idx, suf)
                  for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(self, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not self.rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not self.rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not self.rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(self, instance, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
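      # the sixth field of the blockdev_find result is the degraded flag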
4192
      is_degr = self.rpc.call_blockdev_find(instance.primary_node, dev)[5]
4193
      if is_degr:
4194
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
4195

    
4196
    # Step: remove old storage
4197
    self.proc.LogStep(6, steps_total, "removing old storage")
4198
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4199
      info("remove logical volumes for %s" % name)
4200
      for lv in old_lvs:
4201
        cfg.SetDiskID(lv, tgt_node)
4202
        if not self.rpc.call_blockdev_remove(tgt_node, lv):
4203
          warning("Can't remove old LV", hint="manually remove unused LVs")
4204
          continue
4205

    
4206
  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
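    # The six steps logged below are: device existence check, peer
    # consistency check, new storage allocation, drbd reconfiguration,
    # device sync, and old storage removal.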
    steps_total = 6
4226
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4227
    instance = self.instance
4228
    iv_names = {}
4229
    vgname = self.cfg.GetVGName()
4230
    # start of work
4231
    cfg = self.cfg
4232
    old_node = self.tgt_node
4233
    new_node = self.new_node
4234
    pri_node = instance.primary_node
4235

    
4236
    # Step: check device activation
4237
    self.proc.LogStep(1, steps_total, "check device existence")
4238
    info("checking volume groups")
4239
    my_vg = cfg.GetVGName()
4240
    results = self.rpc.call_vg_list([pri_node, new_node])
4241
    if not results:
4242
      raise errors.OpExecError("Can't list volume groups on the nodes")
4243
    for node in pri_node, new_node:
4244
      res = results.get(node, False)
4245
      if not res or my_vg not in res:
4246
        raise errors.OpExecError("Volume group '%s' not found on %s" %
4247
                                 (my_vg, node))
4248
    for idx, dev in enumerate(instance.disks):
4249
      if idx not in self.op.disks:
4250
        continue
4251
      info("checking disk/%d on %s" % (idx, pri_node))
4252
      cfg.SetDiskID(dev, pri_node)
4253
      if not self.rpc.call_blockdev_find(pri_node, dev):
4254
        raise errors.OpExecError("Can't find disk/%d on node %s" %
4255
                                 (idx, pri_node))
4256

    
4257
    # Step: check other node consistency
4258
    self.proc.LogStep(2, steps_total, "check peer consistency")
4259
    for idx, dev in enumerate(instance.disks):
4260
      if idx not in self.op.disks:
4261
        continue
4262
      info("checking disk/%d consistency on %s" % (idx, pri_node))
4263
      if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4264
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
4265
                                 " unsafe to replace the secondary" %
4266
                                 pri_node)
4267

    
4268
    # Step: create new storage
4269
    self.proc.LogStep(3, steps_total, "allocate new storage")
4270
    for idx, dev in enumerate(instance.disks):
4271
      size = dev.size
4272
      info("adding new local storage on %s for disk/%d" %
4273
           (new_node, idx))
4274
      # since we *always* want to create this LV, we use the
4275
      # _Create...OnPrimary (which forces the creation), even if we
4276
      # are talking about the secondary node
4277
      for new_lv in dev.children:
4278
        if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4279
                                        _GetInstanceInfoText(instance)):
4280
          raise errors.OpExecError("Failed to create new LV named '%s' on"
4281
                                   " node '%s'" %
4282
                                   (new_lv.logical_id[1], new_node))
4283

    
4284
    # Step 4: drbd minors and drbd setup changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
                                   instance.name)
    logging.debug("Allocated minors %s", minors)
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
4291
    for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
4292
      size = dev.size
4293
      info("activating a new drbd on %s for disk/%d" % (new_node, idx))
4294
      # create new devices on new_node
4295
      if pri_node == dev.logical_id[0]:
4296
        new_logical_id = (pri_node, new_node,
4297
                          dev.logical_id[2], dev.logical_id[3], new_minor,
4298
                          dev.logical_id[5])
4299
      else:
4300
        new_logical_id = (new_node, pri_node,
4301
                          dev.logical_id[2], new_minor, dev.logical_id[4],
4302
                          dev.logical_id[5])
4303
      iv_names[idx] = (dev, dev.children, new_logical_id)
4304
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4305
                    new_logical_id)
4306
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4307
                              logical_id=new_logical_id,
4308
                              children=dev.children)
4309
      if not _CreateBlockDevOnSecondary(self, new_node, instance,
4310
                                        new_drbd, False,
4311
                                        _GetInstanceInfoText(instance)):
4312
        self.cfg.ReleaseDRBDMinors(instance.name)
4313
        raise errors.OpExecError("Failed to create new DRBD on"
4314
                                 " node '%s'" % new_node)
4315

    
4316
    for idx, dev in enumerate(instance.disks):
4317
      # we have new devices, shutdown the drbd on the old secondary
4318
      info("shutting down drbd for disk/%d on old node" % idx)
4319
      cfg.SetDiskID(dev, old_node)
4320
      if not self.rpc.call_blockdev_shutdown(old_node, dev):
4321
        warning("Failed to shutdown drbd for disk/%d on old node" % idx,
4322
                hint="Please cleanup this device manually as soon as possible")
4323

    
4324
    info("detaching primary drbds from the network (=> standalone)")
4325
    done = 0
4326
    for idx, dev in enumerate(instance.disks):
4327
      cfg.SetDiskID(dev, pri_node)
4328
      # set the network part of the physical (unique in bdev terms) id
4329
      # to None, meaning detach from network
4330
      dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4331
      # and 'find' the device, which will 'fix' it to match the
4332
      # standalone state
4333
      if self.rpc.call_blockdev_find(pri_node, dev):
4334
        done += 1
4335
      else:
4336
        warning("Failed to detach drbd disk/%d from network, unusual case" %
4337
                idx)
4338

    
4339
    if not done:
4340
      # no detaches succeeded (very unlikely)
4341
      self.cfg.ReleaseDRBDMinors(instance.name)
4342
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
4343

    
4344
    # if we managed to detach at least one, we update all the disks of
4345
    # the instance to point to the new secondary
4346
    info("updating instance configuration")
4347
    for dev, _, new_logical_id in iv_names.itervalues():
4348
      dev.logical_id = new_logical_id
4349
      cfg.SetDiskID(dev, pri_node)
4350
    cfg.Update(instance)
4351
    # we can remove now the temp minors as now the new values are
4352
    # written to the config file (and therefore stable)
4353
    self.cfg.ReleaseDRBDMinors(instance.name)
4354

    
4355
    # and now perform the drbd attach
4356
    info("attaching primary drbds to new secondary (standalone => connected)")
4357
    failures = []
4358
    for idx, dev in enumerate(instance.disks):
4359
      info("attaching primary drbd for disk/%d to new secondary node" % idx)
4360
      # since the attach is smart, it's enough to 'find' the device,
4361
      # it will automatically activate the network, if the physical_id
4362
      # is correct
4363
      cfg.SetDiskID(dev, pri_node)
4364
      logging.debug("Disk to attach: %s", dev)
4365
      if not self.rpc.call_blockdev_find(pri_node, dev):
4366
        warning("can't attach drbd disk/%d to new secondary!" % idx,
4367
                "please do a gnt-instance info to see the status of disks")
4368

    
4369
    # this can fail as the old devices are degraded and _WaitForSync
4370
    # does a combined result over all disks, so we don't check its
4371
    # return value
4372
    self.proc.LogStep(5, steps_total, "sync devices")
4373
    _WaitForSync(self, instance, unlock=True)
4374

    
4375
    # so check manually all the devices
4376
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
4377
      cfg.SetDiskID(dev, pri_node)
4378
      is_degr = self.rpc.call_blockdev_find(pri_node, dev)[5]
4379
      if is_degr:
4380
        raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
4381

    
4382
    self.proc.LogStep(6, steps_total, "removing old storage")
4383
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
4384
      info("remove logical volumes for disk/%d" % idx)
4385
      for lv in old_lvs:
4386
        cfg.SetDiskID(lv, old_node)
4387
        if not self.rpc.call_blockdev_remove(old_node, lv):
4388
          warning("Can't remove LV on old secondary",
4389
                  hint="Cleanup stale volumes by hand")
4390

    
4391
  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance

    # Activate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      _StartInstanceDisks(self, instance, True)

    if instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")

    ret = fn(feedback_fn)

    # Deactivate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      _SafeShutdownInstanceDisks(self, instance)

    return ret


class LUGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
  REQ_BGL = False

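  # The required opcode parameters above mean a grow request names the
  # instance, the disk to grow, the amount to add (in MiB, judging by the
  # free-space check in CheckPrereq) and whether to wait for the resync.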
  def ExpandNames(self):
4430
    self._ExpandAndLockInstance()
4431
    self.needed_locks[locking.LEVEL_NODE] = []
4432
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4433

    
4434
  def DeclareLocks(self, level):
4435
    if level == locking.LEVEL_NODE:
4436
      self._LockInstancesNodes()
4437

    
4438
  def BuildHooksEnv(self):
4439
    """Build hooks env.
4440

4441
    This runs on the master, the primary and all the secondaries.
4442

4443
    """
4444
    env = {
4445
      "DISK": self.op.disk,
4446
      "AMOUNT": self.op.amount,
4447
      }
4448
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4449
    nl = [
4450
      self.cfg.GetMasterNode(),
4451
      self.instance.primary_node,
4452
      ]
4453
    return env, nl, nl
4454

    
4455
  def CheckPrereq(self):
4456
    """Check prerequisites.
4457

4458
    This checks that the instance is in the cluster.
4459

4460
    """
4461
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4462
    assert instance is not None, \
4463
      "Cannot retrieve locked instance %s" % self.op.instance_name
4464

    
4465
    self.instance = instance
4466

    
4467
    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4468
      raise errors.OpPrereqError("Instance's disk layout does not support"
4469
                                 " growing.")
4470

    
4471
    self.disk = instance.FindDisk(self.op.disk)
4472

    
4473
    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4474
    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4475
                                       instance.hypervisor)
4476
    for node in nodenames:
4477
      info = nodeinfo.get(node, None)
4478
      if not info:
4479
        raise errors.OpPrereqError("Cannot get current information"
4480
                                   " from node '%s'" % node)
4481
      vg_free = info.get('vg_free', None)
4482
      if not isinstance(vg_free, int):
4483
        raise errors.OpPrereqError("Can't compute free disk space on"
4484
                                   " node %s" % node)
4485
      if self.op.amount > info['vg_free']:
4486
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
4487
                                   " %d MiB available, %d MiB required" %
4488
                                   (node, info['vg_free'], self.op.amount))
4489

    
4490
  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    instance = self.instance
    disk = self.disk
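    # Grow the device on every node that holds it; note the iteration order
    # below: secondaries first, primary last, and the new size is recorded
    # in the configuration only after all nodes have succeeded.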
    for node in (instance.secondary_nodes + (instance.primary_node,)):
      self.cfg.SetDiskID(disk, node)
      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
      if (not result or not isinstance(result, (list, tuple)) or
          len(result) != 2):
        raise errors.OpExecError("grow request failed to node %s" % node)
      elif not result[0]:
        raise errors.OpExecError("grow request failed to node %s: %s" %
                                 (node, result[1]))
    disk.RecordGrow(self.op.amount)
    self.cfg.Update(instance)
    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, instance)
      if disk_abort:
        self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
                             " status.\nPlease check the instance.")


class LUQueryInstanceData(NoHooksLU):
4515
  """Query runtime instance data.
4516

4517
  """
4518
  _OP_REQP = ["instances", "static"]
4519
  REQ_BGL = False
4520

    
4521
  def ExpandNames(self):
4522
    self.needed_locks = {}
4523
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4524

    
4525
    if not isinstance(self.op.instances, list):
4526
      raise errors.OpPrereqError("Invalid argument type 'instances'")
4527

    
4528
    if self.op.instances:
4529
      self.wanted_names = []
4530
      for name in self.op.instances:
4531
        full_name = self.cfg.ExpandInstanceName(name)
4532
        if full_name is None:
4533
          raise errors.OpPrereqError("Instance '%s' not known" %
4534
                                     self.op.instance_name)
4535
        self.wanted_names.append(full_name)
4536
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4537
    else:
4538
      self.wanted_names = None
4539
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4540

    
4541
    self.needed_locks[locking.LEVEL_NODE] = []
4542
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4543

    
4544
  def DeclareLocks(self, level):
4545
    if level == locking.LEVEL_NODE:
4546
      self._LockInstancesNodes()
4547

    
4548
  def CheckPrereq(self):
4549
    """Check prerequisites.
4550

4551
    This only checks the optional instance list against the existing names.
4552

4553
    """
4554
    if self.wanted_names is None:
4555
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4556

    
4557
    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4558
                             in self.wanted_names]
4559
    return
4560

    
4561
  def _ComputeDiskStatus(self, instance, snode, dev):
4562
    """Compute block device status.
4563

4564
    """
4565
    static = self.op.static
4566
    if not static:
4567
      self.cfg.SetDiskID(dev, instance.primary_node)
4568
      dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
4569
    else:
4570
      dev_pstatus = None
4571

    
4572
    if dev.dev_type in constants.LDS_DRBD:
4573
      # we change the snode then (otherwise we use the one passed in)
4574
      if dev.logical_id[0] == instance.primary_node:
4575
        snode = dev.logical_id[1]
4576
      else:
4577
        snode = dev.logical_id[0]
4578

    
4579
    if snode and not static:
4580
      self.cfg.SetDiskID(dev, snode)
4581
      dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
4582
    else:
4583
      dev_sstatus = None
4584

    
4585
    if dev.children:
4586
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
4587
                      for child in dev.children]
4588
    else:
4589
      dev_children = []
4590

    
4591
    data = {
4592
      "iv_name": dev.iv_name,
4593
      "dev_type": dev.dev_type,
4594
      "logical_id": dev.logical_id,
4595
      "physical_id": dev.physical_id,
4596
      "pstatus": dev_pstatus,
4597
      "sstatus": dev_sstatus,
4598
      "children": dev_children,
4599
      }
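    # the "children" entry makes this status recursive: for a DRBD device it
    # holds the status of the underlying LVs, computed by the same method.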
4600

    
4601
    return data
4602

    
4603
  def Exec(self, feedback_fn):
4604
    """Gather and return data"""
4605
    result = {}
4606

    
4607
    cluster = self.cfg.GetClusterInfo()
4608

    
4609
    for instance in self.wanted_instances:
4610
      if not self.op.static:
4611
        remote_info = self.rpc.call_instance_info(instance.primary_node,
4612
                                                  instance.name,
4613
                                                  instance.hypervisor)
4614
        if remote_info and "state" in remote_info:
4615
          remote_state = "up"
4616
        else:
4617
          remote_state = "down"
4618
      else:
4619
        remote_state = None
4620
      if instance.status == "down":
4621
        config_state = "down"
4622
      else:
4623
        config_state = "up"
4624

    
4625
      disks = [self._ComputeDiskStatus(instance, None, device)
4626
               for device in instance.disks]
4627

    
4628
      idict = {
4629
        "name": instance.name,
4630
        "config_state": config_state,
4631
        "run_state": remote_state,
4632
        "pnode": instance.primary_node,
4633
        "snodes": instance.secondary_nodes,
4634
        "os": instance.os,
4635
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4636
        "disks": disks,
4637
        "hypervisor": instance.hypervisor,
4638
        "network_port": instance.network_port,
4639
        "hv_instance": instance.hvparams,
4640
        "hv_actual": cluster.FillHV(instance),
4641
        "be_instance": instance.beparams,
4642
        "be_actual": cluster.FillBE(instance),
4643
        }
4644

    
4645
      result[instance.name] = idict
4646

    
4647
    return result
4648

    
4649

    
4650
class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "hvparams"]
  REQ_BGL = False
4658

    
4659
  def ExpandNames(self):
4660
    self._ExpandAndLockInstance()
4661
    self.needed_locks[locking.LEVEL_NODE] = []
4662
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4663

    
4664

    
4665
  def DeclareLocks(self, level):
4666
    if level == locking.LEVEL_NODE:
4667
      self._LockInstancesNodes()
4668

    
4669
  def BuildHooksEnv(self):
4670
    """Build hooks env.
4671

4672
    This runs on the master, primary and secondaries.
4673

4674
    """
4675
    args = dict()
4676
    if constants.BE_MEMORY in self.be_new:
4677
      args['memory'] = self.be_new[constants.BE_MEMORY]
4678
    if constants.BE_VCPUS in self.be_new:
4679
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
4680
    if self.do_ip or self.do_bridge or self.mac:
4681
      if self.do_ip:
4682
        ip = self.ip
4683
      else:
4684
        ip = self.instance.nics[0].ip
4685
      if self.bridge:
4686
        bridge = self.bridge
4687
      else:
4688
        bridge = self.instance.nics[0].bridge
4689
      if self.mac:
4690
        mac = self.mac
4691
      else:
4692
        mac = self.instance.nics[0].mac
4693
      args['nics'] = [(ip, bridge, mac)]
4694
    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
4695
    nl = [self.cfg.GetMasterNode(),
4696
          self.instance.primary_node] + list(self.instance.secondary_nodes)
4697
    return env, nl, nl
4698

    
4699
  def CheckPrereq(self):
4700
    """Check prerequisites.
4701

4702
    This only checks the instance list against the existing names.
4703

4704
    """
4705
    # FIXME: all the parameters could be checked before, in ExpandNames, or in
4706
    # a separate CheckArguments function, if we implement one, so the operation
4707
    # can be aborted without waiting for any lock, should it have an error...
4708
    self.ip = getattr(self.op, "ip", None)
4709
    self.mac = getattr(self.op, "mac", None)
4710
    self.bridge = getattr(self.op, "bridge", None)
4711
    self.kernel_path = getattr(self.op, "kernel_path", None)
4712
    self.initrd_path = getattr(self.op, "initrd_path", None)
4713
    self.force = getattr(self.op, "force", None)
4714
    all_parms = [self.ip, self.bridge, self.mac]
4715
    if (all_parms.count(None) == len(all_parms) and
4716
        not self.op.hvparams and
4717
        not self.op.beparams):
4718
      raise errors.OpPrereqError("No changes submitted")
4719
    for item in (constants.BE_MEMORY, constants.BE_VCPUS):
4720
      val = self.op.beparams.get(item, None)
4721
      if val is not None:
4722
        try:
4723
          val = int(val)
4724
        except ValueError, err:
4725
          raise errors.OpPrereqError("Invalid %s size: %s" % (item, str(err)))
4726
        self.op.beparams[item] = val
4727
    if self.ip is not None:
4728
      self.do_ip = True
4729
      if self.ip.lower() == "none":
4730
        self.ip = None
4731
      else:
4732
        if not utils.IsValidIP(self.ip):
4733
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4734
    else:
4735
      self.do_ip = False
4736
    self.do_bridge = (self.bridge is not None)
4737
    if self.mac is not None:
4738
      if self.cfg.IsMacInUse(self.mac):
4739
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4740
                                   self.mac)
4741
      if not utils.IsValidMac(self.mac):
4742
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4743

    
4744
    # checking the new params on the primary/secondary nodes
4745

    
4746
    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4747
    assert self.instance is not None, \
4748
      "Cannot retrieve locked instance %s" % self.op.instance_name
4749
    pnode = self.instance.primary_node
4750
    nodelist = [pnode]
4751
    nodelist.extend(instance.secondary_nodes)
4752

    
4753
    # hvparams processing
4754
    if self.op.hvparams:
4755
      i_hvdict = copy.deepcopy(instance.hvparams)
4756
      for key, val in self.op.hvparams.iteritems():
4757
        if val is None:
4758
          try:
4759
            del i_hvdict[key]
4760
          except KeyError:
4761
            pass
4762
        else:
4763
          i_hvdict[key] = val
4764
      cluster = self.cfg.GetClusterInfo()
4765
      hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
4766
                                i_hvdict)
4767
      # local check
4768
      hypervisor.GetHypervisor(
4769
        instance.hypervisor).CheckParameterSyntax(hv_new)
4770
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
4771
      self.hv_new = hv_new # the new actual values
4772
      self.hv_inst = i_hvdict # the new dict (without defaults)
4773
    else:
4774
      self.hv_new = self.hv_inst = {}
4775

    
4776
    # beparams processing
4777
    if self.op.beparams:
4778
      i_bedict = copy.deepcopy(instance.beparams)
4779
      for key, val in self.op.beparams.iteritems():
4780
        if val is None:
4781
          try:
4782
            del i_bedict[key]
4783
          except KeyError:
4784
            pass
4785
        else:
4786
          i_bedict[key] = val
4787
      cluster = self.cfg.GetClusterInfo()
4788
      be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4789
                                i_bedict)
4790
      self.be_new = be_new # the new actual values
4791
      self.be_inst = i_bedict # the new dict (without defaults)
4792
    else:
4793
      self.hv_new = self.hv_inst = {}
4794

    
4795
    self.warn = []
4796

    
4797
    if constants.BE_MEMORY in self.op.beparams and not self.force:
4798
      mem_check_list = [pnode]
4799
      if be_new[constants.BE_AUTO_BALANCE]:
4800
        # either we changed auto_balance to yes or it was from before
4801
        mem_check_list.extend(instance.secondary_nodes)
4802
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
4803
                                                  instance.hypervisor)
4804
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
4805
                                         instance.hypervisor)
4806

    
4807
      if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
4808
        # Assume the primary node is unreachable and go ahead
4809
        self.warn.append("Can't get info from primary node %s" % pnode)
4810
      else:
4811
        if instance_info:
4812
          current_mem = instance_info['memory']
4813
        else:
4814
          # Assume instance not running
4815
          # (there is a slight race condition here, but it's not very probable,
4816
          # and we have no other way to check)
4817
          current_mem = 0
4818
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
4819
                    nodeinfo[pnode]['memory_free'])
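        # miss_mem is the memory still missing on the primary node after
        # accounting for what the instance currently uses and what the node
        # reports free; a positive value means the new size cannot be met.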
4820
        if miss_mem > 0:
4821
          raise errors.OpPrereqError("This change will prevent the instance"
4822
                                     " from starting, due to %d MB of memory"
4823
                                     " missing on its primary node" % miss_mem)
4824

    
4825
      if be_new[constants.BE_AUTO_BALANCE]:
4826
        for node in instance.secondary_nodes:
4827
          if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
4828
            self.warn.append("Can't get info from secondary node %s" % node)
4829
          elif be_new[constants.BE_MEMORY] > nodeinfo[node]['memory_free']:
4830
            self.warn.append("Not enough memory to failover instance to"
4831
                             " secondary node %s" % node)
4832

    
4833
    return
4834

    
4835
  def Exec(self, feedback_fn):
4836
    """Modifies an instance.
4837

4838
    All parameters take effect only at the next restart of the instance.
4839
    """
4840
    # Process here the warnings from CheckPrereq, as we don't have a
4841
    # feedback_fn there.
4842
    for warn in self.warn:
4843
      feedback_fn("WARNING: %s" % warn)
4844

    
4845
    result = []
4846
    instance = self.instance
4847
    if self.do_ip:
4848
      instance.nics[0].ip = self.ip
4849
      result.append(("ip", self.ip))
4850
    if self.bridge:
4851
      instance.nics[0].bridge = self.bridge
4852
      result.append(("bridge", self.bridge))
4853
    if self.mac:
4854
      instance.nics[0].mac = self.mac
4855
      result.append(("mac", self.mac))
4856
    if self.op.hvparams:
4857
      instance.hvparams = self.hv_new
4858
      for key, val in self.op.hvparams.iteritems():
4859
        result.append(("hv/%s" % key, val))
4860
    if self.op.beparams:
4861
      instance.beparams = self.be_inst
4862
      for key, val in self.op.beparams.iteritems():
4863
        result.append(("be/%s" % key, val))
4864

    
4865
    self.cfg.Update(instance)
4866

    
4867
    return result
4868

    
4869

    
4870
class LUQueryExports(NoHooksLU):
4871
  """Query the exports list
4872

4873
  """
4874
  _OP_REQP = ['nodes']
4875
  REQ_BGL = False
4876

    
4877
  def ExpandNames(self):
4878
    self.needed_locks = {}
4879
    self.share_locks[locking.LEVEL_NODE] = 1
4880
    if not self.op.nodes:
4881
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4882
    else:
4883
      self.needed_locks[locking.LEVEL_NODE] = \
4884
        _GetWantedNodes(self, self.op.nodes)
4885

    
4886
  def CheckPrereq(self):
4887
    """Check prerequisites.
4888

4889
    """
4890
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
4891

    
4892
  def Exec(self, feedback_fn):
4893
    """Compute the list of all the exported system images.
4894

4895
    @rtype: dict
4896
    @return: a dictionary with the structure node->(export-list)
4897
        where export-list is a list of the instances exported on
4898
        that node.
4899

4900
    """
4901
    return self.rpc.call_export_list(self.nodes)
4902

    
4903

    
4904
class LUExportInstance(LogicalUnit):
4905
  """Export an instance to an image in the cluster.
4906

4907
  """
4908
  HPATH = "instance-export"
4909
  HTYPE = constants.HTYPE_INSTANCE
4910
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
4911
  REQ_BGL = False
4912

    
4913
  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # FIXME: lock only instance primary and destination node
    #
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4924

    
4925
  def DeclareLocks(self, level):
4926
    """Last minute lock declaration."""
4927
    # All nodes are locked anyway, so nothing to do here.
4928

    
4929
  def BuildHooksEnv(self):
4930
    """Build hooks env.
4931

4932
    This will run on the master, primary node and target node.
4933

4934
    """
4935
    env = {
4936
      "EXPORT_NODE": self.op.target_node,
4937
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4938
      }
4939
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4940
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
4941
          self.op.target_node]
4942
    return env, nl, nl
4943

    
4944
  def CheckPrereq(self):
4945
    """Check prerequisites.
4946

4947
    This checks that the instance and node names are valid.
4948

4949
    """
4950
    instance_name = self.op.instance_name
4951
    self.instance = self.cfg.GetInstanceInfo(instance_name)
4952
    assert self.instance is not None, \
4953
          "Cannot retrieve locked instance %s" % self.op.instance_name
4954

    
4955
    self.dst_node = self.cfg.GetNodeInfo(
4956
      self.cfg.ExpandNodeName(self.op.target_node))
4957

    
4958
    assert self.dst_node is not None, \
4959
          "Cannot retrieve locked node %s" % self.op.target_node
4960

    
4961
    # instance disk type verification
4962
    for disk in self.instance.disks:
4963
      if disk.dev_type == constants.LD_FILE:
4964
        raise errors.OpPrereqError("Export not supported for instances with"
4965
                                   " file-based disks")
4966

    
4967
  def Exec(self, feedback_fn):
4968
    """Export an instance to an image in the cluster.
4969

4970
    """
4971
    instance = self.instance
4972
    dst_node = self.dst_node
4973
    src_node = instance.primary_node
4974
    if self.op.shutdown:
4975
      # shutdown the instance, but not the disks
4976
      if not self.rpc.call_instance_shutdown(src_node, instance):
4977
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4978
                                 (instance.name, src_node))
4979

    
4980
    vgname = self.cfg.GetVGName()
4981

    
4982
    snap_disks = []
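    # snap_disks will hold one entry per instance disk: a Disk object
    # describing the LVM snapshot taken on the source node, or False if the
    # snapshot could not be created.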
4983

    
4984
    try:
4985
      for disk in instance.disks:
4986
        # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4987
        new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
4988

    
4989
        if not new_dev_name:
4990
          self.LogWarning("Could not snapshot block device %s on node %s",
4991
                          disk.logical_id[1], src_node)
4992
          snap_disks.append(False)
4993
        else:
4994
          new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4995
                                 logical_id=(vgname, new_dev_name),
4996
                                 physical_id=(vgname, new_dev_name),
4997
                                 iv_name=disk.iv_name)
4998
          snap_disks.append(new_dev)
4999

    
5000
    finally:
5001
      if self.op.shutdown and instance.status == "up":
5002
        if not self.rpc.call_instance_start(src_node, instance, None):
5003
          _ShutdownInstanceDisks(self, instance)
5004
          raise errors.OpExecError("Could not start instance")
5005

    
5006
    # TODO: check for size
5007

    
5008
    cluster_name = self.cfg.GetClusterName()
5009
    for idx, dev in enumerate(snap_disks):
5010
      if dev:
5011
        if not self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
5012
                                             instance, cluster_name, idx):
5013
          self.LogWarning("Could not export block device %s from node %s to"
5014
                          " node %s", dev.logical_id[1], src_node,
5015
                          dst_node.name)
5016
        if not self.rpc.call_blockdev_remove(src_node, dev):
5017
          self.LogWarning("Could not remove snapshot block device %s from node"
5018
                          " %s", dev.logical_id[1], src_node)
5019

    
5020
    if not self.rpc.call_finalize_export(dst_node.name, instance, snap_disks):
5021
      self.LogWarning("Could not finalize export for instance %s on node %s",
5022
                      instance.name, dst_node.name)
5023

    
5024
    nodelist = self.cfg.GetNodeList()
5025
    nodelist.remove(dst_node.name)
5026

    
5027
    # on one-node clusters nodelist will be empty after the removal
5028
    # if we proceed the backup would be removed because OpQueryExports
5029
    # substitutes an empty list with the full cluster node list.
5030
    if nodelist:
5031
      exportlist = self.rpc.call_export_list(nodelist)
5032
      for node in exportlist:
5033
        if instance.name in exportlist[node]:
5034
          if not self.rpc.call_export_remove(node, instance.name):
5035
            self.LogWarning("Could not remove older export for instance %s"
5036
                            " on node %s", instance.name, node)
5037

    
5038

    
5039
class LURemoveExport(NoHooksLU):
5040
  """Remove exports related to the named instance.
5041

5042
  """
5043
  _OP_REQP = ["instance_name"]
5044
  REQ_BGL = False
5045

    
5046
  def ExpandNames(self):
5047
    self.needed_locks = {}
5048
    # We need all nodes to be locked in order for RemoveExport to work, but we
5049
    # don't need to lock the instance itself, as nothing will happen to it (and
5050
    # we can remove exports also for a removed instance)
5051
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5052

    
5053
  def CheckPrereq(self):
5054
    """Check prerequisites.
5055
    """
5056
    pass
5057

    
5058
  def Exec(self, feedback_fn):
5059
    """Remove any export.
5060

5061
    """
5062
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5063
    # If the instance was not found we'll try with the name that was passed in.
5064
    # This will only work if it was an FQDN, though.
5065
    fqdn_warn = False
5066
    if not instance_name:
5067
      fqdn_warn = True
5068
      instance_name = self.op.instance_name
5069

    
5070
    exportlist = self.rpc.call_export_list(self.acquired_locks[
5071
      locking.LEVEL_NODE])
5072
    found = False
5073
    for node in exportlist:
5074
      if instance_name in exportlist[node]:
5075
        found = True
5076
        if not self.rpc.call_export_remove(node, instance_name):
5077
          logging.error("Could not remove export for instance %s"
5078
                        " on node %s", instance_name, node)
5079

    
5080
    if fqdn_warn and not found:
5081
      feedback_fn("Export not found. If trying to remove an export belonging"
5082
                  " to a deleted instance please use its Fully Qualified"
5083
                  " Domain Name.")
5084

    
5085

    
5086
class TagsLU(NoHooksLU):
5087
  """Generic tags LU.
5088

5089
  This is an abstract class which is the parent of all the other tags LUs.
5090

5091
  """
5092

    
5093
  def ExpandNames(self):
5094
    self.needed_locks = {}
5095
    if self.op.kind == constants.TAG_NODE:
5096
      name = self.cfg.ExpandNodeName(self.op.name)
5097
      if name is None:
5098
        raise errors.OpPrereqError("Invalid node name (%s)" %
5099
                                   (self.op.name,))
5100
      self.op.name = name
5101
      self.needed_locks[locking.LEVEL_NODE] = name
5102
    elif self.op.kind == constants.TAG_INSTANCE:
5103
      name = self.cfg.ExpandInstanceName(self.op.name)
5104
      if name is None:
5105
        raise errors.OpPrereqError("Invalid instance name (%s)" %
5106
                                   (self.op.name,))
5107
      self.op.name = name
5108
      self.needed_locks[locking.LEVEL_INSTANCE] = name
5109

    
5110
  def CheckPrereq(self):
5111
    """Check prerequisites.
5112

5113
    """
5114
    if self.op.kind == constants.TAG_CLUSTER:
5115
      self.target = self.cfg.GetClusterInfo()
5116
    elif self.op.kind == constants.TAG_NODE:
5117
      self.target = self.cfg.GetNodeInfo(self.op.name)
5118
    elif self.op.kind == constants.TAG_INSTANCE:
5119
      self.target = self.cfg.GetInstanceInfo(self.op.name)
5120
    else:
5121
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
5122
                                 str(self.op.kind))
5123

    
5124

    
5125
class LUGetTags(TagsLU):
5126
  """Returns the tags of a given object.
5127

5128
  """
5129
  _OP_REQP = ["kind", "name"]
5130
  REQ_BGL = False
5131

    
5132
  def Exec(self, feedback_fn):
5133
    """Returns the tag list.
5134

5135
    """
5136
    return list(self.target.GetTags())
5137

    
5138

    
5139
class LUSearchTags(NoHooksLU):
5140
  """Searches the tags for a given pattern.
5141

5142
  """
5143
  _OP_REQP = ["pattern"]
5144
  REQ_BGL = False
5145

    
5146
  def ExpandNames(self):
5147
    self.needed_locks = {}
5148

    
5149
  def CheckPrereq(self):
5150
    """Check prerequisites.
5151

5152
    This checks the pattern passed for validity by compiling it.
5153

5154
    """
5155
    try:
5156
      self.re = re.compile(self.op.pattern)
5157
    except re.error, err:
5158
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5159
                                 (self.op.pattern, err))
5160

    
5161
  def Exec(self, feedback_fn):
5162
    """Returns the tag list.
5163

5164
    """
5165
    cfg = self.cfg
5166
    tgts = [("/cluster", cfg.GetClusterInfo())]
5167
    ilist = cfg.GetAllInstancesInfo().values()
5168
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5169
    nlist = cfg.GetAllNodesInfo().values()
5170
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5171
    results = []
5172
    for path, target in tgts:
5173
      for tag in target.GetTags():
5174
        if self.re.search(tag):
5175
          results.append((path, tag))
5176
    return results
5177

    
5178

    
5179
class LUAddTags(TagsLU):
5180
  """Sets a tag on a given object.
5181

5182
  """
5183
  _OP_REQP = ["kind", "name", "tags"]
5184
  REQ_BGL = False
5185

    
5186
  def CheckPrereq(self):
5187
    """Check prerequisites.
5188

5189
    This checks the type and length of the tag name and value.
5190

5191
    """
5192
    TagsLU.CheckPrereq(self)
5193
    for tag in self.op.tags:
5194
      objects.TaggableObject.ValidateTag(tag)
5195

    
5196
  def Exec(self, feedback_fn):
5197
    """Sets the tag.
5198

5199
    """
5200
    try:
5201
      for tag in self.op.tags:
5202
        self.target.AddTag(tag)
5203
    except errors.TagError, err:
5204
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
5205
    try:
5206
      self.cfg.Update(self.target)
5207
    except errors.ConfigurationError:
5208
      raise errors.OpRetryError("There has been a modification to the"
5209
                                " config file and the operation has been"
5210
                                " aborted. Please retry.")
5211

    
5212

    
5213
class LUDelTags(TagsLU):
5214
  """Delete a list of tags from a given object.
5215

5216
  """
5217
  _OP_REQP = ["kind", "name", "tags"]
5218
  REQ_BGL = False
5219

    
5220
  def CheckPrereq(self):
5221
    """Check prerequisites.
5222

5223
    This checks that we have the given tag.
5224

5225
    """
5226
    TagsLU.CheckPrereq(self)
5227
    for tag in self.op.tags:
5228
      objects.TaggableObject.ValidateTag(tag)
5229
    del_tags = frozenset(self.op.tags)
5230
    cur_tags = self.target.GetTags()
5231
    if not del_tags <= cur_tags:
5232
      diff_tags = del_tags - cur_tags
5233
      diff_names = ["'%s'" % tag for tag in diff_tags]
5234
      diff_names.sort()
5235
      raise errors.OpPrereqError("Tag(s) %s not found" %
5236
                                 (",".join(diff_names)))
5237

    
5238
  def Exec(self, feedback_fn):
5239
    """Remove the tag from the object.
5240

5241
    """
5242
    for tag in self.op.tags:
5243
      self.target.RemoveTag(tag)
5244
    try:
5245
      self.cfg.Update(self.target)
5246
    except errors.ConfigurationError:
5247
      raise errors.OpRetryError("There has been a modification to the"
5248
                                " config file and the operation has been"
5249
                                " aborted. Please retry.")
5250

    
5251

    
5252
class LUTestDelay(NoHooksLU):
5253
  """Sleep for a specified amount of time.
5254

5255
  This LU sleeps on the master and/or nodes for a specified amount of
5256
  time.
5257

5258
  """
5259
  _OP_REQP = ["duration", "on_master", "on_nodes"]
5260
  REQ_BGL = False
5261

    
5262
  def ExpandNames(self):
5263
    """Expand names and set required locks.
5264

5265
    This expands the node list, if any.
5266

5267
    """
5268
    self.needed_locks = {}
5269
    if self.op.on_nodes:
5270
      # _GetWantedNodes can be used here, but is not always appropriate to use
5271
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
5272
      # more information.
5273
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5274
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
5275

    
5276
  def CheckPrereq(self):
5277
    """Check prerequisites.
5278

5279
    """
5280

    
5281
  def Exec(self, feedback_fn):
5282
    """Do the actual sleep.
5283

5284
    """
5285
    if self.op.on_master:
5286
      if not utils.TestDelay(self.op.duration):
5287
        raise errors.OpExecError("Error during master delay test")
5288
    if self.op.on_nodes:
5289
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5290
      if not result:
5291
        raise errors.OpExecError("Complete failure from rpc call")
5292
      for node, node_result in result.items():
5293
        if not node_result:
5294
          raise errors.OpExecError("Failure during rpc call to node %s,"
5295
                                   " result: %s" % (node, node_result))
5296

    
5297

    
5298
class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has several sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the mode-specific _ALLO_KEYS or _RELO_KEYS
      class attribute are required)
    - four buffer attributes (in_text, in_data, out_text, out_data), that
      represent the input (to the external script) in text and data
      structure format, and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus", "hypervisor",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]
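  # The constructor below enforces that callers pass exactly the keyword
  # arguments listed for the selected mode: both unknown and missing keys
  # raise ProgrammerError.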
5318

    
5319
  def __init__(self, lu, mode, name, **kwargs):
5320
    self.lu = lu
5321
    # init buffer variables
5322
    self.in_text = self.out_text = self.in_data = self.out_data = None
5323
    # init all input fields so that pylint is happy
5324
    self.mode = mode
5325
    self.name = name
5326
    self.mem_size = self.disks = self.disk_template = None
5327
    self.os = self.tags = self.nics = self.vcpus = None
5328
    self.relocate_from = None
5329
    # computed fields
5330
    self.required_nodes = None
5331
    # init result fields
5332
    self.success = self.info = self.nodes = None
5333
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5334
      keyset = self._ALLO_KEYS
5335
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5336
      keyset = self._RELO_KEYS
5337
    else:
5338
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5339
                                   " IAllocator" % self.mode)
5340
    for key in kwargs:
5341
      if key not in keyset:
5342
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
5343
                                     " IAllocator" % key)
5344
      setattr(self, key, kwargs[key])
5345
    for key in keyset:
5346
      if key not in kwargs:
5347
        raise errors.ProgrammerError("Missing input parameter '%s' to"
5348
                                     " IAllocator" % key)
5349
    self._BuildInputData()
5350

    
5351
  def _ComputeClusterData(self):
5352
    """Compute the generic allocator input data.
5353

5354
    This is the data that is independent of the actual operation.
5355

5356
    """
5357
    cfg = self.lu.cfg
5358
    cluster_info = cfg.GetClusterInfo()
5359
    # cluster data
5360
    data = {
5361
      "version": 1,
5362
      "cluster_name": cfg.GetClusterName(),
5363
      "cluster_tags": list(cluster_info.GetTags()),
5364
      "enable_hypervisors": list(cluster_info.enabled_hypervisors),
5365
      # we don't have job IDs
5366
      }
5367
    iinfo = cfg.GetAllInstancesInfo().values()
5368
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
5369

    
5370
    # node data
5371
    node_results = {}
5372
    node_list = cfg.GetNodeList()
5373

    
5374
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5375
      hypervisor = self.hypervisor
5376
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5377
      hypervisor = cfg.GetInstanceInfo(self.name).hypervisor
5378

    
5379
    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
5380
                                           hypervisor)
5381
    for nname in node_list:
5382
      ninfo = cfg.GetNodeInfo(nname)
5383
      if nname not in node_data or not isinstance(node_data[nname], dict):
5384
        raise errors.OpExecError("Can't get data for node %s" % nname)
5385
      remote_info = node_data[nname]
5386
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
5387
                   'vg_size', 'vg_free', 'cpu_total']:
5388
        if attr not in remote_info:
5389
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5390
                                   (nname, attr))
5391
        try:
5392
          remote_info[attr] = int(remote_info[attr])
5393
        except ValueError, err:
5394
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5395
                                   " %s" % (nname, attr, str(err)))
5396
      # compute memory used by primary instances
5397
      i_p_mem = i_p_up_mem = 0
5398
      for iinfo, beinfo in i_list:
5399
        if iinfo.primary_node == nname:
5400
          i_p_mem += beinfo[constants.BE_MEMORY]
5401
          if iinfo.status == "up":
5402
            i_p_up_mem += beinfo[constants.BE_MEMORY]
5403

    
5404
      # compute memory used by instances
5405
      pnr = {
5406
        "tags": list(ninfo.GetTags()),
5407
        "total_memory": remote_info['memory_total'],
5408
        "reserved_memory": remote_info['memory_dom0'],
5409
        "free_memory": remote_info['memory_free'],
5410
        "i_pri_memory": i_p_mem,
5411
        "i_pri_up_memory": i_p_up_mem,
5412
        "total_disk": remote_info['vg_size'],
5413
        "free_disk": remote_info['vg_free'],
5414
        "primary_ip": ninfo.primary_ip,
5415
        "secondary_ip": ninfo.secondary_ip,
5416
        "total_cpus": remote_info['cpu_total'],
5417
        }
5418
      node_results[nname] = pnr
5419
    data["nodes"] = node_results
5420

    
5421
    # instance data
5422
    instance_data = {}
5423
    for iinfo, beinfo in i_list:
5424
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5425
                  for n in iinfo.nics]
5426
      pir = {
5427
        "tags": list(iinfo.GetTags()),
5428
        "should_run": iinfo.status == "up",
5429
        "vcpus": beinfo[constants.BE_VCPUS],
5430
        "memory": beinfo[constants.BE_MEMORY],
5431
        "os": iinfo.os,
5432
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5433
        "nics": nic_data,
5434
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5435
        "disk_template": iinfo.disk_template,
5436
        "hypervisor": iinfo.hypervisor,
5437
        }
5438
      instance_data[iinfo.name] = pir
5439

    
5440
    data["instances"] = instance_data
5441

    
5442
    self.in_data = data
5443

    
5444
  def _AddNewInstance(self):
5445
    """Add new instance data to allocator structure.
5446

5447
    This, in combination with _ComputeClusterData, will create the
    correct structure needed as input for the allocator.
5449

5450
    The checks for the completeness of the opcode must have already been
5451
    done.
5452

5453
    """
5454
    data = self.in_data
5455
    if len(self.disks) != 2:
5456
      raise errors.OpExecError("Only two-disk configurations supported")
5457

    
5458
    disk_space = _ComputeDiskSize(self.disk_template, self.disks)
5459

    
5460
    if self.disk_template in constants.DTS_NET_MIRROR:
5461
      self.required_nodes = 2
5462
    else:
5463
      self.required_nodes = 1
5464
    request = {
5465
      "type": "allocate",
5466
      "name": self.name,
5467
      "disk_template": self.disk_template,
5468
      "tags": self.tags,
5469
      "os": self.os,
5470
      "vcpus": self.vcpus,
5471
      "memory": self.mem_size,
5472
      "disks": self.disks,
5473
      "disk_space_total": disk_space,
5474
      "nics": self.nics,
5475
      "required_nodes": self.required_nodes,
5476
      }
5477
    data["request"] = request
5478

    
5479
  def _AddRelocateInstance(self):
5480
    """Add relocate instance data to allocator structure.
5481

5482
    This, in combination with _ComputeClusterData, will create the
    correct structure needed as input for the allocator.
5484

5485
    The checks for the completeness of the opcode must have already been
5486
    done.
5487

5488
    """
5489
    instance = self.lu.cfg.GetInstanceInfo(self.name)
5490
    if instance is None:
5491
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
5492
                                   " IAllocator" % self.name)
5493

    
5494
    if instance.disk_template not in constants.DTS_NET_MIRROR:
5495
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5496

    
5497
    if len(instance.secondary_nodes) != 1:
5498
      raise errors.OpPrereqError("Instance has not exactly one secondary node")
5499

    
5500
    self.required_nodes = 1
5501
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
5502
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
5503

    
5504
    request = {
5505
      "type": "relocate",
5506
      "name": self.name,
5507
      "disk_space_total": disk_space,
5508
      "required_nodes": self.required_nodes,
5509
      "relocate_from": self.relocate_from,
5510
      }
5511
    self.in_data["request"] = request
5512

    
5513
  def _BuildInputData(self):
5514
    """Build input data structures.
5515

5516
    """
5517
    self._ComputeClusterData()
5518

    
5519
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5520
      self._AddNewInstance()
5521
    else:
5522
      self._AddRelocateInstance()
5523

    
5524
    self.in_text = serializer.Dump(self.in_data)
5525

    
5526
  def Run(self, name, validate=True, call_fn=None):
5527
    """Run an instance allocator and return the results.
5528

5529
    """
5530
    if call_fn is None:
5531
      call_fn = self.lu.rpc.call_iallocator_runner
5532
    data = self.in_text
5533

    
5534
    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
5535

    
5536
    if not isinstance(result, (list, tuple)) or len(result) != 4:
5537
      raise errors.OpExecError("Invalid result from master iallocator runner")
5538

    
5539
    rcode, stdout, stderr, fail = result
5540

    
5541
    if rcode == constants.IARUN_NOTFOUND:
5542
      raise errors.OpExecError("Can't find allocator '%s'" % name)
5543
    elif rcode == constants.IARUN_FAILURE:
5544
      raise errors.OpExecError("Instance allocator call failed: %s,"
5545
                               " output: %s" % (fail, stdout+stderr))
5546
    self.out_text = stdout
5547
    if validate:
5548
      self._ValidateResult()
5549

    
5550
  def _ValidateResult(self):
5551
    """Process the allocator results.
5552

5553
    This will process and if successful save the result in
5554
    self.out_data and the other parameters.
5555

5556
    """
5557
    try:
5558
      rdict = serializer.Load(self.out_text)
5559
    except Exception, err:
5560
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5561

    
5562
    if not isinstance(rdict, dict):
5563
      raise errors.OpExecError("Can't parse iallocator results: not a dict")
5564

    
5565
    for key in "success", "info", "nodes":
5566
      if key not in rdict:
5567
        raise errors.OpExecError("Can't parse iallocator results:"
5568
                                 " missing key '%s'" % key)
5569
      setattr(self, key, rdict[key])
5570

    
5571
    if not isinstance(rdict["nodes"], list):
5572
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5573
                               " is not a list")
5574
    self.out_data = rdict
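    # Illustration (hypothetical values): a well-formed allocator reply is a
    # JSON object such as
    #   {"success": true, "info": "allocation successful",
    #    "nodes": ["node1.example.com", "node2.example.com"]}
    # i.e. exactly the keys checked above, with "nodes" being a list.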
5575

    
5576

    
5577
class LUTestAllocator(NoHooksLU):
5578
  """Run allocator tests.
5579

5580
  This LU runs the allocator tests
5581

5582
  """
5583
  _OP_REQP = ["direction", "mode", "name"]
5584

    
5585
  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    of the test.

    """
5591
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5592
      for attr in ["name", "mem_size", "disks", "disk_template",
5593
                   "os", "tags", "nics", "vcpus"]:
5594
        if not hasattr(self.op, attr):
5595
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5596
                                     attr)
5597
      iname = self.cfg.ExpandInstanceName(self.op.name)
5598
      if iname is not None:
5599
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5600
                                   iname)
5601
      if not isinstance(self.op.nics, list):
5602
        raise errors.OpPrereqError("Invalid parameter 'nics'")
5603
      for row in self.op.nics:
5604
        if (not isinstance(row, dict) or
5605
            "mac" not in row or
5606
            "ip" not in row or
5607
            "bridge" not in row):
5608
          raise errors.OpPrereqError("Invalid contents of the"
5609
                                     " 'nics' parameter")
5610
      if not isinstance(self.op.disks, list):
5611
        raise errors.OpPrereqError("Invalid parameter 'disks'")
5612
      if len(self.op.disks) != 2:
5613
        raise errors.OpPrereqError("Only two-disk configurations supported")
5614
      for row in self.op.disks:
5615
        if (not isinstance(row, dict) or
5616
            "size" not in row or
5617
            not isinstance(row["size"], int) or
5618
            "mode" not in row or
5619
            row["mode"] not in ['r', 'w']):
5620
          raise errors.OpPrereqError("Invalid contents of the"
5621
                                     " 'disks' parameter")
5622
      if self.op.hypervisor is None:
5623
        self.op.hypervisor = self.cfg.GetHypervisorType()
5624
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5625
      if not hasattr(self.op, "name"):
5626
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5627
      fname = self.cfg.ExpandInstanceName(self.op.name)
5628
      if fname is None:
5629
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5630
                                   self.op.name)
5631
      self.op.name = fname
5632
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5633
    else:
5634
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5635
                                 self.op.mode)
5636

    
5637
    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5638
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
5639
        raise errors.OpPrereqError("Missing allocator name")
5640
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5641
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
5642
                                 self.op.direction)
5643

    
5644
  def Exec(self, feedback_fn):
5645
    """Run the allocator test.
5646

5647
    """
5648
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5649
      ial = IAllocator(self,
5650
                       mode=self.op.mode,
5651
                       name=self.op.name,
5652
                       mem_size=self.op.mem_size,
5653
                       disks=self.op.disks,
5654
                       disk_template=self.op.disk_template,
5655
                       os=self.op.os,
5656
                       tags=self.op.tags,
5657
                       nics=self.op.nics,
5658
                       vcpus=self.op.vcpus,
5659
                       hypervisor=self.op.hypervisor,
5660
                       )
5661
    else:
5662
      ial = IAllocator(self,
5663
                       mode=self.op.mode,
5664
                       name=self.op.name,
5665
                       relocate_from=list(self.relocate_from),
5666
                       )
5667

    
5668
    if self.op.direction == constants.IALLOCATOR_DIR_IN:
5669
      result = ial.in_text
5670
    else:
5671
      ial.Run(self.op.allocator, validate=False)
5672
      result = ial.out_text
5673
    return result