root / lib / cmdlib.py @ b57e9819


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33
import logging
34
import copy
35

    
36
from ganeti import ssh
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import locking
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import serializer
45

    
46

    
47
class LogicalUnit(object):
48
  """Logical Unit base class.
49

50
  Subclasses must follow these rules:
51
    - implement ExpandNames
52
    - implement CheckPrereq
53
    - implement Exec
54
    - implement BuildHooksEnv
55
    - redefine HPATH and HTYPE
56
    - optionally redefine their run requirements:
57
        REQ_MASTER: the LU needs to run on the master node
58
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
59

60
  Note that all commands require root permissions.
61

62
  """
63
  HPATH = None
64
  HTYPE = None
65
  _OP_REQP = []
66
  REQ_MASTER = True
67
  REQ_BGL = True
68

    
69
  def __init__(self, processor, op, context, rpc):
70
    """Constructor for LogicalUnit.
71

72
    This needs to be overridden in derived classes in order to check op
73
    validity.
74

75
    """
76
    self.proc = processor
77
    self.op = op
78
    self.cfg = context.cfg
79
    self.context = context
80
    self.rpc = rpc
81
    # Dicts used to declare locking needs to mcpu
82
    self.needed_locks = None
83
    self.acquired_locks = {}
84
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85
    self.add_locks = {}
86
    self.remove_locks = {}
87
    # Used to force good behavior when calling helper functions
88
    self.recalculate_locks = {}
89
    self.__ssh = None
90
    # logging
91
    self.LogWarning = processor.LogWarning
92
    self.LogInfo = processor.LogInfo
93

    
94
    for attr_name in self._OP_REQP:
95
      attr_val = getattr(op, attr_name, None)
96
      if attr_val is None:
97
        raise errors.OpPrereqError("Required parameter '%s' missing" %
98
                                   attr_name)
99

    
100
    if not self.cfg.IsCluster():
101
      raise errors.OpPrereqError("Cluster not initialized yet,"
102
                                 " use 'gnt-cluster init' first.")
103
    if self.REQ_MASTER:
104
      master = self.cfg.GetMasterNode()
105
      if master != utils.HostInfo().name:
106
        raise errors.OpPrereqError("Commands must be run on the master"
107
                                   " node %s" % master)
108

    
109
  def __GetSSH(self):
110
    """Returns the SshRunner object
111

112
    """
113
    if not self.__ssh:
114
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
115
    return self.__ssh
116

    
117
  ssh = property(fget=__GetSSH)
118

    
119
  def ExpandNames(self):
120
    """Expand names for this LU.
121

122
    This method is called before starting to execute the opcode, and it should
123
    update all the parameters of the opcode to their canonical form (e.g. a
124
    short node name must be fully expanded after this method has successfully
125
    completed). This way locking, hooks, logging, etc. can work correctly.
126

127
    LUs which implement this method must also populate the self.needed_locks
128
    member, as a dict with lock levels as keys, and a list of needed lock names
129
    as values. Rules:
130

131
      - use an empty dict if you don't need any lock
132
      - if you don't need any lock at a particular level omit that level
133
      - don't put anything for the BGL level
134
      - if you want all locks at a level use locking.ALL_SET as a value
135

136
    If you need to share locks (rather than acquire them exclusively) at one
137
    level you can modify self.share_locks, setting a true value (usually 1) for
138
    that level. By default locks are not shared.
139

140
    Examples::
141

142
      # Acquire all nodes and one instance
143
      self.needed_locks = {
144
        locking.LEVEL_NODE: locking.ALL_SET,
145
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
146
      }
147
      # Acquire just two nodes
148
      self.needed_locks = {
149
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
150
      }
151
      # Acquire no locks
152
      self.needed_locks = {} # No, you can't leave it to the default value None
153

154
    """
155
    # The implementation of this method is mandatory only if the new LU is
156
    # concurrent, so that old LUs don't need to be changed all at the same
157
    # time.
158
    if self.REQ_BGL:
159
      self.needed_locks = {} # Exclusive LUs don't need locks.
160
    else:
161
      raise NotImplementedError
162

    
163
  def DeclareLocks(self, level):
164
    """Declare LU locking needs for a level
165

166
    While most LUs can just declare their locking needs at ExpandNames time,
167
    sometimes there's the need to calculate some locks after having acquired
168
    the ones before. This function is called just before acquiring locks at a
169
    particular level, but after acquiring the ones at lower levels, and permits
170
    such calculations. It can be used to modify self.needed_locks, and by
171
    default it does nothing.
172

173
    This function is only called if you have something already set in
174
    self.needed_locks for the level.
175

176
    @param level: Locking level which is going to be locked
177
    @type level: member of ganeti.locking.LEVELS
178

179
    """
180

    
181
  def CheckPrereq(self):
182
    """Check prerequisites for this LU.
183

184
    This method should check that the prerequisites for the execution
185
    of this LU are fulfilled. It can do internode communication, but
186
    it should be idempotent - no cluster or system changes are
187
    allowed.
188

189
    The method should raise errors.OpPrereqError in case something is
190
    not fulfilled. Its return value is ignored.
191

192
    This method should also update all the parameters of the opcode to
193
    their canonical form if it hasn't been done by ExpandNames before.
194

195
    """
196
    raise NotImplementedError
197

    
198
  def Exec(self, feedback_fn):
199
    """Execute the LU.
200

201
    This method should implement the actual work. It should raise
202
    errors.OpExecError for failures that are somewhat dealt with in
203
    code, or expected.
204

205
    """
206
    raise NotImplementedError
207

    
208
  def BuildHooksEnv(self):
209
    """Build hooks environment for this LU.
210

211
    This method should return a three-element tuple consisting of: a dict
212
    containing the environment that will be used for running the
213
    specific hook for this LU, a list of node names on which the hook
214
    should run before the execution, and a list of node names on which
215
    the hook should run after the execution.
216

217
    The keys of the dict must not be prefixed with 'GANETI_', as this will
218
    be handled in the hooks runner. Also note additional keys will be
219
    added by the hooks runner. If the LU doesn't define any
220
    environment, an empty dict (and not None) should be returned.
221

222
    If no nodes are needed, return an empty list (and not None).
223

224
    Note that if the HPATH for a LU class is None, this function will
225
    not be called.
226

227
    """
228
    raise NotImplementedError
229

    
230
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
231
    """Notify the LU about the results of its hooks.
232

233
    This method is called every time a hooks phase is executed, and notifies
234
    the Logical Unit about the hooks' result. The LU can then use it to alter
235
    its result based on the hooks.  By default the method does nothing and the
236
    previous result is passed back unchanged but any LU can define it if it
237
    wants to use the local cluster hook-scripts somehow.
238

239
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
240
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
241
    @param hook_results: the results of the multi-node hooks rpc call
242
    @param feedback_fn: function used to send feedback back to the caller
243
    @param lu_result: the previous Exec result this LU had, or None
244
        in the PRE phase
245
    @return: the new Exec result, based on the previous result
246
        and hook results
247

248
    """
249
    return lu_result
250

    
251
  def _ExpandAndLockInstance(self):
252
    """Helper function to expand and lock an instance.
253

254
    Many LUs that work on an instance take its name in self.op.instance_name
255
    and need to expand it and then declare the expanded name for locking. This
256
    function does it, and then updates self.op.instance_name to the expanded
257
    name. It also initializes needed_locks as a dict, if this hasn't been done
258
    before.
259

260
    """
261
    if self.needed_locks is None:
262
      self.needed_locks = {}
263
    else:
264
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
265
        "_ExpandAndLockInstance called with instance-level locks set"
266
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
267
    if expanded_name is None:
268
      raise errors.OpPrereqError("Instance '%s' not known" %
269
                                  self.op.instance_name)
270
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
271
    self.op.instance_name = expanded_name
272

    
273
  def _LockInstancesNodes(self, primary_only=False):
274
    """Helper function to declare instances' nodes for locking.
275

276
    This function should be called after locking one or more instances to lock
277
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
278
    with all primary or secondary nodes for instances already locked and
279
    present in self.needed_locks[locking.LEVEL_INSTANCE].
280

281
    It should be called from DeclareLocks, and for safety only works if
282
    self.recalculate_locks[locking.LEVEL_NODE] is set.
283

284
    In the future it may grow parameters to just lock some instance's nodes, or
285
    to just lock primaries or secondary nodes, if needed.
286

287
    It should be called in DeclareLocks in a way similar to::
288

289
      if level == locking.LEVEL_NODE:
290
        self._LockInstancesNodes()
291

292
    @type primary_only: boolean
293
    @param primary_only: only lock primary nodes of locked instances
294

295
    """
296
    assert locking.LEVEL_NODE in self.recalculate_locks, \
297
      "_LockInstancesNodes helper function called with no nodes to recalculate"
298

    
299
    # TODO: check if we've really been called with the instance locks held
300

    
301
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
302
    # future we might want to have different behaviors depending on the value
303
    # of self.recalculate_locks[locking.LEVEL_NODE]
304
    wanted_nodes = []
305
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
306
      instance = self.context.cfg.GetInstanceInfo(instance_name)
307
      wanted_nodes.append(instance.primary_node)
308
      if not primary_only:
309
        wanted_nodes.extend(instance.secondary_nodes)
310

    
311
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
312
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
313
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
314
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
315

    
316
    del self.recalculate_locks[locking.LEVEL_NODE]
317

    
318

    
319
class NoHooksLU(LogicalUnit):
320
  """Simple LU which runs no hooks.
321

322
  This LU is intended as a parent for other LogicalUnits which will
323
  run no hooks, in order to reduce duplicate code.
324

325
  """
326
  HPATH = None
327
  HTYPE = None
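# Editor's note: the sketch below is not part of the original module; it only
# illustrates, following the rules documented in LogicalUnit above, how a
# minimal concurrent LU could tie ExpandNames, CheckPrereq and Exec together.
# The class name and its behaviour are hypothetical.
#
#   class LUExampleCheckInstance(NoHooksLU):
#     _OP_REQP = ["instance_name"]
#     REQ_BGL = False
#
#     def ExpandNames(self):
#       # expands self.op.instance_name and declares the instance lock
#       self._ExpandAndLockInstance()
#
#     def CheckPrereq(self):
#       self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
#
#     def Exec(self, feedback_fn):
#       feedback_fn("Instance %s is defined" % self.instance.name)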
328

    
329

    
330
def _GetWantedNodes(lu, nodes):
331
  """Returns list of checked and expanded node names.
332

333
  @type lu: L{LogicalUnit}
334
  @param lu: the logical unit on whose behalf we execute
335
  @type nodes: list
336
  @param nodes: list of node names or None for all nodes
337
  @rtype: list
338
  @return: the list of nodes, sorted
339
  @raise errors.OpPrereqError: if the nodes parameter is the wrong type
340

341
  """
342
  if not isinstance(nodes, list):
343
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
344

    
345
  if not nodes:
346
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
347
      " non-empty list of nodes whose name is to be expanded.")
348

    
349
  wanted = []
350
  for name in nodes:
351
    node = lu.cfg.ExpandNodeName(name)
352
    if node is None:
353
      raise errors.OpPrereqError("No such node name '%s'" % name)
354
    wanted.append(node)
355

    
356
  return utils.NiceSort(wanted)
357

    
358

    
359
def _GetWantedInstances(lu, instances):
360
  """Returns list of checked and expanded instance names.
361

362
  @type lu: L{LogicalUnit}
363
  @param lu: the logical unit on whose behalf we execute
364
  @type instances: list
365
  @param instances: list of instance names or None for all instances
366
  @rtype: list
367
  @return: the list of instances, sorted
368
  @raise errors.OpPrereqError: if the instances parameter is the wrong type
369
  @raise errors.OpPrereqError: if any of the passed instances is not found
370

371
  """
372
  if not isinstance(instances, list):
373
    raise errors.OpPrereqError("Invalid argument type 'instances'")
374

    
375
  if instances:
376
    wanted = []
377

    
378
    for name in instances:
379
      instance = lu.cfg.ExpandInstanceName(name)
380
      if instance is None:
381
        raise errors.OpPrereqError("No such instance name '%s'" % name)
382
      wanted.append(instance)
383

    
384
  else:
385
    wanted = lu.cfg.GetInstanceList()
386
  return utils.NiceSort(wanted)
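# Illustrative usage (editor's sketch, mirroring LUQueryNodes further down):
# an LU that takes an optional name list in its opcode typically expands it
# in ExpandNames, e.g.:
#
#   if self.op.names:
#     self.wanted = _GetWantedNodes(self, self.op.names)
#   else:
#     self.wanted = locking.ALL_SET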
387

    
388

    
389
def _CheckOutputFields(static, dynamic, selected):
390
  """Checks whether all selected fields are valid.
391

392
  @type static: L{utils.FieldSet}
393
  @param static: static fields set
394
  @type dynamic: L{utils.FieldSet}
395
  @param dynamic: dynamic fields set
396

397
  """
398
  f = utils.FieldSet()
399
  f.Extend(static)
400
  f.Extend(dynamic)
401

    
402
  delta = f.NonMatching(selected)
403
  if delta:
404
    raise errors.OpPrereqError("Unknown output fields selected: %s"
405
                               % ",".join(delta))
406
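# Illustrative usage (editor's sketch): query LUs build their static and
# dynamic field sets with utils.FieldSet and validate the user-selected
# output fields in ExpandNames, as LUDiagnoseOS and LUQueryNodes do below:
#
#   _CheckOutputFields(static=self._FIELDS_STATIC,
#                      dynamic=self._FIELDS_DYNAMIC,
#                      selected=self.op.output_fields)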

    
407

    
408
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
409
                          memory, vcpus, nics):
410
  """Builds instance related env variables for hooks
411

412
  This builds the hook environment from individual variables.
413

414
  @type name: string
415
  @param name: the name of the instance
416
  @type primary_node: string
417
  @param primary_node: the name of the instance's primary node
418
  @type secondary_nodes: list
419
  @param secondary_nodes: list of secondary nodes as strings
420
  @type os_type: string
421
  @param os_type: the name of the instance's OS
422
  @type status: string
423
  @param status: the desired status of the instance
424
  @type memory: string
425
  @param memory: the memory size of the instance
426
  @type vcpus: string
427
  @param vcpus: the count of VCPUs the instance has
428
  @type nics: list
429
  @param nics: list of tuples (ip, bridge, mac) representing
430
      the NICs the instance has
431
  @rtype: dict
432
  @return: the hook environment for this instance
433

434
  """
435
  env = {
436
    "OP_TARGET": name,
437
    "INSTANCE_NAME": name,
438
    "INSTANCE_PRIMARY": primary_node,
439
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
440
    "INSTANCE_OS_TYPE": os_type,
441
    "INSTANCE_STATUS": status,
442
    "INSTANCE_MEMORY": memory,
443
    "INSTANCE_VCPUS": vcpus,
444
  }
445

    
446
  if nics:
447
    nic_count = len(nics)
448
    for idx, (ip, bridge, mac) in enumerate(nics):
449
      if ip is None:
450
        ip = ""
451
      env["INSTANCE_NIC%d_IP" % idx] = ip
452
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
453
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
454
  else:
455
    nic_count = 0
456

    
457
  env["INSTANCE_NIC_COUNT"] = nic_count
458

    
459
  return env
460

    
461

    
462
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
463
  """Builds instance related env variables for hooks from an object.
464

465
  @type lu: L{LogicalUnit}
466
  @param lu: the logical unit on whose behalf we execute
467
  @type instance: L{objects.Instance}
468
  @param instance: the instance for which we should build the
469
      environment
470
  @type override: dict
471
  @param override: dictionary with key/values that will override
472
      our values
473
  @rtype: dict
474
  @return: the hook environment dictionary
475

476
  """
477
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
478
  args = {
479
    'name': instance.name,
480
    'primary_node': instance.primary_node,
481
    'secondary_nodes': instance.secondary_nodes,
482
    'os_type': instance.os,
483
    'status': instance.status,
484
    'memory': bep[constants.BE_MEMORY],
485
    'vcpus': bep[constants.BE_VCPUS],
486
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
487
  }
488
  if override:
489
    args.update(override)
490
  return _BuildInstanceHookEnv(**args)
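# For reference (editor's addition, values purely illustrative): running a
# hypothetical single-NIC instance through _BuildInstanceHookEnv() produces
# an environment along these lines:
#
#   {
#     "OP_TARGET": "inst1.example.com",
#     "INSTANCE_NAME": "inst1.example.com",
#     "INSTANCE_PRIMARY": "node1.example.com",
#     "INSTANCE_SECONDARIES": "node2.example.com",
#     "INSTANCE_OS_TYPE": "debian-etch",
#     "INSTANCE_STATUS": "up",
#     "INSTANCE_MEMORY": 128,
#     "INSTANCE_VCPUS": 1,
#     "INSTANCE_NIC0_IP": "",
#     "INSTANCE_NIC0_BRIDGE": "xen-br0",
#     "INSTANCE_NIC0_HWADDR": "aa:00:00:00:00:01",
#     "INSTANCE_NIC_COUNT": 1,
#   }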
491

    
492

    
493
def _CheckInstanceBridgesExist(lu, instance):
494
  """Check that the bridges needed by an instance exist.
495

496
  """
497
  # check bridges existence
498
  brlist = [nic.bridge for nic in instance.nics]
499
  if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
500
    raise errors.OpPrereqError("one or more target bridges %s do not"
501
                               " exist on destination node '%s'" %
502
                               (brlist, instance.primary_node))
503

    
504

    
505
class LUDestroyCluster(NoHooksLU):
506
  """Logical unit for destroying the cluster.
507

508
  """
509
  _OP_REQP = []
510

    
511
  def CheckPrereq(self):
512
    """Check prerequisites.
513

514
    This checks whether the cluster is empty.
515

516
    Any errors are signalled by raising errors.OpPrereqError.
517

518
    """
519
    master = self.cfg.GetMasterNode()
520

    
521
    nodelist = self.cfg.GetNodeList()
522
    if len(nodelist) != 1 or nodelist[0] != master:
523
      raise errors.OpPrereqError("There are still %d node(s) in"
524
                                 " this cluster." % (len(nodelist) - 1))
525
    instancelist = self.cfg.GetInstanceList()
526
    if instancelist:
527
      raise errors.OpPrereqError("There are still %d instance(s) in"
528
                                 " this cluster." % len(instancelist))
529

    
530
  def Exec(self, feedback_fn):
531
    """Destroys the cluster.
532

533
    """
534
    master = self.cfg.GetMasterNode()
535
    if not self.rpc.call_node_stop_master(master, False):
536
      raise errors.OpExecError("Could not disable the master role")
537
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
538
    utils.CreateBackup(priv_key)
539
    utils.CreateBackup(pub_key)
540
    return master
541

    
542

    
543
class LUVerifyCluster(LogicalUnit):
544
  """Verifies the cluster status.
545

546
  """
547
  HPATH = "cluster-verify"
548
  HTYPE = constants.HTYPE_CLUSTER
549
  _OP_REQP = ["skip_checks"]
550
  REQ_BGL = False
551

    
552
  def ExpandNames(self):
553
    self.needed_locks = {
554
      locking.LEVEL_NODE: locking.ALL_SET,
555
      locking.LEVEL_INSTANCE: locking.ALL_SET,
556
    }
557
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
558

    
559
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
560
                  remote_version, feedback_fn):
561
    """Run multiple tests against a node.
562

563
    Test list::
564

565
      - compares ganeti version
566
      - checks vg existence and size > 20G
567
      - checks config file checksum
568
      - checks ssh to other nodes
569

570
    @type node: string
571
    @param node: the name of the node to check
572
    @param file_list: required list of files
573
    @param local_cksum: dictionary of local files and their checksums
574
    @type vglist: dict
575
    @param vglist: dictionary of volume group names and their size
576
    @param node_result: the results from the node
577
    @param remote_version: the RPC version from the remote node
578
    @param feedback_fn: function used to accumulate results
579

580
    """
581
    # compares ganeti version
582
    local_version = constants.PROTOCOL_VERSION
583
    if not remote_version:
584
      feedback_fn("  - ERROR: connection to %s failed" % (node))
585
      return True
586

    
587
    if local_version != remote_version:
588
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
589
                      (local_version, node, remote_version))
590
      return True
591

    
592
    # checks vg existence and size > 20G
593

    
594
    bad = False
595
    if not vglist:
596
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
597
                      (node,))
598
      bad = True
599
    else:
600
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
601
                                            constants.MIN_VG_SIZE)
602
      if vgstatus:
603
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
604
        bad = True
605

    
606
    if not node_result:
607
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
608
      return True
609

    
610
    # checks config file checksum
611
    # checks ssh to any
612

    
613
    if 'filelist' not in node_result:
614
      bad = True
615
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
616
    else:
617
      remote_cksum = node_result['filelist']
618
      for file_name in file_list:
619
        if file_name not in remote_cksum:
620
          bad = True
621
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
622
        elif remote_cksum[file_name] != local_cksum[file_name]:
623
          bad = True
624
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
625

    
626
    if 'nodelist' not in node_result:
627
      bad = True
628
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
629
    else:
630
      if node_result['nodelist']:
631
        bad = True
632
        for node in node_result['nodelist']:
633
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
634
                          (node, node_result['nodelist'][node]))
635
    if 'node-net-test' not in node_result:
636
      bad = True
637
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
638
    else:
639
      if node_result['node-net-test']:
640
        bad = True
641
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
642
        for node in nlist:
643
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
644
                          (node, node_result['node-net-test'][node]))
645

    
646
    hyp_result = node_result.get('hypervisor', None)
647
    if isinstance(hyp_result, dict):
648
      for hv_name, hv_result in hyp_result.iteritems():
649
        if hv_result is not None:
650
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
651
                      (hv_name, hv_result))
652
    return bad
653

    
654
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
655
                      node_instance, feedback_fn):
656
    """Verify an instance.
657

658
    This function checks to see if the required block devices are
659
    available on the instance's node.
660

661
    """
662
    bad = False
663

    
664
    node_current = instanceconfig.primary_node
665

    
666
    node_vol_should = {}
667
    instanceconfig.MapLVsByNode(node_vol_should)
668

    
669
    for node in node_vol_should:
670
      for volume in node_vol_should[node]:
671
        if node not in node_vol_is or volume not in node_vol_is[node]:
672
          feedback_fn("  - ERROR: volume %s missing on node %s" %
673
                          (volume, node))
674
          bad = True
675

    
676
    if instanceconfig.status != 'down':
677
      if (node_current not in node_instance or
678
          not instance in node_instance[node_current]):
679
        feedback_fn("  - ERROR: instance %s not running on node %s" %
680
                        (instance, node_current))
681
        bad = True
682

    
683
    for node in node_instance:
684
      if node != node_current:
685
        if instance in node_instance[node]:
686
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
687
                          (instance, node))
688
          bad = True
689

    
690
    return bad
691

    
692
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
693
    """Verify if there are any unknown volumes in the cluster.
694

695
    The .os, .swap and backup volumes are ignored. All other volumes are
696
    reported as unknown.
697

698
    """
699
    bad = False
700

    
701
    for node in node_vol_is:
702
      for volume in node_vol_is[node]:
703
        if node not in node_vol_should or volume not in node_vol_should[node]:
704
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
705
                      (volume, node))
706
          bad = True
707
    return bad
708

    
709
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
710
    """Verify the list of running instances.
711

712
    This checks what instances are running but unknown to the cluster.
713

714
    """
715
    bad = False
716
    for node in node_instance:
717
      for runninginstance in node_instance[node]:
718
        if runninginstance not in instancelist:
719
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
720
                          (runninginstance, node))
721
          bad = True
722
    return bad
723

    
724
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
725
    """Verify N+1 Memory Resilience.
726

727
    Check that if one single node dies we can still start all the instances it
728
    was primary for.
729

730
    """
731
    bad = False
732

    
733
    for node, nodeinfo in node_info.iteritems():
734
      # This code checks that every node which is now listed as secondary has
735
      # enough memory to host all instances it is supposed to, should a single
736
      # other node in the cluster fail.
737
      # FIXME: not ready for failover to an arbitrary node
738
      # FIXME: does not support file-backed instances
739
      # WARNING: we currently take into account down instances as well as up
740
      # ones, considering that even if they're down someone might want to start
741
      # them even in the event of a node failure.
742
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
743
        needed_mem = 0
744
        for instance in instances:
745
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
746
          if bep[constants.BE_AUTO_BALANCE]:
747
            needed_mem += bep[constants.BE_MEMORY]
748
        if nodeinfo['mfree'] < needed_mem:
749
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
750
                      " failovers should node %s fail" % (node, prinode))
751
          bad = True
752
    return bad
753

    
754
  def CheckPrereq(self):
755
    """Check prerequisites.
756

757
    Transform the list of checks we're going to skip into a set and check that
758
    all its members are valid.
759

760
    """
761
    self.skip_set = frozenset(self.op.skip_checks)
762
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
763
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
764

    
765
  def BuildHooksEnv(self):
766
    """Build hooks env.
767

768
    Cluster-Verify hooks are only run in the post phase; a hook failure makes
769
    the output be logged in the verify output and the verification fail.
770

771
    """
772
    all_nodes = self.cfg.GetNodeList()
773
    # TODO: populate the environment with useful information for verify hooks
774
    env = {}
775
    return env, [], all_nodes
776

    
777
  def Exec(self, feedback_fn):
778
    """Verify integrity of cluster, performing various tests on nodes.
779

780
    """
781
    bad = False
782
    feedback_fn("* Verifying global settings")
783
    for msg in self.cfg.VerifyConfig():
784
      feedback_fn("  - ERROR: %s" % msg)
785

    
786
    vg_name = self.cfg.GetVGName()
787
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
788
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
789
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
790
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
791
    i_non_redundant = [] # Non redundant instances
792
    i_non_a_balanced = [] # Non auto-balanced instances
793
    node_volume = {}
794
    node_instance = {}
795
    node_info = {}
796
    instance_cfg = {}
797

    
798
    # FIXME: verify OS list
799
    # do local checksums
800
    file_names = []
801
    file_names.append(constants.SSL_CERT_FILE)
802
    file_names.append(constants.CLUSTER_CONF_FILE)
803
    local_checksums = utils.FingerprintFiles(file_names)
804

    
805
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
806
    all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
807
    all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
808
    all_vglist = self.rpc.call_vg_list(nodelist)
809
    node_verify_param = {
810
      'filelist': file_names,
811
      'nodelist': nodelist,
812
      'hypervisor': hypervisors,
813
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
814
                        for node in nodeinfo]
815
      }
816
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
817
                                           self.cfg.GetClusterName())
818
    all_rversion = self.rpc.call_version(nodelist)
819
    all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
820
                                        self.cfg.GetHypervisorType())
821

    
822
    cluster = self.cfg.GetClusterInfo()
823
    for node in nodelist:
824
      feedback_fn("* Verifying node %s" % node)
825
      result = self._VerifyNode(node, file_names, local_checksums,
826
                                all_vglist[node], all_nvinfo[node],
827
                                all_rversion[node], feedback_fn)
828
      bad = bad or result
829

    
830
      # node_volume
831
      volumeinfo = all_volumeinfo[node]
832

    
833
      if isinstance(volumeinfo, basestring):
834
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
835
                    (node, volumeinfo[-400:].encode('string_escape')))
836
        bad = True
837
        node_volume[node] = {}
838
      elif not isinstance(volumeinfo, dict):
839
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
840
        bad = True
841
        continue
842
      else:
843
        node_volume[node] = volumeinfo
844

    
845
      # node_instance
846
      nodeinstance = all_instanceinfo[node]
847
      if type(nodeinstance) != list:
848
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
849
        bad = True
850
        continue
851

    
852
      node_instance[node] = nodeinstance
853

    
854
      # node_info
855
      nodeinfo = all_ninfo[node]
856
      if not isinstance(nodeinfo, dict):
857
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
858
        bad = True
859
        continue
860

    
861
      try:
862
        node_info[node] = {
863
          "mfree": int(nodeinfo['memory_free']),
864
          "dfree": int(nodeinfo['vg_free']),
865
          "pinst": [],
866
          "sinst": [],
867
          # dictionary holding all instances this node is secondary for,
868
          # grouped by their primary node. Each key is a cluster node, and each
869
          # value is a list of instances which have the key as primary and the
870
          # current node as secondary. This is handy to calculate N+1 memory
871
          # availability if you can only failover from a primary to its
872
          # secondary.
873
          "sinst-by-pnode": {},
874
        }
875
      except ValueError:
876
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
877
        bad = True
878
        continue
879

    
880
    node_vol_should = {}
881

    
882
    for instance in instancelist:
883
      feedback_fn("* Verifying instance %s" % instance)
884
      inst_config = self.cfg.GetInstanceInfo(instance)
885
      result =  self._VerifyInstance(instance, inst_config, node_volume,
886
                                     node_instance, feedback_fn)
887
      bad = bad or result
888

    
889
      inst_config.MapLVsByNode(node_vol_should)
890

    
891
      instance_cfg[instance] = inst_config
892

    
893
      pnode = inst_config.primary_node
894
      if pnode in node_info:
895
        node_info[pnode]['pinst'].append(instance)
896
      else:
897
        feedback_fn("  - ERROR: instance %s, connection to primary node"
898
                    " %s failed" % (instance, pnode))
899
        bad = True
900

    
901
      # If the instance is non-redundant we cannot survive losing its primary
902
      # node, so we are not N+1 compliant. On the other hand we have no disk
903
      # templates with more than one secondary so that situation is not well
904
      # supported either.
905
      # FIXME: does not support file-backed instances
906
      if len(inst_config.secondary_nodes) == 0:
907
        i_non_redundant.append(instance)
908
      elif len(inst_config.secondary_nodes) > 1:
909
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
910
                    % instance)
911

    
912
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
913
        i_non_a_balanced.append(instance)
914

    
915
      for snode in inst_config.secondary_nodes:
916
        if snode in node_info:
917
          node_info[snode]['sinst'].append(instance)
918
          if pnode not in node_info[snode]['sinst-by-pnode']:
919
            node_info[snode]['sinst-by-pnode'][pnode] = []
920
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
921
        else:
922
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
923
                      " %s failed" % (instance, snode))
924

    
925
    feedback_fn("* Verifying orphan volumes")
926
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
927
                                       feedback_fn)
928
    bad = bad or result
929

    
930
    feedback_fn("* Verifying remaining instances")
931
    result = self._VerifyOrphanInstances(instancelist, node_instance,
932
                                         feedback_fn)
933
    bad = bad or result
934

    
935
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
936
      feedback_fn("* Verifying N+1 Memory redundancy")
937
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
938
      bad = bad or result
939

    
940
    feedback_fn("* Other Notes")
941
    if i_non_redundant:
942
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
943
                  % len(i_non_redundant))
944

    
945
    if i_non_a_balanced:
946
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
947
                  % len(i_non_a_balanced))
948

    
949
    return not bad
950

    
951
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
952
    """Analyse the post-hooks' result
953

954
    This method analyses the hook result, handles it, and sends some
955
    nicely-formatted feedback back to the user.
956

957
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
958
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
959
    @param hooks_results: the results of the multi-node hooks rpc call
960
    @param feedback_fn: function used to send feedback back to the caller
961
    @param lu_result: previous Exec result
962
    @return: the new Exec result, based on the previous result
963
        and hook results
964

965
    """
966
    # We only really run POST phase hooks, and are only interested in
967
    # their results
968
    if phase == constants.HOOKS_PHASE_POST:
969
      # Used to change hooks' output to proper indentation
970
      indent_re = re.compile('^', re.M)
971
      feedback_fn("* Hooks Results")
972
      if not hooks_results:
973
        feedback_fn("  - ERROR: general communication failure")
974
        lu_result = 1
975
      else:
976
        for node_name in hooks_results:
977
          show_node_header = True
978
          res = hooks_results[node_name]
979
          if res is False or not isinstance(res, list):
980
            feedback_fn("    Communication failure")
981
            lu_result = 1
982
            continue
983
          for script, hkr, output in res:
984
            if hkr == constants.HKR_FAIL:
985
              # The node header is only shown once, if there are
986
              # failing hooks on that node
987
              if show_node_header:
988
                feedback_fn("  Node %s:" % node_name)
989
                show_node_header = False
990
              feedback_fn("    ERROR: Script %s failed, output:" % script)
991
              output = indent_re.sub('      ', output)
992
              feedback_fn("%s" % output)
993
              lu_result = 1
994

    
995
      return lu_result
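# Usage note (editor's sketch, not original code): the "skip_checks" opcode
# parameter validated in CheckPrereq above accepts the optional checks in
# constants.VERIFY_OPTIONAL_CHECKS, so a caller wishing to skip the N+1
# memory check would submit something like:
#
#   op = opcodes.OpVerifyCluster(skip_checks=[constants.VERIFY_NPLUSONE_MEM])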
996

    
997

    
998
class LUVerifyDisks(NoHooksLU):
999
  """Verifies the cluster disks status.
1000

1001
  """
1002
  _OP_REQP = []
1003
  REQ_BGL = False
1004

    
1005
  def ExpandNames(self):
1006
    self.needed_locks = {
1007
      locking.LEVEL_NODE: locking.ALL_SET,
1008
      locking.LEVEL_INSTANCE: locking.ALL_SET,
1009
    }
1010
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1011

    
1012
  def CheckPrereq(self):
1013
    """Check prerequisites.
1014

1015
    This has no prerequisites.
1016

1017
    """
1018
    pass
1019

    
1020
  def Exec(self, feedback_fn):
1021
    """Verify integrity of cluster disks.
1022

1023
    """
1024
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1025

    
1026
    vg_name = self.cfg.GetVGName()
1027
    nodes = utils.NiceSort(self.cfg.GetNodeList())
1028
    instances = [self.cfg.GetInstanceInfo(name)
1029
                 for name in self.cfg.GetInstanceList()]
1030

    
1031
    nv_dict = {}
1032
    for inst in instances:
1033
      inst_lvs = {}
1034
      if (inst.status != "up" or
1035
          inst.disk_template not in constants.DTS_NET_MIRROR):
1036
        continue
1037
      inst.MapLVsByNode(inst_lvs)
1038
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1039
      for node, vol_list in inst_lvs.iteritems():
1040
        for vol in vol_list:
1041
          nv_dict[(node, vol)] = inst
1042

    
1043
    if not nv_dict:
1044
      return result
1045

    
1046
    node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1047

    
1048
    to_act = set()
1049
    for node in nodes:
1050
      # node_volume
1051
      lvs = node_lvs[node]
1052

    
1053
      if isinstance(lvs, basestring):
1054
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1055
        res_nlvm[node] = lvs
1056
      elif not isinstance(lvs, dict):
1057
        logging.warning("Connection to node %s failed or invalid data"
1058
                        " returned", node)
1059
        res_nodes.append(node)
1060
        continue
1061

    
1062
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1063
        inst = nv_dict.pop((node, lv_name), None)
1064
        if (not lv_online and inst is not None
1065
            and inst.name not in res_instances):
1066
          res_instances.append(inst.name)
1067

    
1068
    # any leftover items in nv_dict are missing LVs, let's arrange the
1069
    # data better
1070
    for key, inst in nv_dict.iteritems():
1071
      if inst.name not in res_missing:
1072
        res_missing[inst.name] = []
1073
      res_missing[inst.name].append(key)
1074

    
1075
    return result
1076

    
1077

    
1078
class LURenameCluster(LogicalUnit):
1079
  """Rename the cluster.
1080

1081
  """
1082
  HPATH = "cluster-rename"
1083
  HTYPE = constants.HTYPE_CLUSTER
1084
  _OP_REQP = ["name"]
1085

    
1086
  def BuildHooksEnv(self):
1087
    """Build hooks env.
1088

1089
    """
1090
    env = {
1091
      "OP_TARGET": self.cfg.GetClusterName(),
1092
      "NEW_NAME": self.op.name,
1093
      }
1094
    mn = self.cfg.GetMasterNode()
1095
    return env, [mn], [mn]
1096

    
1097
  def CheckPrereq(self):
1098
    """Verify that the passed name is a valid one.
1099

1100
    """
1101
    hostname = utils.HostInfo(self.op.name)
1102

    
1103
    new_name = hostname.name
1104
    self.ip = new_ip = hostname.ip
1105
    old_name = self.cfg.GetClusterName()
1106
    old_ip = self.cfg.GetMasterIP()
1107
    if new_name == old_name and new_ip == old_ip:
1108
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1109
                                 " cluster has changed")
1110
    if new_ip != old_ip:
1111
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1112
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1113
                                   " reachable on the network. Aborting." %
1114
                                   new_ip)
1115

    
1116
    self.op.name = new_name
1117

    
1118
  def Exec(self, feedback_fn):
1119
    """Rename the cluster.
1120

1121
    """
1122
    clustername = self.op.name
1123
    ip = self.ip
1124

    
1125
    # shutdown the master IP
1126
    master = self.cfg.GetMasterNode()
1127
    if not self.rpc.call_node_stop_master(master, False):
1128
      raise errors.OpExecError("Could not disable the master role")
1129

    
1130
    try:
1131
      # modify the sstore
1132
      # TODO: sstore
1133
      ss.SetKey(ss.SS_MASTER_IP, ip)
1134
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1135

    
1136
      # Distribute updated ss config to all nodes
1137
      myself = self.cfg.GetNodeInfo(master)
1138
      dist_nodes = self.cfg.GetNodeList()
1139
      if myself.name in dist_nodes:
1140
        dist_nodes.remove(myself.name)
1141

    
1142
      logging.debug("Copying updated ssconf data to all nodes")
1143
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1144
        fname = ss.KeyToFilename(keyname)
1145
        result = self.rpc.call_upload_file(dist_nodes, fname)
1146
        for to_node in dist_nodes:
1147
          if not result[to_node]:
1148
            self.LogWarning("Copy of file %s to node %s failed",
1149
                            fname, to_node)
1150
    finally:
1151
      if not self.rpc.call_node_start_master(master, False):
1152
        self.LogWarning("Could not re-enable the master role on"
1153
                        " the master, please restart manually.")
1154

    
1155

    
1156
def _RecursiveCheckIfLVMBased(disk):
1157
  """Check if the given disk or its children are lvm-based.
1158

1159
  @type disk: L{objects.Disk}
1160
  @param disk: the disk to check
1161
  @rtype: boolean
1162
  @return: boolean indicating whether a LD_LV dev_type was found or not
1163

1164
  """
1165
  if disk.children:
1166
    for chdisk in disk.children:
1167
      if _RecursiveCheckIfLVMBased(chdisk):
1168
        return True
1169
  return disk.dev_type == constants.LD_LV
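# Illustration (editor's sketch, constructor arguments simplified): for a
# mirrored disk whose children are two logical volumes, the recursion above
# returns True as soon as one child reports dev_type == constants.LD_LV,
# while a disk tree containing no LV anywhere returns False.
#
#   lv_a = objects.Disk(dev_type=constants.LD_LV)
#   lv_b = objects.Disk(dev_type=constants.LD_LV)
#   drbd = objects.Disk(dev_type=constants.LD_DRBD8, children=[lv_a, lv_b])
#   _RecursiveCheckIfLVMBased(drbd)  # -> True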
1170

    
1171

    
1172
class LUSetClusterParams(LogicalUnit):
1173
  """Change the parameters of the cluster.
1174

1175
  """
1176
  HPATH = "cluster-modify"
1177
  HTYPE = constants.HTYPE_CLUSTER
1178
  _OP_REQP = []
1179
  REQ_BGL = False
1180

    
1181
  def ExpandNames(self):
1182
    # FIXME: in the future maybe other cluster params won't require checking on
1183
    # all nodes to be modified.
1184
    self.needed_locks = {
1185
      locking.LEVEL_NODE: locking.ALL_SET,
1186
    }
1187
    self.share_locks[locking.LEVEL_NODE] = 1
1188

    
1189
  def BuildHooksEnv(self):
1190
    """Build hooks env.
1191

1192
    """
1193
    env = {
1194
      "OP_TARGET": self.cfg.GetClusterName(),
1195
      "NEW_VG_NAME": self.op.vg_name,
1196
      }
1197
    mn = self.cfg.GetMasterNode()
1198
    return env, [mn], [mn]
1199

    
1200
  def CheckPrereq(self):
1201
    """Check prerequisites.
1202

1203
    This checks whether the given params don't conflict and
1204
    if the given volume group is valid.
1205

1206
    """
1207
    # FIXME: This only works because there is only one parameter that can be
1208
    # changed or removed.
1209
    if self.op.vg_name is not None and not self.op.vg_name:
1210
      instances = self.cfg.GetAllInstancesInfo().values()
1211
      for inst in instances:
1212
        for disk in inst.disks:
1213
          if _RecursiveCheckIfLVMBased(disk):
1214
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1215
                                       " lvm-based instances exist")
1216

    
1217
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1218

    
1219
    # if vg_name not None, checks given volume group on all nodes
1220
    if self.op.vg_name:
1221
      vglist = self.rpc.call_vg_list(node_list)
1222
      for node in node_list:
1223
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1224
                                              constants.MIN_VG_SIZE)
1225
        if vgstatus:
1226
          raise errors.OpPrereqError("Error on node '%s': %s" %
1227
                                     (node, vgstatus))
1228

    
1229
    self.cluster = cluster = self.cfg.GetClusterInfo()
1230
    # beparams changes do not need validation (we can't validate?),
1231
    # but we still process here
1232
    if self.op.beparams:
1233
      self.new_beparams = cluster.FillDict(
1234
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1235

    
1236
    # hypervisor list/parameters
1237
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1238
    if self.op.hvparams:
1239
      if not isinstance(self.op.hvparams, dict):
1240
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1241
      for hv_name, hv_dict in self.op.hvparams.items():
1242
        if hv_name not in self.new_hvparams:
1243
          self.new_hvparams[hv_name] = hv_dict
1244
        else:
1245
          self.new_hvparams[hv_name].update(hv_dict)
1246

    
1247
    if self.op.enabled_hypervisors is not None:
1248
      self.hv_list = self.op.enabled_hypervisors
1249
    else:
1250
      self.hv_list = cluster.enabled_hypervisors
1251

    
1252
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1253
      # either the enabled list has changed, or the parameters have, validate
1254
      for hv_name, hv_params in self.new_hvparams.items():
1255
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1256
            (self.op.enabled_hypervisors and
1257
             hv_name in self.op.enabled_hypervisors)):
1258
          # either this is a new hypervisor, or its parameters have changed
1259
          hv_class = hypervisor.GetHypervisor(hv_name)
1260
          hv_class.CheckParameterSyntax(hv_params)
1261
          _CheckHVParams(self, node_list, hv_name, hv_params)
1262

    
1263
  def Exec(self, feedback_fn):
1264
    """Change the parameters of the cluster.
1265

1266
    """
1267
    if self.op.vg_name is not None:
1268
      if self.op.vg_name != self.cfg.GetVGName():
1269
        self.cfg.SetVGName(self.op.vg_name)
1270
      else:
1271
        feedback_fn("Cluster LVM configuration already in desired"
1272
                    " state, not changing")
1273
    if self.op.hvparams:
1274
      self.cluster.hvparams = self.new_hvparams
1275
    if self.op.enabled_hypervisors is not None:
1276
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1277
    if self.op.beparams:
1278
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1279
    self.cfg.Update(self.cluster)
1280

    
1281

    
1282
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1283
  """Sleep and poll for an instance's disks to sync.
1284

1285
  """
1286
  if not instance.disks:
1287
    return True
1288

    
1289
  if not oneshot:
1290
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1291

    
1292
  node = instance.primary_node
1293

    
1294
  for dev in instance.disks:
1295
    lu.cfg.SetDiskID(dev, node)
1296

    
1297
  retries = 0
1298
  while True:
1299
    max_time = 0
1300
    done = True
1301
    cumul_degraded = False
1302
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1303
    if not rstats:
1304
      lu.LogWarning("Can't get any data from node %s", node)
1305
      retries += 1
1306
      if retries >= 10:
1307
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1308
                                 " aborting." % node)
1309
      time.sleep(6)
1310
      continue
1311
    retries = 0
1312
    for i in range(len(rstats)):
1313
      mstat = rstats[i]
1314
      if mstat is None:
1315
        lu.LogWarning("Can't compute data for node %s/%s",
1316
                           node, instance.disks[i].iv_name)
1317
        continue
1318
      # we ignore the ldisk parameter
1319
      perc_done, est_time, is_degraded, _ = mstat
1320
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1321
      if perc_done is not None:
1322
        done = False
1323
        if est_time is not None:
1324
          rem_time = "%d estimated seconds remaining" % est_time
1325
          max_time = est_time
1326
        else:
1327
          rem_time = "no time estimate"
1328
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1329
                        (instance.disks[i].iv_name, perc_done, rem_time))
1330
    if done or oneshot:
1331
      break
1332

    
1333
    time.sleep(min(60, max_time))
1334

    
1335
  if done:
1336
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1337
  return not cumul_degraded
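# Polling note (editor's addition): with the 60-second cap above, a device
# reporting an est_time of, say, 300 seconds is re-polled after
# min(60, 300) = 60 seconds, while a device that returns no estimate does
# not contribute to max_time.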
1338

    
1339

    
1340
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1341
  """Check that mirrors are not degraded.
1342

1343
  The ldisk parameter, if True, will change the test from the
1344
  is_degraded attribute (which represents overall non-ok status for
1345
  the device(s)) to the ldisk (representing the local storage status).
1346

1347
  """
1348
  lu.cfg.SetDiskID(dev, node)
1349
  if ldisk:
1350
    idx = 6
1351
  else:
1352
    idx = 5
1353

    
1354
  result = True
1355
  if on_primary or dev.AssembleOnSecondary():
1356
    rstats = lu.rpc.call_blockdev_find(node, dev)
1357
    if not rstats:
1358
      logging.warning("Node %s: disk degraded, not found or node down", node)
1359
      result = False
1360
    else:
1361
      result = result and (not rstats[idx])
1362
  if dev.children:
1363
    for child in dev.children:
1364
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1365

    
1366
  return result
1367

    
1368

    
1369
class LUDiagnoseOS(NoHooksLU):
1370
  """Logical unit for OS diagnose/query.
1371

1372
  """
1373
  _OP_REQP = ["output_fields", "names"]
1374
  REQ_BGL = False
1375
  _FIELDS_STATIC = utils.FieldSet()
1376
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1377

    
1378
  def ExpandNames(self):
1379
    if self.op.names:
1380
      raise errors.OpPrereqError("Selective OS query not supported")
1381

    
1382
    _CheckOutputFields(static=self._FIELDS_STATIC,
1383
                       dynamic=self._FIELDS_DYNAMIC,
1384
                       selected=self.op.output_fields)
1385

    
1386
    # Lock all nodes, in shared mode
1387
    self.needed_locks = {}
1388
    self.share_locks[locking.LEVEL_NODE] = 1
1389
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1390

    
1391
  def CheckPrereq(self):
1392
    """Check prerequisites.
1393

1394
    """
1395

    
1396
  @staticmethod
1397
  def _DiagnoseByOS(node_list, rlist):
1398
    """Remaps a per-node return list into a per-os per-node dictionary
1399

1400
    @param node_list: a list with the names of all nodes
1401
    @param rlist: a map with node names as keys and OS objects as values
1402

1403
    @rtype: dict
1404
    @returns: a dictionary with osnames as keys and as value another map, with
1405
        nodes as keys and list of OS objects as values, eg::
1406

1407
          {"debian-etch": {"node1": [<object>,...],
1408
                           "node2": [<object>,]}
1409
          }
1410

1411
    """
1412
    all_os = {}
1413
    for node_name, nr in rlist.iteritems():
1414
      if not nr:
1415
        continue
1416
      for os_obj in nr:
1417
        if os_obj.name not in all_os:
1418
          # build a list of nodes for this os containing empty lists
1419
          # for each node in node_list
1420
          all_os[os_obj.name] = {}
1421
          for nname in node_list:
1422
            all_os[os_obj.name][nname] = []
1423
        all_os[os_obj.name][node_name].append(os_obj)
1424
    return all_os
1425

    
1426
  def Exec(self, feedback_fn):
1427
    """Compute the list of OSes.
1428

1429
    """
1430
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1431
    node_data = self.rpc.call_os_diagnose(node_list)
1432
    if node_data == False:
1433
      raise errors.OpExecError("Can't gather the list of OSes")
1434
    pol = self._DiagnoseByOS(node_list, node_data)
1435
    output = []
1436
    for os_name, os_data in pol.iteritems():
1437
      row = []
1438
      for field in self.op.output_fields:
1439
        if field == "name":
1440
          val = os_name
1441
        elif field == "valid":
1442
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1443
        elif field == "node_status":
1444
          val = {}
1445
          for node_name, nos_list in os_data.iteritems():
1446
            val[node_name] = [(v.status, v.path) for v in nos_list]
1447
        else:
1448
          raise errors.ParameterError(field)
1449
        row.append(val)
1450
      output.append(row)
1451

    
1452
    return output
1453

    
1454

    
1455
class LURemoveNode(LogicalUnit):
1456
  """Logical unit for removing a node.
1457

1458
  """
1459
  HPATH = "node-remove"
1460
  HTYPE = constants.HTYPE_NODE
1461
  _OP_REQP = ["node_name"]
1462

    
1463
  def BuildHooksEnv(self):
1464
    """Build hooks env.
1465

1466
    This doesn't run on the target node in the pre phase as a failed
1467
    node would then be impossible to remove.
1468

1469
    """
1470
    env = {
1471
      "OP_TARGET": self.op.node_name,
1472
      "NODE_NAME": self.op.node_name,
1473
      }
1474
    all_nodes = self.cfg.GetNodeList()
1475
    all_nodes.remove(self.op.node_name)
1476
    return env, all_nodes, all_nodes
1477

    
1478
  def CheckPrereq(self):
1479
    """Check prerequisites.
1480

1481
    This checks:
1482
     - the node exists in the configuration
1483
     - it does not have primary or secondary instances
1484
     - it's not the master
1485

1486
    Any errors are signalled by raising errors.OpPrereqError.
1487

1488
    """
1489
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1490
    if node is None:
1491
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1492

    
1493
    instance_list = self.cfg.GetInstanceList()
1494

    
1495
    masternode = self.cfg.GetMasterNode()
1496
    if node.name == masternode:
1497
      raise errors.OpPrereqError("Node is the master node,"
1498
                                 " you need to failover first.")
1499

    
1500
    for instance_name in instance_list:
1501
      instance = self.cfg.GetInstanceInfo(instance_name)
1502
      if node.name == instance.primary_node:
1503
        raise errors.OpPrereqError("Instance %s still running on the node,"
1504
                                   " please remove first." % instance_name)
1505
      if node.name in instance.secondary_nodes:
1506
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1507
                                   " please remove first." % instance_name)
1508
    self.op.node_name = node.name
1509
    self.node = node
1510

    
1511
  def Exec(self, feedback_fn):
1512
    """Removes the node from the cluster.
1513

1514
    """
1515
    node = self.node
1516
    logging.info("Stopping the node daemon and removing configs from node %s",
1517
                 node.name)
1518

    
1519
    self.context.RemoveNode(node.name)
1520

    
1521
    self.rpc.call_node_leave_cluster(node.name)
1522

    
1523

    
1524
class LUQueryNodes(NoHooksLU):
1525
  """Logical unit for querying nodes.
1526

1527
  """
1528
  _OP_REQP = ["output_fields", "names"]
1529
  REQ_BGL = False
1530
  _FIELDS_DYNAMIC = utils.FieldSet(
1531
    "dtotal", "dfree",
1532
    "mtotal", "mnode", "mfree",
1533
    "bootid",
1534
    "ctotal",
1535
    )
1536

    
1537
  _FIELDS_STATIC = utils.FieldSet(
1538
    "name", "pinst_cnt", "sinst_cnt",
1539
    "pinst_list", "sinst_list",
1540
    "pip", "sip", "tags",
1541
    "serial_no",
1542
    )
1543

    
1544
  def ExpandNames(self):
1545
    _CheckOutputFields(static=self._FIELDS_STATIC,
1546
                       dynamic=self._FIELDS_DYNAMIC,
1547
                       selected=self.op.output_fields)
1548

    
1549
    self.needed_locks = {}
1550
    self.share_locks[locking.LEVEL_NODE] = 1
1551

    
1552
    if self.op.names:
1553
      self.wanted = _GetWantedNodes(self, self.op.names)
1554
    else:
1555
      self.wanted = locking.ALL_SET
1556

    
1557
    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1558
    if self.do_locking:
1559
      # if we don't request only static fields, we need to lock the nodes
1560
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
1561
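  # Note (added sketch, not original code): node locks are only needed when at
  # least one requested field is outside _FIELDS_STATIC, because only live
  # data requires an RPC to the nodes.  For example:
  #
  #   output_fields = ["name", "pinst_cnt"]  -> do_locking is False
  #   output_fields = ["name", "mfree"]      -> do_locking is True and the
  #                                             node locks are taken in
  #                                             shared mode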

    
1562

    
1563
  def CheckPrereq(self):
1564
    """Check prerequisites.
1565

1566
    """
1567
    # The validation of the node list is done in the _GetWantedNodes,
1568
    # if non empty, and if empty, there's no validation to do
1569
    pass
1570

    
1571
  def Exec(self, feedback_fn):
1572
    """Computes the list of nodes and their attributes.
1573

1574
    """
1575
    all_info = self.cfg.GetAllNodesInfo()
1576
    if self.do_locking:
1577
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1578
    elif self.wanted != locking.ALL_SET:
1579
      nodenames = self.wanted
1580
      missing = set(nodenames).difference(all_info.keys())
1581
      if missing:
1582
        raise errors.OpExecError(
1583
          "Some nodes were removed before retrieving their data: %s" % missing)
1584
    else:
1585
      nodenames = all_info.keys()
1586

    
1587
    nodenames = utils.NiceSort(nodenames)
1588
    nodelist = [all_info[name] for name in nodenames]
1589

    
1590
    # begin data gathering
1591

    
1592
    if self.do_locking:
1593
      live_data = {}
1594
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1595
                                          self.cfg.GetHypervisorType())
1596
      for name in nodenames:
1597
        nodeinfo = node_data.get(name, None)
1598
        if nodeinfo:
1599
          live_data[name] = {
1600
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1601
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1602
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1603
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1604
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1605
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1606
            "bootid": nodeinfo['bootid'],
1607
            }
1608
        else:
1609
          live_data[name] = {}
1610
    else:
1611
      live_data = dict.fromkeys(nodenames, {})
1612

    
1613
    node_to_primary = dict([(name, set()) for name in nodenames])
1614
    node_to_secondary = dict([(name, set()) for name in nodenames])
1615

    
1616
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1617
                             "sinst_cnt", "sinst_list"))
1618
    if inst_fields & frozenset(self.op.output_fields):
1619
      instancelist = self.cfg.GetInstanceList()
1620

    
1621
      for instance_name in instancelist:
1622
        inst = self.cfg.GetInstanceInfo(instance_name)
1623
        if inst.primary_node in node_to_primary:
1624
          node_to_primary[inst.primary_node].add(inst.name)
1625
        for secnode in inst.secondary_nodes:
1626
          if secnode in node_to_secondary:
1627
            node_to_secondary[secnode].add(inst.name)
1628

    
1629
    # end data gathering
1630

    
1631
    output = []
1632
    for node in nodelist:
1633
      node_output = []
1634
      for field in self.op.output_fields:
1635
        if field == "name":
1636
          val = node.name
1637
        elif field == "pinst_list":
1638
          val = list(node_to_primary[node.name])
1639
        elif field == "sinst_list":
1640
          val = list(node_to_secondary[node.name])
1641
        elif field == "pinst_cnt":
1642
          val = len(node_to_primary[node.name])
1643
        elif field == "sinst_cnt":
1644
          val = len(node_to_secondary[node.name])
1645
        elif field == "pip":
1646
          val = node.primary_ip
1647
        elif field == "sip":
1648
          val = node.secondary_ip
1649
        elif field == "tags":
1650
          val = list(node.GetTags())
1651
        elif field == "serial_no":
1652
          val = node.serial_no
1653
        elif self._FIELDS_DYNAMIC.Matches(field):
1654
          val = live_data[node.name].get(field, None)
1655
        else:
1656
          raise errors.ParameterError(field)
1657
        node_output.append(val)
1658
      output.append(node_output)
1659

    
1660
    return output
1661
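  # Example of the result shape (sketch): for output_fields
  # ["name", "pinst_cnt", "mfree"] on a hypothetical two-node cluster, Exec
  # returns one row per node, with values in the order the fields were
  # requested:
  #
  #   [["node1", 2, 1024],
  #    ["node2", 0, 2048]]
  #
  # "mfree" is live data; if a node did not answer the node_info RPC, its
  # dynamic fields are returned as None.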

    
1662

    
1663
class LUQueryNodeVolumes(NoHooksLU):
1664
  """Logical unit for getting volumes on node(s).
1665

1666
  """
1667
  _OP_REQP = ["nodes", "output_fields"]
1668
  REQ_BGL = False
1669
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1670
  _FIELDS_STATIC = utils.FieldSet("node")
1671

    
1672
  def ExpandNames(self):
1673
    _CheckOutputFields(static=self._FIELDS_STATIC,
1674
                       dynamic=self._FIELDS_DYNAMIC,
1675
                       selected=self.op.output_fields)
1676

    
1677
    self.needed_locks = {}
1678
    self.share_locks[locking.LEVEL_NODE] = 1
1679
    if not self.op.nodes:
1680
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1681
    else:
1682
      self.needed_locks[locking.LEVEL_NODE] = \
1683
        _GetWantedNodes(self, self.op.nodes)
1684

    
1685
  def CheckPrereq(self):
1686
    """Check prerequisites.
1687

1688
    This checks that the fields required are valid output fields.
1689

1690
    """
1691
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1692

    
1693
  def Exec(self, feedback_fn):
1694
    """Computes the list of nodes and their attributes.
1695

1696
    """
1697
    nodenames = self.nodes
1698
    volumes = self.rpc.call_node_volumes(nodenames)
1699

    
1700
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1701
             in self.cfg.GetInstanceList()]
1702

    
1703
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1704

    
1705
    output = []
1706
    for node in nodenames:
1707
      if node not in volumes or not volumes[node]:
1708
        continue
1709

    
1710
      node_vols = volumes[node][:]
1711
      node_vols.sort(key=lambda vol: vol['dev'])
1712

    
1713
      for vol in node_vols:
1714
        node_output = []
1715
        for field in self.op.output_fields:
1716
          if field == "node":
1717
            val = node
1718
          elif field == "phys":
1719
            val = vol['dev']
1720
          elif field == "vg":
1721
            val = vol['vg']
1722
          elif field == "name":
1723
            val = vol['name']
1724
          elif field == "size":
1725
            val = int(float(vol['size']))
1726
          elif field == "instance":
1727
            for inst in ilist:
1728
              if node not in lv_by_node[inst]:
1729
                continue
1730
              if vol['name'] in lv_by_node[inst][node]:
1731
                val = inst.name
1732
                break
1733
            else:
1734
              val = '-'
1735
          else:
1736
            raise errors.ParameterError(field)
1737
          node_output.append(str(val))
1738

    
1739
        output.append(node_output)
1740

    
1741
    return output
1742
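  # Note (added comment): the "instance" field above relies on Python's
  # for/else construct: the else branch runs only when the loop finished
  # without hitting break, i.e. when no instance owns the volume, in which
  # case '-' is used.  A minimal standalone sketch of the idiom, with
  # hypothetical candidates/matches names:
  #
  #   for inst in candidates:
  #     if matches(inst):
  #       val = inst.name
  #       break
  #   else:
  #     val = '-'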

    
1743

    
1744
class LUAddNode(LogicalUnit):
1745
  """Logical unit for adding node to the cluster.
1746

1747
  """
1748
  HPATH = "node-add"
1749
  HTYPE = constants.HTYPE_NODE
1750
  _OP_REQP = ["node_name"]
1751

    
1752
  def BuildHooksEnv(self):
1753
    """Build hooks env.
1754

1755
    This will run on all nodes before, and on all nodes + the new node after.
1756

1757
    """
1758
    env = {
1759
      "OP_TARGET": self.op.node_name,
1760
      "NODE_NAME": self.op.node_name,
1761
      "NODE_PIP": self.op.primary_ip,
1762
      "NODE_SIP": self.op.secondary_ip,
1763
      }
1764
    nodes_0 = self.cfg.GetNodeList()
1765
    nodes_1 = nodes_0 + [self.op.node_name, ]
1766
    return env, nodes_0, nodes_1
1767

    
1768
  def CheckPrereq(self):
1769
    """Check prerequisites.
1770

1771
    This checks:
1772
     - the new node is not already in the config
1773
     - it is resolvable
1774
     - its parameters (single/dual homed) match the cluster
1775

1776
    Any errors are signalled by raising errors.OpPrereqError.
1777

1778
    """
1779
    node_name = self.op.node_name
1780
    cfg = self.cfg
1781

    
1782
    dns_data = utils.HostInfo(node_name)
1783

    
1784
    node = dns_data.name
1785
    primary_ip = self.op.primary_ip = dns_data.ip
1786
    secondary_ip = getattr(self.op, "secondary_ip", None)
1787
    if secondary_ip is None:
1788
      secondary_ip = primary_ip
1789
    if not utils.IsValidIP(secondary_ip):
1790
      raise errors.OpPrereqError("Invalid secondary IP given")
1791
    self.op.secondary_ip = secondary_ip
1792

    
1793
    node_list = cfg.GetNodeList()
1794
    if not self.op.readd and node in node_list:
1795
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1796
                                 node)
1797
    elif self.op.readd and node not in node_list:
1798
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1799

    
1800
    for existing_node_name in node_list:
1801
      existing_node = cfg.GetNodeInfo(existing_node_name)
1802

    
1803
      if self.op.readd and node == existing_node_name:
1804
        if (existing_node.primary_ip != primary_ip or
1805
            existing_node.secondary_ip != secondary_ip):
1806
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1807
                                     " address configuration as before")
1808
        continue
1809

    
1810
      if (existing_node.primary_ip == primary_ip or
1811
          existing_node.secondary_ip == primary_ip or
1812
          existing_node.primary_ip == secondary_ip or
1813
          existing_node.secondary_ip == secondary_ip):
1814
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1815
                                   " existing node %s" % existing_node.name)
1816

    
1817
    # check that the type of the node (single versus dual homed) is the
1818
    # same as for the master
1819
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1820
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1821
    newbie_singlehomed = secondary_ip == primary_ip
1822
    if master_singlehomed != newbie_singlehomed:
1823
      if master_singlehomed:
1824
        raise errors.OpPrereqError("The master has no private ip but the"
1825
                                   " new node has one")
1826
      else:
1827
        raise errors.OpPrereqError("The master has a private ip but the"
1828
                                   " new node doesn't have one")
1829

    
1830
    # checks reachability
1831
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1832
      raise errors.OpPrereqError("Node not reachable by ping")
1833

    
1834
    if not newbie_singlehomed:
1835
      # check reachability from my secondary ip to newbie's secondary ip
1836
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1837
                           source=myself.secondary_ip):
1838
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1839
                                   " based ping to noded port")
1840

    
1841
    self.new_node = objects.Node(name=node,
1842
                                 primary_ip=primary_ip,
1843
                                 secondary_ip=secondary_ip)
1844

    
1845
  def Exec(self, feedback_fn):
1846
    """Adds the new node to the cluster.
1847

1848
    """
1849
    new_node = self.new_node
1850
    node = new_node.name
1851

    
1852
    # check connectivity
1853
    result = self.rpc.call_version([node])[node]
1854
    if result:
1855
      if constants.PROTOCOL_VERSION == result:
1856
        logging.info("Communication to node %s fine, sw version %s match",
1857
                     node, result)
1858
      else:
1859
        raise errors.OpExecError("Version mismatch master version %s,"
1860
                                 " node version %s" %
1861
                                 (constants.PROTOCOL_VERSION, result))
1862
    else:
1863
      raise errors.OpExecError("Cannot get version from the new node")
1864

    
1865
    # setup ssh on node
1866
    logging.info("Copy ssh key to node %s", node)
1867
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1868
    keyarray = []
1869
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1870
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1871
                priv_key, pub_key]
1872

    
1873
    for i in keyfiles:
1874
      f = open(i, 'r')
1875
      try:
1876
        keyarray.append(f.read())
1877
      finally:
1878
        f.close()
1879

    
1880
    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
1881
                                    keyarray[2],
1882
                                    keyarray[3], keyarray[4], keyarray[5])
1883

    
1884
    if not result:
1885
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1886

    
1887
    # Add node to our /etc/hosts, and add key to known_hosts
1888
    utils.AddHostToEtcHosts(new_node.name)
1889

    
1890
    if new_node.secondary_ip != new_node.primary_ip:
1891
      if not self.rpc.call_node_has_ip_address(new_node.name,
1892
                                               new_node.secondary_ip):
1893
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1894
                                 " you gave (%s). Please fix and re-run this"
1895
                                 " command." % new_node.secondary_ip)
1896

    
1897
    node_verify_list = [self.cfg.GetMasterNode()]
1898
    node_verify_param = {
1899
      'nodelist': [node],
1900
      # TODO: do a node-net-test as well?
1901
    }
1902

    
1903
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
1904
                                       self.cfg.GetClusterName())
1905
    for verifier in node_verify_list:
1906
      if not result[verifier]:
1907
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
1908
                                 " for remote verification" % verifier)
1909
      if result[verifier]['nodelist']:
1910
        for failed in result[verifier]['nodelist']:
1911
          feedback_fn("ssh/hostname verification failed %s -> %s" %
1912
                      (verifier, result[verifier]['nodelist'][failed]))
1913
        raise errors.OpExecError("ssh/hostname verification failed.")
1914

    
1915
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1916
    # including the node just added
1917
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
1918
    dist_nodes = self.cfg.GetNodeList()
1919
    if not self.op.readd:
1920
      dist_nodes.append(node)
1921
    if myself.name in dist_nodes:
1922
      dist_nodes.remove(myself.name)
1923

    
1924
    logging.debug("Copying hosts and known_hosts to all nodes")
1925
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1926
      result = self.rpc.call_upload_file(dist_nodes, fname)
1927
      for to_node in dist_nodes:
1928
        if not result[to_node]:
1929
          logging.error("Copy of file %s to node %s failed", fname, to_node)
1930

    
1931
    to_copy = []
1932
    if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
1933
      to_copy.append(constants.VNC_PASSWORD_FILE)
1934
    for fname in to_copy:
1935
      result = self.rpc.call_upload_file([node], fname)
1936
      if not result[node]:
1937
        logging.error("Could not copy file %s to node %s", fname, node)
1938

    
1939
    if self.op.readd:
1940
      self.context.ReaddNode(new_node)
1941
    else:
1942
      self.context.AddNode(new_node)
1943
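  # Usage sketch (an assumption about the calling side, not taken from this
  # file): LUAddNode backs node addition from the command line, roughly:
  #
  #   gnt-node add node4.example.com
  #   gnt-node add -s 192.168.1.4 node4.example.com    # dual-homed cluster
  #   gnt-node add --readd node2.example.com           # re-add a known node
  #
  # The secondary IP is optional, which is why CheckPrereq falls back to the
  # primary IP when self.op.secondary_ip is not given.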

    
1944

    
1945
class LUQueryClusterInfo(NoHooksLU):
1946
  """Query cluster configuration.
1947

1948
  """
1949
  _OP_REQP = []
1950
  REQ_MASTER = False
1951
  REQ_BGL = False
1952

    
1953
  def ExpandNames(self):
1954
    self.needed_locks = {}
1955

    
1956
  def CheckPrereq(self):
1957
    """No prerequsites needed for this LU.
1958

1959
    """
1960
    pass
1961

    
1962
  def Exec(self, feedback_fn):
1963
    """Return cluster config.
1964

1965
    """
1966
    cluster = self.cfg.GetClusterInfo()
1967
    result = {
1968
      "software_version": constants.RELEASE_VERSION,
1969
      "protocol_version": constants.PROTOCOL_VERSION,
1970
      "config_version": constants.CONFIG_VERSION,
1971
      "os_api_version": constants.OS_API_VERSION,
1972
      "export_version": constants.EXPORT_VERSION,
1973
      "architecture": (platform.architecture()[0], platform.machine()),
1974
      "name": cluster.cluster_name,
1975
      "master": cluster.master_node,
1976
      "default_hypervisor": cluster.default_hypervisor,
1977
      "enabled_hypervisors": cluster.enabled_hypervisors,
1978
      "hvparams": cluster.hvparams,
1979
      "beparams": cluster.beparams,
1980
      }
1981

    
1982
    return result
1983

    
1984

    
1985
class LUQueryConfigValues(NoHooksLU):
1986
  """Return configuration values.
1987

1988
  """
1989
  _OP_REQP = []
1990
  REQ_BGL = False
1991
  _FIELDS_DYNAMIC = utils.FieldSet()
1992
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
1993

    
1994
  def ExpandNames(self):
1995
    self.needed_locks = {}
1996

    
1997
    _CheckOutputFields(static=self._FIELDS_STATIC,
1998
                       dynamic=self._FIELDS_DYNAMIC,
1999
                       selected=self.op.output_fields)
2000

    
2001
  def CheckPrereq(self):
2002
    """No prerequisites.
2003

2004
    """
2005
    pass
2006

    
2007
  def Exec(self, feedback_fn):
2008
    """Dump a representation of the cluster config to the standard output.
2009

2010
    """
2011
    values = []
2012
    for field in self.op.output_fields:
2013
      if field == "cluster_name":
2014
        entry = self.cfg.GetClusterName()
2015
      elif field == "master_node":
2016
        entry = self.cfg.GetMasterNode()
2017
      elif field == "drain_flag":
2018
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2019
      else:
2020
        raise errors.ParameterError(field)
2021
      values.append(entry)
2022
    return values
2023

    
2024

    
2025
class LUActivateInstanceDisks(NoHooksLU):
2026
  """Bring up an instance's disks.
2027

2028
  """
2029
  _OP_REQP = ["instance_name"]
2030
  REQ_BGL = False
2031

    
2032
  def ExpandNames(self):
2033
    self._ExpandAndLockInstance()
2034
    self.needed_locks[locking.LEVEL_NODE] = []
2035
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2036

    
2037
  def DeclareLocks(self, level):
2038
    if level == locking.LEVEL_NODE:
2039
      self._LockInstancesNodes()
2040

    
2041
  def CheckPrereq(self):
2042
    """Check prerequisites.
2043

2044
    This checks that the instance is in the cluster.
2045

2046
    """
2047
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2048
    assert self.instance is not None, \
2049
      "Cannot retrieve locked instance %s" % self.op.instance_name
2050

    
2051
  def Exec(self, feedback_fn):
2052
    """Activate the disks.
2053

2054
    """
2055
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2056
    if not disks_ok:
2057
      raise errors.OpExecError("Cannot activate block devices")
2058

    
2059
    return disks_info
2060

    
2061

    
2062
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2063
  """Prepare the block devices for an instance.
2064

2065
  This sets up the block devices on all nodes.
2066

2067
  @type lu: L{LogicalUnit}
2068
  @param lu: the logical unit on whose behalf we execute
2069
  @type instance: L{objects.Instance}
2070
  @param instance: the instance for whose disks we assemble
2071
  @type ignore_secondaries: boolean
2072
  @param ignore_secondaries: if true, errors on secondary nodes
2073
      won't result in an error return from the function
2074
  @return: False if the operation failed, otherwise a list of
2075
      (host, instance_visible_name, node_visible_name)
2076
      with the mapping from node devices to instance devices
2077

2078
  """
2079
  device_info = []
2080
  disks_ok = True
2081
  iname = instance.name
2082
  # With the two passes mechanism we try to reduce the window of
2083
  # opportunity for the race condition of switching DRBD to primary
2084
  # before handshaking occurred, but we do not eliminate it
2085

    
2086
  # The proper fix would be to wait (with some limits) until the
2087
  # connection has been made and drbd transitions from WFConnection
2088
  # into any other network-connected state (Connected, SyncTarget,
2089
  # SyncSource, etc.)
2090

    
2091
  # 1st pass, assemble on all nodes in secondary mode
2092
  for inst_disk in instance.disks:
2093
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2094
      lu.cfg.SetDiskID(node_disk, node)
2095
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2096
      if not result:
2097
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2098
                           " (is_primary=False, pass=1)",
2099
                           inst_disk.iv_name, node)
2100
        if not ignore_secondaries:
2101
          disks_ok = False
2102

    
2103
  # FIXME: race condition on drbd migration to primary
2104

    
2105
  # 2nd pass, do only the primary node
2106
  for inst_disk in instance.disks:
2107
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2108
      if node != instance.primary_node:
2109
        continue
2110
      lu.cfg.SetDiskID(node_disk, node)
2111
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2112
      if not result:
2113
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2114
                           " (is_primary=True, pass=2)",
2115
                           inst_disk.iv_name, node)
2116
        disks_ok = False
2117
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
2118

    
2119
  # leave the disks configured for the primary node
2120
  # this is a workaround that would be fixed better by
2121
  # improving the logical/physical id handling
2122
  for disk in instance.disks:
2123
    lu.cfg.SetDiskID(disk, instance.primary_node)
2124

    
2125
  return disks_ok, device_info
2126
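# Sketch of the return value of _AssembleInstanceDisks (derived from the
# docstring above): for an instance with a single disk it is something like
#
#   (True, [("node1.example.com", "sda", "/dev/drbd0")])
#
# where the last element of the tuple is whatever call_blockdev_assemble
# returned for the primary node; the device names here are only an
# illustration.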

    
2127

    
2128
def _StartInstanceDisks(lu, instance, force):
2129
  """Start the disks of an instance.
2130

2131
  """
2132
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2133
                                           ignore_secondaries=force)
2134
  if not disks_ok:
2135
    _ShutdownInstanceDisks(lu, instance)
2136
    if force is not None and not force:
2137
      lu.proc.LogWarning("", hint="If the message above refers to a"
2138
                         " secondary node,"
2139
                         " you can retry the operation using '--force'.")
2140
    raise errors.OpExecError("Disk consistency error")
2141

    
2142

    
2143
class LUDeactivateInstanceDisks(NoHooksLU):
2144
  """Shutdown an instance's disks.
2145

2146
  """
2147
  _OP_REQP = ["instance_name"]
2148
  REQ_BGL = False
2149

    
2150
  def ExpandNames(self):
2151
    self._ExpandAndLockInstance()
2152
    self.needed_locks[locking.LEVEL_NODE] = []
2153
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2154

    
2155
  def DeclareLocks(self, level):
2156
    if level == locking.LEVEL_NODE:
2157
      self._LockInstancesNodes()
2158

    
2159
  def CheckPrereq(self):
2160
    """Check prerequisites.
2161

2162
    This checks that the instance is in the cluster.
2163

2164
    """
2165
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2166
    assert self.instance is not None, \
2167
      "Cannot retrieve locked instance %s" % self.op.instance_name
2168

    
2169
  def Exec(self, feedback_fn):
2170
    """Deactivate the disks
2171

2172
    """
2173
    instance = self.instance
2174
    _SafeShutdownInstanceDisks(self, instance)
2175

    
2176

    
2177
def _SafeShutdownInstanceDisks(lu, instance):
2178
  """Shutdown block devices of an instance.
2179

2180
  This function checks if an instance is running, before calling
2181
  _ShutdownInstanceDisks.
2182

2183
  """
2184
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
2185
                                      [instance.hypervisor])
2186
  ins_l = ins_l[instance.primary_node]
2187
  if not isinstance(ins_l, list):
2188
    raise errors.OpExecError("Can't contact node '%s'" %
2189
                             instance.primary_node)
2190

    
2191
  if instance.name in ins_l:
2192
    raise errors.OpExecError("Instance is running, can't shutdown"
2193
                             " block devices.")
2194

    
2195
  _ShutdownInstanceDisks(lu, instance)
2196

    
2197

    
2198
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2199
  """Shutdown block devices of an instance.
2200

2201
  This does the shutdown on all nodes of the instance.
2202

2203
  If the ignore_primary is false, errors on the primary node are not
2204
  ignored.
2205

2206
  """
2207
  result = True
2208
  for disk in instance.disks:
2209
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2210
      lu.cfg.SetDiskID(top_disk, node)
2211
      if not lu.rpc.call_blockdev_shutdown(node, top_disk):
2212
        logging.error("Could not shutdown block device %s on node %s",
2213
                      disk.iv_name, node)
2214
        if not ignore_primary or node != instance.primary_node:
2215
          result = False
2216
  return result
2217

    
2218

    
2219
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2220
  """Checks if a node has enough free memory.
2221

2222
  This function checks if a given node has the needed amount of free
2223
  memory. In case the node has less memory or we cannot get the
2224
  information from the node, this function raises an OpPrereqError
2225
  exception.
2226

2227
  @type lu: L{LogicalUnit}
2228
  @param lu: a logical unit from which we get configuration data
2229
  @type node: C{str}
2230
  @param node: the node to check
2231
  @type reason: C{str}
2232
  @param reason: string to use in the error message
2233
  @type requested: C{int}
2234
  @param requested: the amount of memory in MiB to check for
2235
  @type hypervisor: C{str}
2236
  @param hypervisor: the hypervisor to ask for memory stats
2237
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2238
      we cannot check the node
2239

2240
  """
2241
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2242
  if not nodeinfo or not isinstance(nodeinfo, dict):
2243
    raise errors.OpPrereqError("Could not contact node %s for resource"
2244
                             " information" % (node,))
2245

    
2246
  free_mem = nodeinfo[node].get('memory_free')
2247
  if not isinstance(free_mem, int):
2248
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2249
                             " was '%s'" % (node, free_mem))
2250
  if requested > free_mem:
2251
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2252
                             " needed %s MiB, available %s MiB" %
2253
                             (node, reason, requested, free_mem))
2254
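# Worked example (sketch, hypothetical names): requesting 2048 MiB on a node
# whose node_info RPC reports {'memory_free': 1024} raises
#
#   OpPrereqError("Not enough memory on node node1 for starting instance foo:
#                  needed 2048 MiB, available 1024 MiB")
#
# while a non-integer 'memory_free' value is treated as "cannot check" and
# also raises OpPrereqError.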

    
2255

    
2256
class LUStartupInstance(LogicalUnit):
2257
  """Starts an instance.
2258

2259
  """
2260
  HPATH = "instance-start"
2261
  HTYPE = constants.HTYPE_INSTANCE
2262
  _OP_REQP = ["instance_name", "force"]
2263
  REQ_BGL = False
2264

    
2265
  def ExpandNames(self):
2266
    self._ExpandAndLockInstance()
2267

    
2268
  def BuildHooksEnv(self):
2269
    """Build hooks env.
2270

2271
    This runs on master, primary and secondary nodes of the instance.
2272

2273
    """
2274
    env = {
2275
      "FORCE": self.op.force,
2276
      }
2277
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2278
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2279
          list(self.instance.secondary_nodes))
2280
    return env, nl, nl
2281

    
2282
  def CheckPrereq(self):
2283
    """Check prerequisites.
2284

2285
    This checks that the instance is in the cluster.
2286

2287
    """
2288
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2289
    assert self.instance is not None, \
2290
      "Cannot retrieve locked instance %s" % self.op.instance_name
2291

    
2292
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2293
    # check bridges existence
2294
    _CheckInstanceBridgesExist(self, instance)
2295

    
2296
    _CheckNodeFreeMemory(self, instance.primary_node,
2297
                         "starting instance %s" % instance.name,
2298
                         bep[constants.BE_MEMORY], instance.hypervisor)
2299

    
2300
  def Exec(self, feedback_fn):
2301
    """Start the instance.
2302

2303
    """
2304
    instance = self.instance
2305
    force = self.op.force
2306
    extra_args = getattr(self.op, "extra_args", "")
2307

    
2308
    self.cfg.MarkInstanceUp(instance.name)
2309

    
2310
    node_current = instance.primary_node
2311

    
2312
    _StartInstanceDisks(self, instance, force)
2313

    
2314
    if not self.rpc.call_instance_start(node_current, instance, extra_args):
2315
      _ShutdownInstanceDisks(self, instance)
2316
      raise errors.OpExecError("Could not start instance")
2317

    
2318

    
2319
class LURebootInstance(LogicalUnit):
2320
  """Reboot an instance.
2321

2322
  """
2323
  HPATH = "instance-reboot"
2324
  HTYPE = constants.HTYPE_INSTANCE
2325
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2326
  REQ_BGL = False
2327

    
2328
  def ExpandNames(self):
2329
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2330
                                   constants.INSTANCE_REBOOT_HARD,
2331
                                   constants.INSTANCE_REBOOT_FULL]:
2332
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2333
                                  (constants.INSTANCE_REBOOT_SOFT,
2334
                                   constants.INSTANCE_REBOOT_HARD,
2335
                                   constants.INSTANCE_REBOOT_FULL))
2336
    self._ExpandAndLockInstance()
2337

    
2338
  def BuildHooksEnv(self):
2339
    """Build hooks env.
2340

2341
    This runs on master, primary and secondary nodes of the instance.
2342

2343
    """
2344
    env = {
2345
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2346
      }
2347
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2348
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2349
          list(self.instance.secondary_nodes))
2350
    return env, nl, nl
2351

    
2352
  def CheckPrereq(self):
2353
    """Check prerequisites.
2354

2355
    This checks that the instance is in the cluster.
2356

2357
    """
2358
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2359
    assert self.instance is not None, \
2360
      "Cannot retrieve locked instance %s" % self.op.instance_name
2361

    
2362
    # check bridges existence
2363
    _CheckInstanceBridgesExist(self, instance)
2364

    
2365
  def Exec(self, feedback_fn):
2366
    """Reboot the instance.
2367

2368
    """
2369
    instance = self.instance
2370
    ignore_secondaries = self.op.ignore_secondaries
2371
    reboot_type = self.op.reboot_type
2372
    extra_args = getattr(self.op, "extra_args", "")
2373

    
2374
    node_current = instance.primary_node
2375

    
2376
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2377
                       constants.INSTANCE_REBOOT_HARD]:
2378
      if not self.rpc.call_instance_reboot(node_current, instance,
2379
                                           reboot_type, extra_args):
2380
        raise errors.OpExecError("Could not reboot instance")
2381
    else:
2382
      if not self.rpc.call_instance_shutdown(node_current, instance):
2383
        raise errors.OpExecError("could not shutdown instance for full reboot")
2384
      _ShutdownInstanceDisks(self, instance)
2385
      _StartInstanceDisks(self, instance, ignore_secondaries)
2386
      if not self.rpc.call_instance_start(node_current, instance, extra_args):
2387
        _ShutdownInstanceDisks(self, instance)
2388
        raise errors.OpExecError("Could not start instance for full reboot")
2389

    
2390
    self.cfg.MarkInstanceUp(instance.name)
2391
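  # Summary of the reboot types handled above (comment added for clarity,
  # derived from the code):
  #
  #   INSTANCE_REBOOT_SOFT  - call_instance_reboot on the primary node
  #   INSTANCE_REBOOT_HARD  - call_instance_reboot on the primary node
  #   INSTANCE_REBOOT_FULL  - shut down the instance and its disks, then
  #                           reassemble the disks and start it again
  #
  # In all cases the instance is marked as up in the configuration afterwards.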

    
2392

    
2393
class LUShutdownInstance(LogicalUnit):
2394
  """Shutdown an instance.
2395

2396
  """
2397
  HPATH = "instance-stop"
2398
  HTYPE = constants.HTYPE_INSTANCE
2399
  _OP_REQP = ["instance_name"]
2400
  REQ_BGL = False
2401

    
2402
  def ExpandNames(self):
2403
    self._ExpandAndLockInstance()
2404

    
2405
  def BuildHooksEnv(self):
2406
    """Build hooks env.
2407

2408
    This runs on master, primary and secondary nodes of the instance.
2409

2410
    """
2411
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2412
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2413
          list(self.instance.secondary_nodes))
2414
    return env, nl, nl
2415

    
2416
  def CheckPrereq(self):
2417
    """Check prerequisites.
2418

2419
    This checks that the instance is in the cluster.
2420

2421
    """
2422
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2423
    assert self.instance is not None, \
2424
      "Cannot retrieve locked instance %s" % self.op.instance_name
2425

    
2426
  def Exec(self, feedback_fn):
2427
    """Shutdown the instance.
2428

2429
    """
2430
    instance = self.instance
2431
    node_current = instance.primary_node
2432
    self.cfg.MarkInstanceDown(instance.name)
2433
    if not self.rpc.call_instance_shutdown(node_current, instance):
2434
      self.proc.LogWarning("Could not shutdown instance")
2435

    
2436
    _ShutdownInstanceDisks(self, instance)
2437

    
2438

    
2439
class LUReinstallInstance(LogicalUnit):
2440
  """Reinstall an instance.
2441

2442
  """
2443
  HPATH = "instance-reinstall"
2444
  HTYPE = constants.HTYPE_INSTANCE
2445
  _OP_REQP = ["instance_name"]
2446
  REQ_BGL = False
2447

    
2448
  def ExpandNames(self):
2449
    self._ExpandAndLockInstance()
2450

    
2451
  def BuildHooksEnv(self):
2452
    """Build hooks env.
2453

2454
    This runs on master, primary and secondary nodes of the instance.
2455

2456
    """
2457
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2458
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2459
          list(self.instance.secondary_nodes))
2460
    return env, nl, nl
2461

    
2462
  def CheckPrereq(self):
2463
    """Check prerequisites.
2464

2465
    This checks that the instance is in the cluster and is not running.
2466

2467
    """
2468
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2469
    assert instance is not None, \
2470
      "Cannot retrieve locked instance %s" % self.op.instance_name
2471

    
2472
    if instance.disk_template == constants.DT_DISKLESS:
2473
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2474
                                 self.op.instance_name)
2475
    if instance.status != "down":
2476
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2477
                                 self.op.instance_name)
2478
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2479
                                              instance.name,
2480
                                              instance.hypervisor)
2481
    if remote_info:
2482
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2483
                                 (self.op.instance_name,
2484
                                  instance.primary_node))
2485

    
2486
    self.op.os_type = getattr(self.op, "os_type", None)
2487
    if self.op.os_type is not None:
2488
      # OS verification
2489
      pnode = self.cfg.GetNodeInfo(
2490
        self.cfg.ExpandNodeName(instance.primary_node))
2491
      if pnode is None:
2492
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2493
                                   self.op.pnode)
2494
      os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
2495
      if not os_obj:
2496
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2497
                                   " primary node"  % self.op.os_type)
2498

    
2499
    self.instance = instance
2500

    
2501
  def Exec(self, feedback_fn):
2502
    """Reinstall the instance.
2503

2504
    """
2505
    inst = self.instance
2506

    
2507
    if self.op.os_type is not None:
2508
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2509
      inst.os = self.op.os_type
2510
      self.cfg.Update(inst)
2511

    
2512
    _StartInstanceDisks(self, inst, None)
2513
    try:
2514
      feedback_fn("Running the instance OS create scripts...")
2515
      if not self.rpc.call_instance_os_add(inst.primary_node, inst):
2516
        raise errors.OpExecError("Could not install OS for instance %s"
2517
                                 " on node %s" %
2518
                                 (inst.name, inst.primary_node))
2519
    finally:
2520
      _ShutdownInstanceDisks(self, inst)
2521

    
2522

    
2523
class LURenameInstance(LogicalUnit):
2524
  """Rename an instance.
2525

2526
  """
2527
  HPATH = "instance-rename"
2528
  HTYPE = constants.HTYPE_INSTANCE
2529
  _OP_REQP = ["instance_name", "new_name"]
2530

    
2531
  def BuildHooksEnv(self):
2532
    """Build hooks env.
2533

2534
    This runs on master, primary and secondary nodes of the instance.
2535

2536
    """
2537
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2538
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2539
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2540
          list(self.instance.secondary_nodes))
2541
    return env, nl, nl
2542

    
2543
  def CheckPrereq(self):
2544
    """Check prerequisites.
2545

2546
    This checks that the instance is in the cluster and is not running.
2547

2548
    """
2549
    instance = self.cfg.GetInstanceInfo(
2550
      self.cfg.ExpandInstanceName(self.op.instance_name))
2551
    if instance is None:
2552
      raise errors.OpPrereqError("Instance '%s' not known" %
2553
                                 self.op.instance_name)
2554
    if instance.status != "down":
2555
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2556
                                 self.op.instance_name)
2557
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2558
                                              instance.name,
2559
                                              instance.hypervisor)
2560
    if remote_info:
2561
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2562
                                 (self.op.instance_name,
2563
                                  instance.primary_node))
2564
    self.instance = instance
2565

    
2566
    # new name verification
2567
    name_info = utils.HostInfo(self.op.new_name)
2568

    
2569
    self.op.new_name = new_name = name_info.name
2570
    instance_list = self.cfg.GetInstanceList()
2571
    if new_name in instance_list:
2572
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2573
                                 new_name)
2574

    
2575
    if not getattr(self.op, "ignore_ip", False):
2576
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2577
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2578
                                   (name_info.ip, new_name))
2579

    
2580

    
2581
  def Exec(self, feedback_fn):
2582
    """Reinstall the instance.
2583

2584
    """
2585
    inst = self.instance
2586
    old_name = inst.name
2587

    
2588
    if inst.disk_template == constants.DT_FILE:
2589
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2590

    
2591
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2592
    # Change the instance lock. This is definitely safe while we hold the BGL
2593
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2594
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2595

    
2596
    # re-read the instance from the configuration after rename
2597
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2598

    
2599
    if inst.disk_template == constants.DT_FILE:
2600
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2601
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2602
                                                     old_file_storage_dir,
2603
                                                     new_file_storage_dir)
2604

    
2605
      if not result:
2606
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2607
                                 " directory '%s' to '%s' (but the instance"
2608
                                 " has been renamed in Ganeti)" % (
2609
                                 inst.primary_node, old_file_storage_dir,
2610
                                 new_file_storage_dir))
2611

    
2612
      if not result[0]:
2613
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2614
                                 " (but the instance has been renamed in"
2615
                                 " Ganeti)" % (old_file_storage_dir,
2616
                                               new_file_storage_dir))
2617

    
2618
    _StartInstanceDisks(self, inst, None)
2619
    try:
2620
      if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
2621
                                               old_name):
2622
        msg = ("Could not run OS rename script for instance %s on node %s"
2623
               " (but the instance has been renamed in Ganeti)" %
2624
               (inst.name, inst.primary_node))
2625
        self.proc.LogWarning(msg)
2626
    finally:
2627
      _ShutdownInstanceDisks(self, inst)
2628

    
2629

    
2630
class LURemoveInstance(LogicalUnit):
2631
  """Remove an instance.
2632

2633
  """
2634
  HPATH = "instance-remove"
2635
  HTYPE = constants.HTYPE_INSTANCE
2636
  _OP_REQP = ["instance_name", "ignore_failures"]
2637
  REQ_BGL = False
2638

    
2639
  def ExpandNames(self):
2640
    self._ExpandAndLockInstance()
2641
    self.needed_locks[locking.LEVEL_NODE] = []
2642
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2643

    
2644
  def DeclareLocks(self, level):
2645
    if level == locking.LEVEL_NODE:
2646
      self._LockInstancesNodes()
2647

    
2648
  def BuildHooksEnv(self):
2649
    """Build hooks env.
2650

2651
    This runs on master, primary and secondary nodes of the instance.
2652

2653
    """
2654
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2655
    nl = [self.cfg.GetMasterNode()]
2656
    return env, nl, nl
2657

    
2658
  def CheckPrereq(self):
2659
    """Check prerequisites.
2660

2661
    This checks that the instance is in the cluster.
2662

2663
    """
2664
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2665
    assert self.instance is not None, \
2666
      "Cannot retrieve locked instance %s" % self.op.instance_name
2667

    
2668
  def Exec(self, feedback_fn):
2669
    """Remove the instance.
2670

2671
    """
2672
    instance = self.instance
2673
    logging.info("Shutting down instance %s on node %s",
2674
                 instance.name, instance.primary_node)
2675

    
2676
    if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
2677
      if self.op.ignore_failures:
2678
        feedback_fn("Warning: can't shutdown instance")
2679
      else:
2680
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2681
                                 (instance.name, instance.primary_node))
2682

    
2683
    logging.info("Removing block devices for instance %s", instance.name)
2684

    
2685
    if not _RemoveDisks(self, instance):
2686
      if self.op.ignore_failures:
2687
        feedback_fn("Warning: can't remove instance's disks")
2688
      else:
2689
        raise errors.OpExecError("Can't remove instance's disks")
2690

    
2691
    logging.info("Removing instance %s out of cluster config", instance.name)
2692

    
2693
    self.cfg.RemoveInstance(instance.name)
2694
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2695

    
2696

    
2697
class LUQueryInstances(NoHooksLU):
2698
  """Logical unit for querying instances.
2699

2700
  """
2701
  _OP_REQP = ["output_fields", "names"]
2702
  REQ_BGL = False
2703
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
2704
                                    "admin_state", "admin_ram",
2705
                                    "disk_template", "ip", "mac", "bridge",
2706
                                    "sda_size", "sdb_size", "vcpus", "tags",
2707
                                    "network_port", "beparams",
2708
                                    "(disk).(size)/([0-9]+)",
2709
                                    "(disk).(sizes)",
2710
                                    "(nic).(mac|ip|bridge)/([0-9]+)",
2711
                                    "(nic).(macs|ips|bridges)",
2712
                                    "(disk|nic).(count)",
2713
                                    "serial_no", "hypervisor", "hvparams",] +
2714
                                  ["hv/%s" % name
2715
                                   for name in constants.HVS_PARAMETERS] +
2716
                                  ["be/%s" % name
2717
                                   for name in constants.BES_PARAMETERS])
2718
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
2719

    
2720

    
2721
  def ExpandNames(self):
2722
    _CheckOutputFields(static=self._FIELDS_STATIC,
2723
                       dynamic=self._FIELDS_DYNAMIC,
2724
                       selected=self.op.output_fields)
2725

    
2726
    self.needed_locks = {}
2727
    self.share_locks[locking.LEVEL_INSTANCE] = 1
2728
    self.share_locks[locking.LEVEL_NODE] = 1
2729

    
2730
    if self.op.names:
2731
      self.wanted = _GetWantedInstances(self, self.op.names)
2732
    else:
2733
      self.wanted = locking.ALL_SET
2734

    
2735
    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2736
    if self.do_locking:
2737
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2738
      self.needed_locks[locking.LEVEL_NODE] = []
2739
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2740

    
2741
  def DeclareLocks(self, level):
2742
    if level == locking.LEVEL_NODE and self.do_locking:
2743
      self._LockInstancesNodes()
2744

    
2745
  def CheckPrereq(self):
2746
    """Check prerequisites.
2747

2748
    """
2749
    pass
2750

    
2751
  def Exec(self, feedback_fn):
2752
    """Computes the list of nodes and their attributes.
2753

2754
    """
2755
    all_info = self.cfg.GetAllInstancesInfo()
2756
    if self.do_locking:
2757
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2758
    elif self.wanted != locking.ALL_SET:
2759
      instance_names = self.wanted
2760
      missing = set(instance_names).difference(all_info.keys())
2761
      if missing:
2762
        raise errors.OpExecError(
2763
          "Some instances were removed before retrieving their data: %s"
2764
          % missing)
2765
    else:
2766
      instance_names = all_info.keys()
2767

    
2768
    instance_names = utils.NiceSort(instance_names)
2769
    instance_list = [all_info[iname] for iname in instance_names]
2770

    
2771
    # begin data gathering
2772

    
2773
    nodes = frozenset([inst.primary_node for inst in instance_list])
2774
    hv_list = list(set([inst.hypervisor for inst in instance_list]))
2775

    
2776
    bad_nodes = []
2777
    if self.do_locking:
2778
      live_data = {}
2779
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
2780
      for name in nodes:
2781
        result = node_data[name]
2782
        if result:
2783
          live_data.update(result)
2784
        elif result == False:
2785
          bad_nodes.append(name)
2786
        # else no instance is alive
2787
    else:
2788
      live_data = dict([(name, {}) for name in instance_names])
2789

    
2790
    # end data gathering
2791

    
2792
    HVPREFIX = "hv/"
2793
    BEPREFIX = "be/"
2794
    output = []
2795
    for instance in instance_list:
2796
      iout = []
2797
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
2798
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
2799
      for field in self.op.output_fields:
2800
        st_match = self._FIELDS_STATIC.Matches(field)
2801
        if field == "name":
2802
          val = instance.name
2803
        elif field == "os":
2804
          val = instance.os
2805
        elif field == "pnode":
2806
          val = instance.primary_node
2807
        elif field == "snodes":
2808
          val = list(instance.secondary_nodes)
2809
        elif field == "admin_state":
2810
          val = (instance.status != "down")
2811
        elif field == "oper_state":
2812
          if instance.primary_node in bad_nodes:
2813
            val = None
2814
          else:
2815
            val = bool(live_data.get(instance.name))
2816
        elif field == "status":
2817
          if instance.primary_node in bad_nodes:
2818
            val = "ERROR_nodedown"
2819
          else:
2820
            running = bool(live_data.get(instance.name))
2821
            if running:
2822
              if instance.status != "down":
2823
                val = "running"
2824
              else:
2825
                val = "ERROR_up"
2826
            else:
2827
              if instance.status != "down":
2828
                val = "ERROR_down"
2829
              else:
2830
                val = "ADMIN_down"
2831
        elif field == "oper_ram":
2832
          if instance.primary_node in bad_nodes:
2833
            val = None
2834
          elif instance.name in live_data:
2835
            val = live_data[instance.name].get("memory", "?")
2836
          else:
2837
            val = "-"
2838
        elif field == "disk_template":
2839
          val = instance.disk_template
2840
        elif field == "ip":
2841
          val = instance.nics[0].ip
2842
        elif field == "bridge":
2843
          val = instance.nics[0].bridge
2844
        elif field == "mac":
2845
          val = instance.nics[0].mac
2846
        elif field == "sda_size" or field == "sdb_size":
2847
          idx = ord(field[2]) - ord('a')
2848
          try:
2849
            val = instance.FindDisk(idx).size
2850
          except errors.OpPrereqError:
2851
            val = None
2852
        elif field == "tags":
2853
          val = list(instance.GetTags())
2854
        elif field == "serial_no":
2855
          val = instance.serial_no
2856
        elif field == "network_port":
2857
          val = instance.network_port
2858
        elif field == "hypervisor":
2859
          val = instance.hypervisor
2860
        elif field == "hvparams":
2861
          val = i_hv
2862
        elif (field.startswith(HVPREFIX) and
2863
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
2864
          val = i_hv.get(field[len(HVPREFIX):], None)
2865
        elif field == "beparams":
2866
          val = i_be
2867
        elif (field.startswith(BEPREFIX) and
2868
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
2869
          val = i_be.get(field[len(BEPREFIX):], None)
2870
        elif st_match and st_match.groups():
2871
          # matches a variable list
2872
          st_groups = st_match.groups()
2873
          if st_groups and st_groups[0] == "disk":
2874
            if st_groups[1] == "count":
2875
              val = len(instance.disks)
2876
            elif st_groups[1] == "sizes":
2877
              val = [disk.size for disk in instance.disks]
2878
            elif st_groups[1] == "size":
2879
              try:
2880
                val = instance.FindDisk(st_groups[2]).size
2881
              except errors.OpPrereqError:
2882
                val = None
2883
            else:
2884
              assert False, "Unhandled disk parameter"
2885
          elif st_groups[0] == "nic":
2886
            if st_groups[1] == "count":
2887
              val = len(instance.nics)
2888
            elif st_groups[1] == "macs":
2889
              val = [nic.mac for nic in instance.nics]
2890
            elif st_groups[1] == "ips":
2891
              val = [nic.ip for nic in instance.nics]
2892
            elif st_groups[1] == "bridges":
2893
              val = [nic.bridge for nic in instance.nics]
2894
            else:
2895
              # index-based item
2896
              nic_idx = int(st_groups[2])
2897
              if nic_idx >= len(instance.nics):
2898
                val = None
2899
              else:
2900
                if st_groups[1] == "mac":
2901
                  val = instance.nics[nic_idx].mac
2902
                elif st_groups[1] == "ip":
2903
                  val = instance.nics[nic_idx].ip
2904
                elif st_groups[1] == "bridge":
2905
                  val = instance.nics[nic_idx].bridge
2906
                else:
2907
                  assert False, "Unhandled NIC parameter"
2908
          else:
2909
            assert False, "Unhandled variable parameter"
2910
        else:
2911
          raise errors.ParameterError(field)
2912
        iout.append(val)
2913
      output.append(iout)
2914

    
2915
    return output
2916
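# --- Illustrative sketch (annotation, not part of this revision) ---
# The dynamic "disk.*"/"nic.*" fields handled above are matched by a regular
# expression defined earlier in this module (not visible in this hunk); the
# pattern below is only an assumption that reproduces the group layout used
# here: group 1 is "disk" or "nic", group 2 the sub-field and group 3 an
# optional index.
_EXAMPLE_FIELD_RE = re.compile(r"^(disk|nic)\.([a-z]+)(?:/(\d+))?$")
# _EXAMPLE_FIELD_RE.match("disk.size/0").groups() -> ("disk", "size", "0")
# _EXAMPLE_FIELD_RE.match("nic.macs").groups()    -> ("nic", "macs", None)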

    
2917

    
2918
class LUFailoverInstance(LogicalUnit):
2919
  """Failover an instance.
2920

2921
  """
2922
  HPATH = "instance-failover"
2923
  HTYPE = constants.HTYPE_INSTANCE
2924
  _OP_REQP = ["instance_name", "ignore_consistency"]
2925
  REQ_BGL = False
2926

    
2927
  def ExpandNames(self):
2928
    self._ExpandAndLockInstance()
2929
    self.needed_locks[locking.LEVEL_NODE] = []
2930
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2931

    
2932
  def DeclareLocks(self, level):
2933
    if level == locking.LEVEL_NODE:
2934
      self._LockInstancesNodes()
2935

    
2936
  def BuildHooksEnv(self):
2937
    """Build hooks env.
2938

2939
    This runs on master, primary and secondary nodes of the instance.
2940

2941
    """
2942
    env = {
2943
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2944
      }
2945
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2946
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
2947
    return env, nl, nl
2948

    
2949
  def CheckPrereq(self):
2950
    """Check prerequisites.
2951

2952
    This checks that the instance is in the cluster.
2953

2954
    """
2955
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2956
    assert self.instance is not None, \
2957
      "Cannot retrieve locked instance %s" % self.op.instance_name
2958

    
2959
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2960
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2961
      raise errors.OpPrereqError("Instance's disk layout is not"
2962
                                 " network mirrored, cannot failover.")
2963

    
2964
    secondary_nodes = instance.secondary_nodes
2965
    if not secondary_nodes:
2966
      raise errors.ProgrammerError("no secondary node but using "
2967
                                   "a mirrored disk template")
2968

    
2969
    target_node = secondary_nodes[0]
2970
    # check memory requirements on the secondary node
2971
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
2972
                         instance.name, bep[constants.BE_MEMORY],
2973
                         instance.hypervisor)
2974

    
2975
    # check bridge existence
2976
    brlist = [nic.bridge for nic in instance.nics]
2977
    if not self.rpc.call_bridges_exist(target_node, brlist):
2978
      raise errors.OpPrereqError("One or more target bridges %s does not"
2979
                                 " exist on destination node '%s'" %
2980
                                 (brlist, target_node))
2981

    
2982
  def Exec(self, feedback_fn):
2983
    """Failover an instance.
2984

2985
    The failover is done by shutting it down on its present node and
2986
    starting it on the secondary.
2987

2988
    """
2989
    instance = self.instance
2990

    
2991
    source_node = instance.primary_node
2992
    target_node = instance.secondary_nodes[0]
2993

    
2994
    feedback_fn("* checking disk consistency between source and target")
2995
    for dev in instance.disks:
2996
      # for drbd, these are drbd over lvm
2997
      if not _CheckDiskConsistency(self, dev, target_node, False):
2998
        if instance.status == "up" and not self.op.ignore_consistency:
2999
          raise errors.OpExecError("Disk %s is degraded on target node,"
3000
                                   " aborting failover." % dev.iv_name)
3001

    
3002
    feedback_fn("* shutting down instance on source node")
3003
    logging.info("Shutting down instance %s on node %s",
3004
                 instance.name, source_node)
3005

    
3006
    if not self.rpc.call_instance_shutdown(source_node, instance):
3007
      if self.op.ignore_consistency:
3008
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
3009
                             " Proceeding"
3010
                             " anyway. Please make sure node %s is down",
3011
                             instance.name, source_node, source_node)
3012
      else:
3013
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3014
                                 (instance.name, source_node))
3015

    
3016
    feedback_fn("* deactivating the instance's disks on source node")
3017
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3018
      raise errors.OpExecError("Can't shut down the instance's disks.")
3019

    
3020
    instance.primary_node = target_node
3021
    # distribute new instance config to the other nodes
3022
    self.cfg.Update(instance)
3023

    
3024
    # Only start the instance if it's marked as up
3025
    if instance.status == "up":
3026
      feedback_fn("* activating the instance's disks on target node")
3027
      logging.info("Starting instance %s on node %s",
3028
                   instance.name, target_node)
3029

    
3030
      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3031
                                               ignore_secondaries=True)
3032
      if not disks_ok:
3033
        _ShutdownInstanceDisks(self, instance)
3034
        raise errors.OpExecError("Can't activate the instance's disks")
3035

    
3036
      feedback_fn("* starting the instance on the target node")
3037
      if not self.rpc.call_instance_start(target_node, instance, None):
3038
        _ShutdownInstanceDisks(self, instance)
3039
        raise errors.OpExecError("Could not start instance %s on node %s." %
3040
                                 (instance.name, target_node))
3041

    
3042
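# --- Illustrative sketch (annotation, not part of this revision) ---
# A hedged example of how the failover above is typically driven: the client
# builds an OpFailoverInstance opcode (its fields are assumed to mirror
# _OP_REQP of LUFailoverInstance) and submits it through the usual job
# machinery.  The helper name below is hypothetical.
def _ExampleFailoverOpcode(instance_name):
  """Build the opcode that drives LUFailoverInstance (illustration only)."""
  return opcodes.OpFailoverInstance(instance_name=instance_name,
                                    ignore_consistency=False)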

    
3043
def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
3044
  """Create a tree of block devices on the primary node.
3045

3046
  This always creates all devices.
3047

3048
  """
3049
  if device.children:
3050
    for child in device.children:
3051
      if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
3052
        return False
3053

    
3054
  lu.cfg.SetDiskID(device, node)
3055
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3056
                                       instance.name, True, info)
3057
  if not new_id:
3058
    return False
3059
  if device.physical_id is None:
3060
    device.physical_id = new_id
3061
  return True
3062

    
3063

    
3064
def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
3065
  """Create a tree of block devices on a secondary node.
3066

3067
  If this device type has to be created on secondaries, create it and
3068
  all its children.
3069

3070
  If not, just recurse to children keeping the same 'force' value.
3071

3072
  """
3073
  if device.CreateOnSecondary():
3074
    force = True
3075
  if device.children:
3076
    for child in device.children:
3077
      if not _CreateBlockDevOnSecondary(lu, node, instance,
3078
                                        child, force, info):
3079
        return False
3080

    
3081
  if not force:
3082
    return True
3083
  lu.cfg.SetDiskID(device, node)
3084
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3085
                                       instance.name, False, info)
3086
  if not new_id:
3087
    return False
3088
  if device.physical_id is None:
3089
    device.physical_id = new_id
3090
  return True
3091

    
3092
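# --- Illustrative sketch (annotation, not part of this revision) ---
# Net effect of the two helpers above, assuming the usual CreateOnSecondary()
# behaviour (True for DRBD8 devices, False for plain LVs): on the primary
# node the whole tree is always created; on a secondary the recursion starts
# with force=False, flips force to True once it reaches the DRBD8 device, and
# therefore creates its LV children and the DRBD8 device itself there too,
# while a standalone LV disk is never created on secondaries.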

    
3093
def _GenerateUniqueNames(lu, exts):
3094
  """Generate a suitable LV name.
3095

3096
  This will generate one unique volume name for each of the given extensions.
3097

3098
  """
3099
  results = []
3100
  for val in exts:
3101
    new_id = lu.cfg.GenerateUniqueID()
3102
    results.append("%s%s" % (new_id, val))
3103
  return results
3104

    
3105

    
3106
def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3107
                         p_minor, s_minor):
3108
  """Generate a drbd8 device complete with its children.
3109

3110
  """
3111
  port = lu.cfg.AllocatePort()
3112
  vgname = lu.cfg.GetVGName()
3113
  shared_secret = lu.cfg.GenerateDRBDSecret()
3114
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3115
                          logical_id=(vgname, names[0]))
3116
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3117
                          logical_id=(vgname, names[1]))
3118
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3119
                          logical_id=(primary, secondary, port,
3120
                                      p_minor, s_minor,
3121
                                      shared_secret),
3122
                          children=[dev_data, dev_meta],
3123
                          iv_name=iv_name)
3124
  return drbd_dev
3125

    
3126
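# --- Illustrative sketch (annotation, not part of this revision) ---
# A hedged example of the disk tree built by _GenerateDRBD8Branch for one
# 10240 MB disk; the node names, volume group, port, minors and secret below
# are made up, and the helper exists only for illustration.
def _ExampleDRBD8Disk():
  """Return an example DRBD8 disk tree (illustration only)."""
  data = objects.Disk(dev_type=constants.LD_LV, size=10240,
                      logical_id=("xenvg", "example-uuid.disk0_data"))
  meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                      logical_id=("xenvg", "example-uuid.disk0_meta"))
  return objects.Disk(dev_type=constants.LD_DRBD8, size=10240,
                      logical_id=("node1.example.com", "node2.example.com",
                                  11000, 0, 0, "example-secret"),
                      children=[data, meta], iv_name="disk/0")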

    
3127
def _GenerateDiskTemplate(lu, template_name,
3128
                          instance_name, primary_node,
3129
                          secondary_nodes, disk_info,
3130
                          file_storage_dir, file_driver):
3131
  """Generate the entire disk layout for a given template type.
3132

3133
  """
3134
  #TODO: compute space requirements
3135

    
3136
  vgname = lu.cfg.GetVGName()
3137
  disk_count = len(disk_info)
3138
  disks = []
3139
  if template_name == constants.DT_DISKLESS:
3140
    pass
3141
  elif template_name == constants.DT_PLAIN:
3142
    if len(secondary_nodes) != 0:
3143
      raise errors.ProgrammerError("Wrong template configuration")
3144

    
3145
    names = _GenerateUniqueNames(lu, [".disk%d" % i
3146
                                      for i in range(disk_count)])
3147
    for idx, disk in enumerate(disk_info):
3148
      disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
3149
                              logical_id=(vgname, names[idx]),
3150
                              iv_name = "disk/%d" % idx)
3151
      disks.append(disk_dev)
3152
  elif template_name == constants.DT_DRBD8:
3153
    if len(secondary_nodes) != 1:
3154
      raise errors.ProgrammerError("Wrong template configuration")
3155
    remote_node = secondary_nodes[0]
3156
    minors = lu.cfg.AllocateDRBDMinor(
3157
      [primary_node, remote_node] * len(disk_info), instance_name)
3158

    
3159
    names = _GenerateUniqueNames(lu,
3160
                                 [".disk%d_%s" % (i, s)
3161
                                  for i in range(disk_count)
3162
                                  for s in ("data", "meta")
3163
                                  ])
3164
    for idx, disk in enumerate(disk_info):
3165
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3166
                                      disk["size"], names[idx*2:idx*2+2],
3167
                                      "disk/%d" % idx,
3168
                                      minors[idx*2], minors[idx*2+1])
3169
      disks.append(disk_dev)
3170
  elif template_name == constants.DT_FILE:
3171
    if len(secondary_nodes) != 0:
3172
      raise errors.ProgrammerError("Wrong template configuration")
3173

    
3174
    for idx, disk in enumerate(disk_info):
3175

    
3176
      disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
3177
                              iv_name="disk/%d" % idx,
3178
                              logical_id=(file_driver,
3179
                                          "%s/disk%d" % (file_storage_dir,
3180
                                                         idx)))
3181
      disks.append(disk_dev)
3182
  else:
3183
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3184
  return disks
3185

    
3186
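# --- Illustrative sketch (annotation, not part of this revision) ---
# Hedged example of the disk_info format consumed above (a list of dicts
# with at least a "size" key, in MB) and of a call for the plain LVM
# template; "lu" and the names below are placeholders:
#
#   disks = _GenerateDiskTemplate(lu, constants.DT_PLAIN,
#                                 "inst1.example.com", "node1.example.com",
#                                 [], [{"size": 10240}, {"size": 2048}],
#                                 None, None)
#
# which would return two LD_LV Disk objects named "<uuid>.disk0" and
# "<uuid>.disk1" in the cluster volume group, with iv_names "disk/0" and
# "disk/1".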

    
3187
def _GetInstanceInfoText(instance):
3188
  """Compute that text that should be added to the disk's metadata.
3189

3190
  """
3191
  return "originstname+%s" % instance.name
3192

    
3193

    
3194
def _CreateDisks(lu, instance):
3195
  """Create all disks for an instance.
3196

3197
  This abstracts away some work from AddInstance.
3198

3199
  @type lu: L{LogicalUnit}
3200
  @param lu: the logical unit on whose behalf we execute
3201
  @type instance: L{objects.Instance}
3202
  @param instance: the instance whose disks we should create
3203
  @rtype: boolean
3204
  @return: the success of the creation
3205

3206
  """
3207
  info = _GetInstanceInfoText(instance)
3208

    
3209
  if instance.disk_template == constants.DT_FILE:
3210
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3211
    result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3212
                                                 file_storage_dir)
3213

    
3214
    if not result:
3215
      logging.error("Could not connect to node '%s'", instance.primary_node)
3216
      return False
3217

    
3218
    if not result[0]:
3219
      logging.error("Failed to create directory '%s'", file_storage_dir)
3220
      return False
3221

    
3222
  for device in instance.disks:
3223
    logging.info("Creating volume %s for instance %s",
3224
                 device.iv_name, instance.name)
3225
    #HARDCODE
3226
    for secondary_node in instance.secondary_nodes:
3227
      if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3228
                                        device, False, info):
3229
        logging.error("Failed to create volume %s (%s) on secondary node %s!",
3230
                      device.iv_name, device, secondary_node)
3231
        return False
3232
    #HARDCODE
3233
    if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3234
                                    instance, device, info):
3235
      logging.error("Failed to create volume %s on primary!", device.iv_name)
3236
      return False
3237

    
3238
  return True
3239

    
3240

    
3241
def _RemoveDisks(lu, instance):
3242
  """Remove all disks for an instance.
3243

3244
  This abstracts away some work from `AddInstance()` and
3245
  `RemoveInstance()`. Note that in case some of the devices couldn't
3246
  be removed, the removal will continue with the other ones (compare
3247
  with `_CreateDisks()`).
3248

3249
  @type lu: L{LogicalUnit}
3250
  @param lu: the logical unit on whose behalf we execute
3251
  @type instance: L{objects.Instance}
3252
  @param instance: the instance whose disks we should remove
3253
  @rtype: boolean
3254
  @return: the success of the removal
3255

3256
  """
3257
  logging.info("Removing block devices for instance %s", instance.name)
3258

    
3259
  result = True
3260
  for device in instance.disks:
3261
    for node, disk in device.ComputeNodeTree(instance.primary_node):
3262
      lu.cfg.SetDiskID(disk, node)
3263
      if not lu.rpc.call_blockdev_remove(node, disk):
3264
        lu.proc.LogWarning("Could not remove block device %s on node %s,"
3265
                           " continuing anyway", device.iv_name, node)
3266
        result = False
3267

    
3268
  if instance.disk_template == constants.DT_FILE:
3269
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3270
    if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3271
                                               file_storage_dir):
3272
      logging.error("Could not remove directory '%s'", file_storage_dir)
3273
      result = False
3274

    
3275
  return result
3276

    
3277

    
3278
def _ComputeDiskSize(disk_template, disks):
3279
  """Compute disk size requirements in the volume group
3280

3281
  """
3282
  # Required free disk space as a function of the requested disk sizes
3283
  req_size_dict = {
3284
    constants.DT_DISKLESS: None,
3285
    constants.DT_PLAIN: sum(d["size"] for d in disks),
3286
    # 128 MB are added for drbd metadata for each disk
3287
    constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
3288
    constants.DT_FILE: None,
3289
  }
3290

    
3291
  if disk_template not in req_size_dict:
3292
    raise errors.ProgrammerError("Disk template '%s' size requirement"
3293
                                 " is unknown" %  disk_template)
3294

    
3295
  return req_size_dict[disk_template]
3296

    
3297
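# --- Illustrative sketch (annotation, not part of this revision) ---
# Worked example of the sizing rule above; the sizes are arbitrary and the
# helper is purely illustrative.
def _ExampleDRBD8SpaceNeeded():
  """Return the VG space needed for two example DRBD8 disks (sketch)."""
  # 10240 + 128 + 2048 + 128 = 12544 MB, per the DT_DRBD8 rule above
  return _ComputeDiskSize(constants.DT_DRBD8,
                          [{"size": 10240}, {"size": 2048}])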

    
3298
def _CheckHVParams(lu, nodenames, hvname, hvparams):
3299
  """Hypervisor parameter validation.
3300

3301
  This function abstracts the hypervisor parameter validation to be
3302
  used in both instance create and instance modify.
3303

3304
  @type lu: L{LogicalUnit}
3305
  @param lu: the logical unit for which we check
3306
  @type nodenames: list
3307
  @param nodenames: the list of nodes on which we should check
3308
  @type hvname: string
3309
  @param hvname: the name of the hypervisor we should use
3310
  @type hvparams: dict
3311
  @param hvparams: the parameters which we need to check
3312
  @raise errors.OpPrereqError: if the parameters are not valid
3313

3314
  """
3315
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3316
                                                  hvname,
3317
                                                  hvparams)
3318
  for node in nodenames:
3319
    info = hvinfo.get(node, None)
3320
    if not info or not isinstance(info, (tuple, list)):
3321
      raise errors.OpPrereqError("Cannot get current information"
3322
                                 " from node '%s' (%s)" % (node, info))
3323
    if not info[0]:
3324
      raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3325
                                 " %s" % info[1])
3326

    
3327
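# --- Illustrative sketch (annotation, not part of this revision) ---
# The per-node result consumed above is assumed to be a (status, detail)
# pair, e.g. (True, '') when the parameters validate or (False, "<error
# text from the hypervisor>") when they do not; a missing or malformed
# entry is treated as an unreachable node.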

    
3328
class LUCreateInstance(LogicalUnit):
3329
  """Create an instance.
3330

3331
  """
3332
  HPATH = "instance-add"
3333
  HTYPE = constants.HTYPE_INSTANCE
3334
  _OP_REQP = ["instance_name", "disks", "disk_template",
3335
              "mode", "start",
3336
              "wait_for_sync", "ip_check", "nics",
3337
              "hvparams", "beparams"]
3338
  REQ_BGL = False
3339

    
3340
  def _ExpandNode(self, node):
3341
    """Expands and checks one node name.
3342

3343
    """
3344
    node_full = self.cfg.ExpandNodeName(node)
3345
    if node_full is None:
3346
      raise errors.OpPrereqError("Unknown node %s" % node)
3347
    return node_full
3348

    
3349
  def ExpandNames(self):
3350
    """ExpandNames for CreateInstance.
3351

3352
    Figure out the right locks for instance creation.
3353

3354
    """
3355
    self.needed_locks = {}
3356

    
3357
    # set optional parameters to none if they don't exist
3358
    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3359
      if not hasattr(self.op, attr):
3360
        setattr(self.op, attr, None)
3361

    
3362
    # cheap checks, mostly valid constants given
3363

    
3364
    # verify creation mode
3365
    if self.op.mode not in (constants.INSTANCE_CREATE,
3366
                            constants.INSTANCE_IMPORT):
3367
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3368
                                 self.op.mode)
3369

    
3370
    # disk template and mirror node verification
3371
    if self.op.disk_template not in constants.DISK_TEMPLATES:
3372
      raise errors.OpPrereqError("Invalid disk template name")
3373

    
3374
    if self.op.hypervisor is None:
3375
      self.op.hypervisor = self.cfg.GetHypervisorType()
3376

    
3377
    cluster = self.cfg.GetClusterInfo()
3378
    enabled_hvs = cluster.enabled_hypervisors
3379
    if self.op.hypervisor not in enabled_hvs:
3380
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3381
                                 " cluster (%s)" % (self.op.hypervisor,
3382
                                  ",".join(enabled_hvs)))
3383

    
3384
    # check hypervisor parameter syntax (locally)
3385

    
3386
    filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
3387
                                  self.op.hvparams)
3388
    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3389
    hv_type.CheckParameterSyntax(filled_hvp)
3390

    
3391
    # fill and remember the beparams dict
3392
    self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
3393
                                    self.op.beparams)
3394

    
3395
    #### instance parameters check
3396

    
3397
    # instance name verification
3398
    hostname1 = utils.HostInfo(self.op.instance_name)
3399
    self.op.instance_name = instance_name = hostname1.name
3400

    
3401
    # this is just a preventive check, but someone might still add this
3402
    # instance in the meantime, and creation will fail at lock-add time
3403
    if instance_name in self.cfg.GetInstanceList():
3404
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3405
                                 instance_name)
3406

    
3407
    self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3408

    
3409
    # NIC buildup
3410
    self.nics = []
3411
    for nic in self.op.nics:
3412
      # ip validity checks
3413
      ip = nic.get("ip", None)
3414
      if ip is None or ip.lower() == "none":
3415
        nic_ip = None
3416
      elif ip.lower() == constants.VALUE_AUTO:
3417
        nic_ip = hostname1.ip
3418
      else:
3419
        if not utils.IsValidIP(ip):
3420
          raise errors.OpPrereqError("Given IP address '%s' doesn't look"
3421
                                     " like a valid IP" % ip)
3422
        nic_ip = ip
3423

    
3424
      # MAC address verification
3425
      mac = nic.get("mac", constants.VALUE_AUTO)
3426
      if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3427
        if not utils.IsValidMac(mac.lower()):
3428
          raise errors.OpPrereqError("Invalid MAC address specified: %s" %
3429
                                     mac)
3430
      # bridge verification
3431
      bridge = nic.get("bridge", self.cfg.GetDefBridge())
3432
      self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
3433

    
3434
    # disk checks/pre-build
3435
    self.disks = []
3436
    for disk in self.op.disks:
3437
      mode = disk.get("mode", constants.DISK_RDWR)
3438
      if mode not in constants.DISK_ACCESS_SET:
3439
        raise errors.OpPrereqError("Invalid disk access mode '%s'" %
3440
                                   mode)
3441
      size = disk.get("size", None)
3442
      if size is None:
3443
        raise errors.OpPrereqError("Missing disk size")
3444
      try:
3445
        size = int(size)
3446
      except ValueError:
3447
        raise errors.OpPrereqError("Invalid disk size '%s'" % size)
3448
      self.disks.append({"size": size, "mode": mode})
3449

    
3450
    # used in CheckPrereq for ip ping check
3451
    self.check_ip = hostname1.ip
3452

    
3453
    # file storage checks
3454
    if (self.op.file_driver and
3455
        not self.op.file_driver in constants.FILE_DRIVER):
3456
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
3457
                                 self.op.file_driver)
3458

    
3459
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3460
      raise errors.OpPrereqError("File storage directory path not absolute")
3461

    
3462
    ### Node/iallocator related checks
3463
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
3464
      raise errors.OpPrereqError("One and only one of iallocator and primary"
3465
                                 " node must be given")
3466

    
3467
    if self.op.iallocator:
3468
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3469
    else:
3470
      self.op.pnode = self._ExpandNode(self.op.pnode)
3471
      nodelist = [self.op.pnode]
3472
      if self.op.snode is not None:
3473
        self.op.snode = self._ExpandNode(self.op.snode)
3474
        nodelist.append(self.op.snode)
3475
      self.needed_locks[locking.LEVEL_NODE] = nodelist
3476

    
3477
    # in case of import lock the source node too
3478
    if self.op.mode == constants.INSTANCE_IMPORT:
3479
      src_node = getattr(self.op, "src_node", None)
3480
      src_path = getattr(self.op, "src_path", None)
3481

    
3482
      if src_node is None or src_path is None:
3483
        raise errors.OpPrereqError("Importing an instance requires source"
3484
                                   " node and path options")
3485

    
3486
      if not os.path.isabs(src_path):
3487
        raise errors.OpPrereqError("The source path must be absolute")
3488

    
3489
      self.op.src_node = src_node = self._ExpandNode(src_node)
3490
      if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3491
        self.needed_locks[locking.LEVEL_NODE].append(src_node)
3492

    
3493
    else: # INSTANCE_CREATE
3494
      if getattr(self.op, "os_type", None) is None:
3495
        raise errors.OpPrereqError("No guest OS specified")
3496
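  # --- Illustrative sketch (annotation, not part of this revision) ---
  # Hedged example of the nics/disks opcode parameters validated above;
  # every value is a placeholder:
  #
  #   op.nics  = [{"ip": "auto", "mac": "auto", "bridge": "xen-br0"}]
  #   op.disks = [{"size": 10240, "mode": "rw"}]
  #
  # "auto" for the IP resolves to the instance's own address, an "auto" MAC
  # is generated later in Exec, a missing bridge falls back to the cluster
  # default bridge, and the disk mode must be one of the cluster's known
  # access modes (read-write shown here).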

    
3497
  def _RunAllocator(self):
3498
    """Run the allocator based on input opcode.
3499

3500
    """
3501
    nics = [n.ToDict() for n in self.nics]
3502
    ial = IAllocator(self,
3503
                     mode=constants.IALLOCATOR_MODE_ALLOC,
3504
                     name=self.op.instance_name,
3505
                     disk_template=self.op.disk_template,
3506
                     tags=[],
3507
                     os=self.op.os_type,
3508
                     vcpus=self.be_full[constants.BE_VCPUS],
3509
                     mem_size=self.be_full[constants.BE_MEMORY],
3510
                     disks=self.disks,
3511
                     nics=nics,
3512
                     )
3513

    
3514
    ial.Run(self.op.iallocator)
3515

    
3516
    if not ial.success:
3517
      raise errors.OpPrereqError("Can't compute nodes using"
3518
                                 " iallocator '%s': %s" % (self.op.iallocator,
3519
                                                           ial.info))
3520
    if len(ial.nodes) != ial.required_nodes:
3521
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3522
                                 " of nodes (%s), required %s" %
3523
                                 (self.op.iallocator, len(ial.nodes),
3524
                                  ial.required_nodes))
3525
    self.op.pnode = ial.nodes[0]
3526
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
3527
                 self.op.instance_name, self.op.iallocator,
3528
                 ", ".join(ial.nodes))
3529
    if ial.required_nodes == 2:
3530
      self.op.snode = ial.nodes[1]
3531
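  # --- Illustrative sketch (annotation, not part of this revision) ---
  # The IAllocator result consumed above is assumed to expose success
  # (boolean), info (error text) and nodes (a list ial.required_nodes
  # long).  For a DRBD8 request a plausible answer would be
  # nodes == ["node2.example.com", "node3.example.com"] with
  # required_nodes == 2; a plain LVM or file-based instance needs only
  # one node.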

    
3532
  def BuildHooksEnv(self):
3533
    """Build hooks env.
3534

3535
    This runs on master, primary and secondary nodes of the instance.
3536

3537
    """
3538
    env = {
3539
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3540
      "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
3541
      "INSTANCE_ADD_MODE": self.op.mode,
3542
      }
3543
    if self.op.mode == constants.INSTANCE_IMPORT:
3544
      env["INSTANCE_SRC_NODE"] = self.op.src_node
3545
      env["INSTANCE_SRC_PATH"] = self.op.src_path
3546
      env["INSTANCE_SRC_IMAGES"] = self.src_images
3547

    
3548
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3549
      primary_node=self.op.pnode,
3550
      secondary_nodes=self.secondaries,
3551
      status=self.instance_status,
3552
      os_type=self.op.os_type,
3553
      memory=self.be_full[constants.BE_MEMORY],
3554
      vcpus=self.be_full[constants.BE_VCPUS],
3555
      nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
3556
    ))
3557

    
3558
    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3559
          self.secondaries)
3560
    return env, nl, nl
3561

    
3562

    
3563
  def CheckPrereq(self):
3564
    """Check prerequisites.
3565

3566
    """
3567
    if (not self.cfg.GetVGName() and
3568
        self.op.disk_template not in constants.DTS_NOT_LVM):
3569
      raise errors.OpPrereqError("Cluster does not support lvm-based"
3570
                                 " instances")
3571

    
3572

    
3573
    if self.op.mode == constants.INSTANCE_IMPORT:
3574
      src_node = self.op.src_node
3575
      src_path = self.op.src_path
3576

    
3577
      export_info = self.rpc.call_export_info(src_node, src_path)
3578

    
3579
      if not export_info:
3580
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
3581

    
3582
      if not export_info.has_section(constants.INISECT_EXP):
3583
        raise errors.ProgrammerError("Corrupted export config")
3584

    
3585
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
3586
      if (int(ei_version) != constants.EXPORT_VERSION):
3587
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3588
                                   (ei_version, constants.EXPORT_VERSION))
3589

    
3590
      # Check that the new instance doesn't have less disks than the export
3591
      instance_disks = len(self.disks)
3592
      export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
3593
      if instance_disks < export_disks:
3594
        raise errors.OpPrereqError("Not enough disks to import."
3595
                                   " (instance: %d, export: %d)" %
3596
                                   (instance_disks, export_disks))
3597

    
3598
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3599
      disk_images = []
3600
      for idx in range(export_disks):
3601
        option = 'disk%d_dump' % idx
3602
        if export_info.has_option(constants.INISECT_INS, option):
3603
          # FIXME: are the old os-es, disk sizes, etc. useful?
3604
          export_name = export_info.get(constants.INISECT_INS, option)
3605
          image = os.path.join(src_path, export_name)
3606
          disk_images.append(image)
3607
        else:
3608
          disk_images.append(False)
3609

    
3610
      self.src_images = disk_images
3611

    
3612
      if self.op.mac == constants.VALUE_AUTO:
3613
        old_name = export_info.get(constants.INISECT_INS, 'name')
3614
        if self.op.instance_name == old_name:
3615
          # FIXME: adjust every nic, when we'll be able to create instances
3616
          # with more than one
3617
          if int(export_info.get(constants.INISECT_INS, 'nic_count')) >= 1:
3618
            self.op.mac = export_info.get(constants.INISECT_INS, 'nic_0_mac')
3619

    
3620
    # ip ping checks (we use the same ip that was resolved in ExpandNames)
3621

    
3622
    if self.op.start and not self.op.ip_check:
3623
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3624
                                 " adding an instance in start mode")
3625

    
3626
    if self.op.ip_check:
3627
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3628
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3629
                                   (self.check_ip, self.op.instance_name))
3630

    
3631
    #### allocator run
3632

    
3633
    if self.op.iallocator is not None:
3634
      self._RunAllocator()
3635

    
3636
    #### node related checks
3637

    
3638
    # check primary node
3639
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3640
    assert self.pnode is not None, \
3641
      "Cannot retrieve locked node %s" % self.op.pnode
3642
    self.secondaries = []
3643

    
3644
    # mirror node verification
3645
    if self.op.disk_template in constants.DTS_NET_MIRROR:
3646
      if self.op.snode is None:
3647
        raise errors.OpPrereqError("The networked disk templates need"
3648
                                   " a mirror node")
3649
      if self.op.snode == pnode.name:
3650
        raise errors.OpPrereqError("The secondary node cannot be"
3651
                                   " the primary node.")
3652
      self.secondaries.append(self.op.snode)
3653

    
3654
    nodenames = [pnode.name] + self.secondaries
3655

    
3656
    req_size = _ComputeDiskSize(self.op.disk_template,
3657
                                self.disks)
3658

    
3659
    # Check lv size requirements
3660
    if req_size is not None:
3661
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3662
                                         self.op.hypervisor)
3663
      for node in nodenames:
3664
        info = nodeinfo.get(node, None)
3665
        if not info:
3666
          raise errors.OpPrereqError("Cannot get current information"
3667
                                     " from node '%s'" % node)
3668
        vg_free = info.get('vg_free', None)
3669
        if not isinstance(vg_free, int):
3670
          raise errors.OpPrereqError("Can't compute free disk space on"
3671
                                     " node %s" % node)
3672
        if req_size > info['vg_free']:
3673
          raise errors.OpPrereqError("Not enough disk space on target node %s."
3674
                                     " %d MB available, %d MB required" %
3675
                                     (node, info['vg_free'], req_size))
3676

    
3677
    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
3678

    
3679
    # os verification
3680
    os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
3681
    if not os_obj:
3682
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
3683
                                 " primary node"  % self.op.os_type)
3684

    
3685
    # bridge check on primary node
3686
    bridges = [n.bridge for n in self.nics]
3687
    if not self.rpc.call_bridges_exist(self.pnode.name, bridges):
3688
      raise errors.OpPrereqError("one of the target bridges '%s' does not"
3689
                                 " exist on"
3690
                                 " destination node '%s'" %
3691
                                 (",".join(bridges), pnode.name))
3692

    
3693
    # memory check on primary node
3694
    if self.op.start:
3695
      _CheckNodeFreeMemory(self, self.pnode.name,
3696
                           "creating instance %s" % self.op.instance_name,
3697
                           self.be_full[constants.BE_MEMORY],
3698
                           self.op.hypervisor)
3699

    
3700
    if self.op.start:
3701
      self.instance_status = 'up'
3702
    else:
3703
      self.instance_status = 'down'
3704

    
3705
  def Exec(self, feedback_fn):
3706
    """Create and add the instance to the cluster.
3707

3708
    """
3709
    instance = self.op.instance_name
3710
    pnode_name = self.pnode.name
3711

    
3712
    for nic in self.nics:
3713
      if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3714
        nic.mac = self.cfg.GenerateMAC()
3715

    
3716
    ht_kind = self.op.hypervisor
3717
    if ht_kind in constants.HTS_REQ_PORT:
3718
      network_port = self.cfg.AllocatePort()
3719
    else:
3720
      network_port = None
3721

    
3722
    ##if self.op.vnc_bind_address is None:
3723
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3724

    
3725
    # this is needed because os.path.join does not accept None arguments
3726
    if self.op.file_storage_dir is None:
3727
      string_file_storage_dir = ""
3728
    else:
3729
      string_file_storage_dir = self.op.file_storage_dir
3730

    
3731
    # build the full file storage dir path
3732
    file_storage_dir = os.path.normpath(os.path.join(
3733
                                        self.cfg.GetFileStorageDir(),
3734
                                        string_file_storage_dir, instance))
3735

    
3736

    
3737
    disks = _GenerateDiskTemplate(self,
3738
                                  self.op.disk_template,
3739
                                  instance, pnode_name,
3740
                                  self.secondaries,
3741
                                  self.disks,
3742
                                  file_storage_dir,
3743
                                  self.op.file_driver)
3744

    
3745
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3746
                            primary_node=pnode_name,
3747
                            nics=self.nics, disks=disks,
3748
                            disk_template=self.op.disk_template,
3749
                            status=self.instance_status,
3750
                            network_port=network_port,
3751
                            beparams=self.op.beparams,
3752
                            hvparams=self.op.hvparams,
3753
                            hypervisor=self.op.hypervisor,
3754
                            )
3755

    
3756
    feedback_fn("* creating instance disks...")
3757
    if not _CreateDisks(self, iobj):
3758
      _RemoveDisks(self, iobj)
3759
      self.cfg.ReleaseDRBDMinors(instance)
3760
      raise errors.OpExecError("Device creation failed, reverting...")
3761

    
3762
    feedback_fn("adding instance %s to cluster config" % instance)
3763

    
3764
    self.cfg.AddInstance(iobj)
3765
    # Declare that we don't want to remove the instance lock anymore, as we've
3766
    # added the instance to the config
3767
    del self.remove_locks[locking.LEVEL_INSTANCE]
3768
    # Remove the temporary assignments for the instance's drbds
3769
    self.cfg.ReleaseDRBDMinors(instance)
3770

    
3771
    if self.op.wait_for_sync:
3772
      disk_abort = not _WaitForSync(self, iobj)
3773
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3774
      # make sure the disks are not degraded (still sync-ing is ok)
3775
      time.sleep(15)
3776
      feedback_fn("* checking mirrors status")
3777
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
3778
    else:
3779
      disk_abort = False
3780

    
3781
    if disk_abort:
3782
      _RemoveDisks(self, iobj)
3783
      self.cfg.RemoveInstance(iobj.name)
3784
      # Make sure the instance lock gets removed
3785
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
3786
      raise errors.OpExecError("There are some degraded disks for"
3787
                               " this instance")
3788

    
3789
    feedback_fn("creating os for instance %s on node %s" %
3790
                (instance, pnode_name))
3791

    
3792
    if iobj.disk_template != constants.DT_DISKLESS:
3793
      if self.op.mode == constants.INSTANCE_CREATE:
3794
        feedback_fn("* running the instance OS create scripts...")
3795
        if not self.rpc.call_instance_os_add(pnode_name, iobj):
3796
          raise errors.OpExecError("could not add os for instance %s"
3797
                                   " on node %s" %
3798
                                   (instance, pnode_name))
3799

    
3800
      elif self.op.mode == constants.INSTANCE_IMPORT:
3801
        feedback_fn("* running the instance OS import scripts...")
3802
        src_node = self.op.src_node
3803
        src_images = self.src_images
3804
        cluster_name = self.cfg.GetClusterName()
3805
        import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
3806
                                                         src_node, src_images,
3807
                                                         cluster_name)
3808
        for idx, result in enumerate(import_result):
3809
          if not result:
3810
            self.LogWarning("Could not image %s for on instance %s, disk %d,"
3811
                            " on node %s" % (src_images[idx], instance, idx,
3812
                                             pnode_name))
3813
      else:
3814
        # also checked in the prereq part
3815
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3816
                                     % self.op.mode)
3817

    
3818
    if self.op.start:
3819
      logging.info("Starting instance %s on node %s", instance, pnode_name)
3820
      feedback_fn("* starting instance...")
3821
      if not self.rpc.call_instance_start(pnode_name, iobj, None):
3822
        raise errors.OpExecError("Could not start instance")
3823

    
3824
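# --- Illustrative sketch (annotation, not part of this revision) ---
# Hedged example of the opcode that drives LUCreateInstance above; the field
# names mirror _OP_REQP plus the optional attributes set in ExpandNames, and
# all values are placeholders:
#
#   op = opcodes.OpCreateInstance(instance_name="inst1.example.com",
#                                 mode=constants.INSTANCE_CREATE,
#                                 disk_template=constants.DT_DRBD8,
#                                 disks=[{"size": 10240}],
#                                 nics=[{"ip": None, "bridge": "xen-br0"}],
#                                 pnode="node1.example.com",
#                                 snode="node2.example.com",
#                                 os_type="debian-etch",
#                                 hvparams={}, beparams={},
#                                 start=True, ip_check=True,
#                                 wait_for_sync=True)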

    
3825
class LUConnectConsole(NoHooksLU):
3826
  """Connect to an instance's console.
3827

3828
  This is somewhat special in that it returns the command line that
3829
  you need to run on the master node in order to connect to the
3830
  console.
3831

3832
  """
3833
  _OP_REQP = ["instance_name"]
3834
  REQ_BGL = False
3835

    
3836
  def ExpandNames(self):
3837
    self._ExpandAndLockInstance()
3838

    
3839
  def CheckPrereq(self):
3840
    """Check prerequisites.
3841

3842
    This checks that the instance is in the cluster.
3843

3844
    """
3845
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3846
    assert self.instance is not None, \
3847
      "Cannot retrieve locked instance %s" % self.op.instance_name
3848

    
3849
  def Exec(self, feedback_fn):
3850
    """Connect to the console of an instance
3851

3852
    """
3853
    instance = self.instance
3854
    node = instance.primary_node
3855

    
3856
    node_insts = self.rpc.call_instance_list([node],
3857
                                             [instance.hypervisor])[node]
3858
    if node_insts is False:
3859
      raise errors.OpExecError("Can't connect to node %s." % node)
3860

    
3861
    if instance.name not in node_insts:
3862
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3863

    
3864
    logging.debug("Connecting to console of %s on %s", instance.name, node)
3865

    
3866
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
3867
    console_cmd = hyper.GetShellCommandForConsole(instance)
3868

    
3869
    # build ssh cmdline
3870
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3871

    
3872
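# --- Illustrative sketch (annotation, not part of this revision) ---
# For a Xen instance the command returned above would boil down to something
# like the following (the ssh options and console command are schematic, not
# the exact SshRunner/hypervisor output):
#
#   ssh -t -oBatchMode=yes root@node1.example.com 'xm console inst1'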

    
3873
class LUReplaceDisks(LogicalUnit):
3874
  """Replace the disks of an instance.
3875

3876
  """
3877
  HPATH = "mirrors-replace"
3878
  HTYPE = constants.HTYPE_INSTANCE
3879
  _OP_REQP = ["instance_name", "mode", "disks"]
3880
  REQ_BGL = False
3881

    
3882
  def ExpandNames(self):
3883
    self._ExpandAndLockInstance()
3884

    
3885
    if not hasattr(self.op, "remote_node"):
3886
      self.op.remote_node = None
3887

    
3888
    ia_name = getattr(self.op, "iallocator", None)
3889
    if ia_name is not None:
3890
      if self.op.remote_node is not None:
3891
        raise errors.OpPrereqError("Give either the iallocator or the new"
3892
                                   " secondary, not both")
3893
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3894
    elif self.op.remote_node is not None:
3895
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3896
      if remote_node is None:
3897
        raise errors.OpPrereqError("Node '%s' not known" %
3898
                                   self.op.remote_node)
3899
      self.op.remote_node = remote_node
3900
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
3901
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3902
    else:
3903
      self.needed_locks[locking.LEVEL_NODE] = []
3904
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3905

    
3906
  def DeclareLocks(self, level):
3907
    # If we're not already locking all nodes in the set we have to declare the
3908
    # instance's primary/secondary nodes.
3909
    if (level == locking.LEVEL_NODE and
3910
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
3911
      self._LockInstancesNodes()
3912

    
3913
  def _RunAllocator(self):
3914
    """Compute a new secondary node using an IAllocator.
3915

3916
    """
3917
    ial = IAllocator(self,
3918
                     mode=constants.IALLOCATOR_MODE_RELOC,
3919
                     name=self.op.instance_name,
3920
                     relocate_from=[self.sec_node])
3921

    
3922
    ial.Run(self.op.iallocator)
3923

    
3924
    if not ial.success:
3925
      raise errors.OpPrereqError("Can't compute nodes using"
3926
                                 " iallocator '%s': %s" % (self.op.iallocator,
3927
                                                           ial.info))
3928
    if len(ial.nodes) != ial.required_nodes:
3929
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3930
                                 " of nodes (%s), required %s" %
3931
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
3932
    self.op.remote_node = ial.nodes[0]
3933
    self.LogInfo("Selected new secondary for the instance: %s",
3934
                 self.op.remote_node)
3935

    
3936
  def BuildHooksEnv(self):
3937
    """Build hooks env.
3938

3939
    This runs on the master, the primary and all the secondaries.
3940

3941
    """
3942
    env = {
3943
      "MODE": self.op.mode,
3944
      "NEW_SECONDARY": self.op.remote_node,
3945
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3946
      }
3947
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3948
    nl = [
3949
      self.cfg.GetMasterNode(),
3950
      self.instance.primary_node,
3951
      ]
3952
    if self.op.remote_node is not None:
3953
      nl.append(self.op.remote_node)
3954
    return env, nl, nl
3955

    
3956
  def CheckPrereq(self):
3957
    """Check prerequisites.
3958

3959
    This checks that the instance is in the cluster.
3960

3961
    """
3962
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3963
    assert instance is not None, \
3964
      "Cannot retrieve locked instance %s" % self.op.instance_name
3965
    self.instance = instance
3966

    
3967
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3968
      raise errors.OpPrereqError("Instance's disk layout is not"
3969
                                 " network mirrored.")
3970

    
3971
    if len(instance.secondary_nodes) != 1:
3972
      raise errors.OpPrereqError("The instance has a strange layout,"
3973
                                 " expected one secondary but found %d" %
3974
                                 len(instance.secondary_nodes))
3975

    
3976
    self.sec_node = instance.secondary_nodes[0]
3977

    
3978
    ia_name = getattr(self.op, "iallocator", None)
3979
    if ia_name is not None:
3980
      self._RunAllocator()
3981

    
3982
    remote_node = self.op.remote_node
3983
    if remote_node is not None:
3984
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3985
      assert self.remote_node_info is not None, \
3986
        "Cannot retrieve locked node %s" % remote_node
3987
    else:
3988
      self.remote_node_info = None
3989
    if remote_node == instance.primary_node:
3990
      raise errors.OpPrereqError("The specified node is the primary node of"
3991
                                 " the instance.")
3992
    elif remote_node == self.sec_node:
3993
      if self.op.mode == constants.REPLACE_DISK_SEC:
3994
        # this is for DRBD8, where we can't execute the same mode of
3995
        # replacement as for drbd7 (no different port allocated)
3996
        raise errors.OpPrereqError("Same secondary given, cannot execute"
3997
                                   " replacement")
3998
    if instance.disk_template == constants.DT_DRBD8:
3999
      if (self.op.mode == constants.REPLACE_DISK_ALL and
4000
          remote_node is not None):
4001
        # switch to replace secondary mode
4002
        self.op.mode = constants.REPLACE_DISK_SEC
4003

    
4004
      if self.op.mode == constants.REPLACE_DISK_ALL:
4005
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
4006
                                   " secondary disk replacement, not"
4007
                                   " both at once")
4008
      elif self.op.mode == constants.REPLACE_DISK_PRI:
4009
        if remote_node is not None:
4010
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
4011
                                     " the secondary while doing a primary"
4012
                                     " node disk replacement")
4013
        self.tgt_node = instance.primary_node
4014
        self.oth_node = instance.secondary_nodes[0]
4015
      elif self.op.mode == constants.REPLACE_DISK_SEC:
4016
        self.new_node = remote_node # this can be None, in which case
4017
                                    # we don't change the secondary
4018
        self.tgt_node = instance.secondary_nodes[0]
4019
        self.oth_node = instance.primary_node
4020
      else:
4021
        raise errors.ProgrammerError("Unhandled disk replace mode")
4022

    
4023
    if not self.op.disks:
4024
      self.op.disks = range(len(instance.disks))
4025

    
4026
    for disk_idx in self.op.disks:
4027
      instance.FindDisk(disk_idx)
4028
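  # --- Illustrative sketch (annotation, not part of this revision) ---
  # Net effect of the mode handling above for a DRBD8 instance with primary
  # P and secondary S (names are placeholders):
  #   REPLACE_DISK_PRI                  -> tgt_node = P, oth_node = S
  #   REPLACE_DISK_SEC, same secondary  -> rejected as an error
  #   REPLACE_DISK_SEC, new node N      -> tgt_node = S, oth_node = P,
  #                                        new_node = N
  #   REPLACE_DISK_ALL with a new node  -> silently turned into
  #                                        REPLACE_DISK_SEC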

    
4029
  def _ExecD8DiskOnly(self, feedback_fn):
4030
    """Replace a disk on the primary or secondary for dbrd8.
4031

4032
    The algorithm for replace is quite complicated:
4033

4034
      1. for each disk to be replaced:
4035

4036
        1. create new LVs on the target node with unique names
4037
        1. detach old LVs from the drbd device
4038
        1. rename old LVs to name_replaced.<time_t>
4039
        1. rename new LVs to old LVs
4040
        1. attach the new LVs (with the old names now) to the drbd device
4041

4042
      1. wait for sync across all devices
4043

4044
      1. for each modified disk:
4045

4046
        1. remove old LVs (which have the name name_replaced.<time_t>)
4047

4048
    Failures are not very well handled.
4049

4050
    """
4051
    steps_total = 6
4052
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4053
    instance = self.instance
4054
    iv_names = {}
4055
    vgname = self.cfg.GetVGName()
4056
    # start of work
4057
    cfg = self.cfg
4058
    tgt_node = self.tgt_node
4059
    oth_node = self.oth_node
4060

    
4061
    # Step: check device activation
4062
    self.proc.LogStep(1, steps_total, "check device existence")
4063
    info("checking volume groups")
4064
    my_vg = cfg.GetVGName()
4065
    results = self.rpc.call_vg_list([oth_node, tgt_node])
4066
    if not results:
4067
      raise errors.OpExecError("Can't list volume groups on the nodes")
4068
    for node in oth_node, tgt_node:
4069
      res = results.get(node, False)
4070
      if not res or my_vg not in res:
4071
        raise errors.OpExecError("Volume group '%s' not found on %s" %
4072
                                 (my_vg, node))
4073
    for idx, dev in enumerate(instance.disks):
4074
      if idx not in self.op.disks:
4075
        continue
4076
      for node in tgt_node, oth_node:
4077
        info("checking disk/%d on %s" % (idx, node))
4078
        cfg.SetDiskID(dev, node)
4079
        if not self.rpc.call_blockdev_find(node, dev):
4080
          raise errors.OpExecError("Can't find disk/%d on node %s" %
4081
                                   (idx, node))
4082

    
4083
    # Step: check other node consistency
4084
    self.proc.LogStep(2, steps_total, "check peer consistency")
4085
    for idx, dev in enumerate(instance.disks):
4086
      if idx not in self.op.disks:
4087
        continue
4088
      info("checking disk/%d consistency on %s" % (idx, oth_node))
4089
      if not _CheckDiskConsistency(self, dev, oth_node,
4090
                                   oth_node==instance.primary_node):
4091
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4092
                                 " to replace disks on this node (%s)" %
4093
                                 (oth_node, tgt_node))
4094

    
4095
    # Step: create new storage
4096
    self.proc.LogStep(3, steps_total, "allocate new storage")
4097
    for idx, dev in enumerate(instance.disks):
4098
      if idx not in self.op.disks:
4099
        continue
4100
      size = dev.size
4101
      cfg.SetDiskID(dev, tgt_node)
4102
      lv_names = [".disk%d_%s" % (idx, suf)
4103
                  for suf in ["data", "meta"]]
4104
      names = _GenerateUniqueNames(self, lv_names)
4105
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4106
                             logical_id=(vgname, names[0]))
4107
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4108
                             logical_id=(vgname, names[1]))
4109
      new_lvs = [lv_data, lv_meta]
4110
      old_lvs = dev.children
4111
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4112
      info("creating new local storage on %s for %s" %
4113
           (tgt_node, dev.iv_name))
4114
      # since we *always* want to create this LV, we use the
4115
      # _Create...OnPrimary (which forces the creation), even if we
4116
      # are talking about the secondary node
4117
      for new_lv in new_lvs:
4118
        if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
4119
                                        _GetInstanceInfoText(instance)):
4120
          raise errors.OpExecError("Failed to create new LV named '%s' on"
4121
                                   " node '%s'" %
4122
                                   (new_lv.logical_id[1], tgt_node))
4123

    
4124
    # Step: for each lv, detach+rename*2+attach
4125
    self.proc.LogStep(4, steps_total, "change drbd configuration")
4126
    for dev, old_lvs, new_lvs in iv_names.itervalues():
4127
      info("detaching %s drbd from local storage" % dev.iv_name)
4128
      if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
4129
        raise errors.OpExecError("Can't detach drbd from local storage on node"
4130
                                 " %s for device %s" % (tgt_node, dev.iv_name))
4131
      #dev.children = []
4132
      #cfg.Update(instance)
4133

    
4134
      # ok, we created the new LVs, so now we know we have the needed
4135
      # storage; as such, we proceed on the target node to rename
4136
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4137
      # using the assumption that logical_id == physical_id (which in
4138
      # turn is the unique_id on that node)
4139

    
4140
      # FIXME(iustin): use a better name for the replaced LVs
4141
      temp_suffix = int(time.time())
4142
      ren_fn = lambda d, suff: (d.physical_id[0],
4143
                                d.physical_id[1] + "_replaced-%s" % suff)
4144
      # build the rename list based on what LVs exist on the node
4145
      rlist = []
4146
      for to_ren in old_lvs:
4147
        find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4148
        if find_res is not None: # device exists
4149
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4150

    
4151
      info("renaming the old LVs on the target node")
4152
      if not self.rpc.call_blockdev_rename(tgt_node, rlist):
4153
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4154
      # now we rename the new LVs to the old LVs
4155
      info("renaming the new LVs on the target node")
4156
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4157
      if not self.rpc.call_blockdev_rename(tgt_node, rlist):
4158
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4159

    
4160
      for old, new in zip(old_lvs, new_lvs):
4161
        new.logical_id = old.logical_id
4162
        cfg.SetDiskID(new, tgt_node)
4163

    
4164
      for disk in old_lvs:
4165
        disk.logical_id = ren_fn(disk, temp_suffix)
4166
        cfg.SetDiskID(disk, tgt_node)
4167

    
4168
      # now that the new lvs have the old name, we can add them to the device
4169
      info("adding new mirror component on %s" % tgt_node)
4170
      if not self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
4171
        for new_lv in new_lvs:
4172
          if not self.rpc.call_blockdev_remove(tgt_node, new_lv):
4173
            warning("Can't rollback device %s", hint="manually cleanup unused"
4174
                    " logical volumes")
4175
        raise errors.OpExecError("Can't add local storage to drbd")
4176

    
4177
      dev.children = new_lvs
4178
      cfg.Update(instance)
4179

    
4180
    # Step: wait for sync
4181

    
4182
    # this can fail as the old devices are degraded and _WaitForSync
4183
    # does a combined result over all disks, so we don't check its
4184
    # return value
4185
    self.proc.LogStep(5, steps_total, "sync devices")
4186
    _WaitForSync(self, instance, unlock=True)
4187

    
4188
    # so check manually all the devices
4189
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4190
      cfg.SetDiskID(dev, instance.primary_node)
4191
      is_degr = self.rpc.call_blockdev_find(instance.primary_node, dev)[5]
4192
      if is_degr:
4193
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
4194

    
4195
    # Step: remove old storage
4196
    self.proc.LogStep(6, steps_total, "removing old storage")
4197
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4198
      info("remove logical volumes for %s" % name)
4199
      for lv in old_lvs:
4200
        cfg.SetDiskID(lv, tgt_node)
4201
        if not self.rpc.call_blockdev_remove(tgt_node, lv):
4202
          warning("Can't remove old LV", hint="manually remove unused LVs")
4203
          continue
4204

    
4205
  def _ExecD8Secondary(self, feedback_fn):
4206
    """Replace the secondary node for drbd8.
4207

4208
    The algorithm for replace is quite complicated:
4209
      - for all disks of the instance:
4210
        - create new LVs on the new node with same names
4211
        - shutdown the drbd device on the old secondary
4212
        - disconnect the drbd network on the primary
4213
        - create the drbd device on the new secondary
4214
        - network attach the drbd on the primary, using an artifice:
4215
          the drbd code for Attach() will connect to the network if it
4216
          finds a device which is connected to the good local disks but
4217
          not network enabled
4218
      - wait for sync across all devices
4219
      - remove all disks from the old secondary
4220

4221
    Failures are not very well handled.
4222

4223
    """
4224
    steps_total = 6
4225
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4226
    instance = self.instance
4227
    iv_names = {}
4228
    vgname = self.cfg.GetVGName()
4229
    # start of work
4230
    cfg = self.cfg
4231
    old_node = self.tgt_node
4232
    new_node = self.new_node
4233
    pri_node = instance.primary_node
4234

    
4235
    # Step: check device activation
4236
    self.proc.LogStep(1, steps_total, "check device existence")
4237
    info("checking volume groups")
4238
    my_vg = cfg.GetVGName()
4239
    results = self.rpc.call_vg_list([pri_node, new_node])
4240
    if not results:
4241
      raise errors.OpExecError("Can't list volume groups on the nodes")
4242
    for node in pri_node, new_node:
4243
      res = results.get(node, False)
4244
      if not res or my_vg not in res:
4245
        raise errors.OpExecError("Volume group '%s' not found on %s" %
4246
                                 (my_vg, node))
4247
    for idx, dev in enumerate(instance.disks):
4248
      if idx not in self.op.disks:
4249
        continue
4250
      info("checking disk/%d on %s" % (idx, pri_node))
4251
      cfg.SetDiskID(dev, pri_node)
4252
      if not self.rpc.call_blockdev_find(pri_node, dev):
4253
        raise errors.OpExecError("Can't find disk/%d on node %s" %
4254
                                 (idx, pri_node))
4255

    
4256
    # Step: check other node consistency
4257
    self.proc.LogStep(2, steps_total, "check peer consistency")
4258
    for idx, dev in enumerate(instance.disks):
4259
      if idx not in self.op.disks:
4260
        continue
4261
      info("checking disk/%d consistency on %s" % (idx, pri_node))
4262
      if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4263
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
4264
                                 " unsafe to replace the secondary" %
4265
                                 pri_node)
4266

    
4267
    # Step: create new storage
4268
    self.proc.LogStep(3, steps_total, "allocate new storage")
4269
    for idx, dev in enumerate(instance.disks):
4270
      size = dev.size
4271
      info("adding new local storage on %s for disk/%d" %
4272
           (new_node, idx))
4273
      # since we *always* want to create this LV, we use the
4274
      # _Create...OnPrimary (which forces the creation), even if we
4275
      # are talking about the secondary node
4276
      for new_lv in dev.children:
4277
        if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4278
                                        _GetInstanceInfoText(instance)):
4279
          raise errors.OpExecError("Failed to create new LV named '%s' on"
4280
                                   " node '%s'" %
4281
                                   (new_lv.logical_id[1], new_node))
4282

    
4283
    # Step 4: drbd minors and drbd setup changes
4284
    # after this, we must manually remove the drbd minors on both the
4285
    # error and the success paths
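    # Note: the code below assumes that a drbd8 logical_id is the 6-tuple
    # (node_a, node_b, port, minor_a, minor_b, secret); only the minor of
    # the side that moves to new_node is replaced, the port and secret are
    # kept as-is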
4286
    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4287
                                   instance.name)
4288
    logging.debug("Allocated minors %s" % (minors,))
4289
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
4290
    for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
4291
      size = dev.size
4292
      info("activating a new drbd on %s for disk/%d" % (new_node, idx))
4293
      # create new devices on new_node
4294
      if pri_node == dev.logical_id[0]:
4295
        new_logical_id = (pri_node, new_node,
4296
                          dev.logical_id[2], dev.logical_id[3], new_minor,
4297
                          dev.logical_id[5])
4298
      else:
4299
        new_logical_id = (new_node, pri_node,
4300
                          dev.logical_id[2], new_minor, dev.logical_id[4],
4301
                          dev.logical_id[5])
4302
      iv_names[idx] = (dev, dev.children, new_logical_id)
4303
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4304
                    new_logical_id)
4305
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4306
                              logical_id=new_logical_id,
4307
                              children=dev.children)
4308
      if not _CreateBlockDevOnSecondary(self, new_node, instance,
4309
                                        new_drbd, False,
4310
                                        _GetInstanceInfoText(instance)):
4311
        self.cfg.ReleaseDRBDMinors(instance.name)
4312
        raise errors.OpExecError("Failed to create new DRBD on"
4313
                                 " node '%s'" % new_node)
4314

    
4315
    for idx, dev in enumerate(instance.disks):
4316
      # we have new devices, shutdown the drbd on the old secondary
4317
      info("shutting down drbd for disk/%d on old node" % idx)
4318
      cfg.SetDiskID(dev, old_node)
4319
      if not self.rpc.call_blockdev_shutdown(old_node, dev):
4320
        warning("Failed to shutdown drbd for disk/%d on old node" % idx,
4321
                hint="Please cleanup this device manually as soon as possible")
4322

    
4323
    info("detaching primary drbds from the network (=> standalone)")
4324
    done = 0
4325
    for idx, dev in enumerate(instance.disks):
4326
      cfg.SetDiskID(dev, pri_node)
4327
      # set the network part of the physical (unique in bdev terms) id
4328
      # to None, meaning detach from network
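      # (illustrative: the first four elements, assumed to be the local and
      # remote address/port pairs, are blanked while the minor/secret tail
      # is kept)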
4329
      dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4330
      # and 'find' the device, which will 'fix' it to match the
4331
      # standalone state
4332
      if self.rpc.call_blockdev_find(pri_node, dev):
4333
        done += 1
4334
      else:
4335
        warning("Failed to detach drbd disk/%d from network, unusual case" %
4336
                idx)
4337

    
4338
    if not done:
4339
      # no detaches succeeded (very unlikely)
4340
      self.cfg.ReleaseDRBDMinors(instance.name)
4341
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
4342

    
4343
    # if we managed to detach at least one, we update all the disks of
4344
    # the instance to point to the new secondary
4345
    info("updating instance configuration")
4346
    for dev, _, new_logical_id in iv_names.itervalues():
4347
      dev.logical_id = new_logical_id
4348
      cfg.SetDiskID(dev, pri_node)
4349
    cfg.Update(instance)
4350
    # now we can remove the temp minors, as the new values are
4351
    # written to the config file (and therefore stable)
4352
    self.cfg.ReleaseDRBDMinors(instance.name)
4353

    
4354
    # and now perform the drbd attach
4355
    info("attaching primary drbds to new secondary (standalone => connected)")
4356
    failures = []
4357
    for idx, dev in enumerate(instance.disks):
4358
      info("attaching primary drbd for disk/%d to new secondary node" % idx)
4359
      # since the attach is smart, it's enough to 'find' the device;
4360
      # it will automatically activate the network if the physical_id
4361
      # is correct
4362
      cfg.SetDiskID(dev, pri_node)
4363
      logging.debug("Disk to attach: %s", dev)
4364
      if not self.rpc.call_blockdev_find(pri_node, dev):
4365
        warning("can't attach drbd disk/%d to new secondary!" % idx,
4366
                "please do a gnt-instance info to see the status of disks")
4367

    
4368
    # this can fail as the old devices are degraded and _WaitForSync
4369
    # does a combined result over all disks, so we don't check its
4370
    # return value
4371
    self.proc.LogStep(5, steps_total, "sync devices")
4372
    _WaitForSync(self, instance, unlock=True)
4373

    
4374
    # so check manually all the devices
4375
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
4376
      cfg.SetDiskID(dev, pri_node)
4377
      is_degr = self.rpc.call_blockdev_find(pri_node, dev)[5]
4378
      if is_degr:
4379
        raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
4380

    
4381
    self.proc.LogStep(6, steps_total, "removing old storage")
4382
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
4383
      info("remove logical volumes for disk/%d" % idx)
4384
      for lv in old_lvs:
4385
        cfg.SetDiskID(lv, old_node)
4386
        if not self.rpc.call_blockdev_remove(old_node, lv):
4387
          warning("Can't remove LV on old secondary",
4388
                  hint="Cleanup stale volumes by hand")
4389

    
4390
  def Exec(self, feedback_fn):
4391
    """Execute disk replacement.
4392

4393
    This dispatches the disk replacement to the appropriate handler.
4394

4395
    """
4396
    instance = self.instance
4397

    
4398
    # Activate the instance disks if we're replacing them on a down instance
4399
    if instance.status == "down":
4400
      _StartInstanceDisks(self, instance, True)
4401

    
4402
    if instance.disk_template == constants.DT_DRBD8:
4403
      if self.op.remote_node is None:
4404
        fn = self._ExecD8DiskOnly
4405
      else:
4406
        fn = self._ExecD8Secondary
4407
    else:
4408
      raise errors.ProgrammerError("Unhandled disk replacement case")
4409

    
4410
    ret = fn(feedback_fn)
4411

    
4412
    # Deactivate the instance disks if we're replacing them on a down instance
4413
    if instance.status == "down":
4414
      _SafeShutdownInstanceDisks(self, instance)
4415

    
4416
    return ret
4417

    
4418

    
4419
class LUGrowDisk(LogicalUnit):
4420
  """Grow a disk of an instance.
4421

4422
  """
4423
  HPATH = "disk-grow"
4424
  HTYPE = constants.HTYPE_INSTANCE
4425
  _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4426
  REQ_BGL = False
4427

    
4428
  def ExpandNames(self):
4429
    self._ExpandAndLockInstance()
4430
    self.needed_locks[locking.LEVEL_NODE] = []
4431
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4432

    
4433
  def DeclareLocks(self, level):
4434
    if level == locking.LEVEL_NODE:
4435
      self._LockInstancesNodes()
4436

    
4437
  def BuildHooksEnv(self):
4438
    """Build hooks env.
4439

4440
    This runs on the master, the primary and all the secondaries.
4441

4442
    """
4443
    env = {
4444
      "DISK": self.op.disk,
4445
      "AMOUNT": self.op.amount,
4446
      }
4447
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4448
    nl = [
4449
      self.cfg.GetMasterNode(),
4450
      self.instance.primary_node,
4451
      ]
4452
    return env, nl, nl
4453

    
4454
  def CheckPrereq(self):
4455
    """Check prerequisites.
4456

4457
    This checks that the instance is in the cluster.
4458

4459
    """
4460
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4461
    assert instance is not None, \
4462
      "Cannot retrieve locked instance %s" % self.op.instance_name
4463

    
4464
    self.instance = instance
4465

    
4466
    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4467
      raise errors.OpPrereqError("Instance's disk layout does not support"
4468
                                 " growing.")
4469

    
4470
    self.disk = instance.FindDisk(self.op.disk)
4471

    
4472
    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4473
    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4474
                                       instance.hypervisor)
4475
    for node in nodenames:
4476
      info = nodeinfo.get(node, None)
4477
      if not info:
4478
        raise errors.OpPrereqError("Cannot get current information"
4479
                                   " from node '%s'" % node)
4480
      vg_free = info.get('vg_free', None)
4481
      if not isinstance(vg_free, int):
4482
        raise errors.OpPrereqError("Can't compute free disk space on"
4483
                                   " node %s" % node)
4484
      if self.op.amount > info['vg_free']:
4485
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
4486
                                   " %d MiB available, %d MiB required" %
4487
                                   (node, info['vg_free'], self.op.amount))
4488

    
4489
  def Exec(self, feedback_fn):
4490
    """Execute disk grow.
4491

4492
    """
4493
    instance = self.instance
4494
    disk = self.disk
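    # call_blockdev_grow is expected to return a (status, message) pair,
    # which is what the checks below verify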
4495
    for node in (instance.secondary_nodes + (instance.primary_node,)):
4496
      self.cfg.SetDiskID(disk, node)
4497
      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
4498
      if (not result or not isinstance(result, (list, tuple)) or
4499
          len(result) != 2):
4500
        raise errors.OpExecError("grow request failed to node %s" % node)
4501
      elif not result[0]:
4502
        raise errors.OpExecError("grow request failed to node %s: %s" %
4503
                                 (node, result[1]))
4504
    disk.RecordGrow(self.op.amount)
4505
    self.cfg.Update(instance)
4506
    if self.op.wait_for_sync:
4507
      disk_abort = not _WaitForSync(self, instance)
4508
      if disk_abort:
4509
        self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
4510
                             " status.\nPlease check the instance.")
4511

    
4512

    
4513
class LUQueryInstanceData(NoHooksLU):
4514
  """Query runtime instance data.
4515

4516
  """
4517
  _OP_REQP = ["instances", "static"]
4518
  REQ_BGL = False
4519

    
4520
  def ExpandNames(self):
4521
    self.needed_locks = {}
4522
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4523

    
4524
    if not isinstance(self.op.instances, list):
4525
      raise errors.OpPrereqError("Invalid argument type 'instances'")
4526

    
4527
    if self.op.instances:
4528
      self.wanted_names = []
4529
      for name in self.op.instances:
4530
        full_name = self.cfg.ExpandInstanceName(name)
4531
        if full_name is None:
4532
          raise errors.OpPrereqError("Instance '%s' not known" %
4533
                                     self.op.instance_name)
4534
        self.wanted_names.append(full_name)
4535
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4536
    else:
4537
      self.wanted_names = None
4538
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4539

    
4540
    self.needed_locks[locking.LEVEL_NODE] = []
4541
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4542

    
4543
  def DeclareLocks(self, level):
4544
    if level == locking.LEVEL_NODE:
4545
      self._LockInstancesNodes()
4546

    
4547
  def CheckPrereq(self):
4548
    """Check prerequisites.
4549

4550
    This only checks the optional instance list against the existing names.
4551

4552
    """
4553
    if self.wanted_names is None:
4554
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4555

    
4556
    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4557
                             in self.wanted_names]
4558
    return
4559

    
4560
  def _ComputeDiskStatus(self, instance, snode, dev):
4561
    """Compute block device status.
4562

4563
    """
4564
    static = self.op.static
4565
    if not static:
4566
      self.cfg.SetDiskID(dev, instance.primary_node)
4567
      dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
4568
    else:
4569
      dev_pstatus = None
4570

    
4571
    if dev.dev_type in constants.LDS_DRBD:
4572
      # we change the snode then (otherwise we use the one passed in)
4573
      if dev.logical_id[0] == instance.primary_node:
4574
        snode = dev.logical_id[1]
4575
      else:
4576
        snode = dev.logical_id[0]
4577

    
4578
    if snode and not static:
4579
      self.cfg.SetDiskID(dev, snode)
4580
      dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
4581
    else:
4582
      dev_sstatus = None
4583

    
4584
    if dev.children:
4585
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
4586
                      for child in dev.children]
4587
    else:
4588
      dev_children = []
4589

    
4590
    data = {
4591
      "iv_name": dev.iv_name,
4592
      "dev_type": dev.dev_type,
4593
      "logical_id": dev.logical_id,
4594
      "physical_id": dev.physical_id,
4595
      "pstatus": dev_pstatus,
4596
      "sstatus": dev_sstatus,
4597
      "children": dev_children,
4598
      }
4599

    
4600
    return data
4601

    
4602
  def Exec(self, feedback_fn):
4603
    """Gather and return data"""
4604
    result = {}
4605

    
4606
    cluster = self.cfg.GetClusterInfo()
4607

    
4608
    for instance in self.wanted_instances:
4609
      if not self.op.static:
4610
        remote_info = self.rpc.call_instance_info(instance.primary_node,
4611
                                                  instance.name,
4612
                                                  instance.hypervisor)
4613
        if remote_info and "state" in remote_info:
4614
          remote_state = "up"
4615
        else:
4616
          remote_state = "down"
4617
      else:
4618
        remote_state = None
4619
      if instance.status == "down":
4620
        config_state = "down"
4621
      else:
4622
        config_state = "up"
4623

    
4624
      disks = [self._ComputeDiskStatus(instance, None, device)
4625
               for device in instance.disks]
4626

    
4627
      idict = {
4628
        "name": instance.name,
4629
        "config_state": config_state,
4630
        "run_state": remote_state,
4631
        "pnode": instance.primary_node,
4632
        "snodes": instance.secondary_nodes,
4633
        "os": instance.os,
4634
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4635
        "disks": disks,
4636
        "hypervisor": instance.hypervisor,
4637
        "network_port": instance.network_port,
4638
        "hv_instance": instance.hvparams,
4639
        "hv_actual": cluster.FillHV(instance),
4640
        "be_instance": instance.beparams,
4641
        "be_actual": cluster.FillBE(instance),
4642
        }
4643

    
4644
      result[instance.name] = idict
4645

    
4646
    return result
4647

    
4648

    
4649
class LUSetInstanceParams(LogicalUnit):
4650
  """Modifies an instances's parameters.
4651

4652
  """
4653
  HPATH = "instance-modify"
4654
  HTYPE = constants.HTYPE_INSTANCE
4655
  _OP_REQP = ["instance_name", "hvparams"]
4656
  REQ_BGL = False
4657

    
4658
  def ExpandNames(self):
4659
    self._ExpandAndLockInstance()
4660
    self.needed_locks[locking.LEVEL_NODE] = []
4661
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4662

    
4663

    
4664
  def DeclareLocks(self, level):
4665
    if level == locking.LEVEL_NODE:
4666
      self._LockInstancesNodes()
4667

    
4668
  def BuildHooksEnv(self):
4669
    """Build hooks env.
4670

4671
    This runs on the master, primary and secondaries.
4672

4673
    """
4674
    args = dict()
4675
    if constants.BE_MEMORY in self.be_new:
4676
      args['memory'] = self.be_new[constants.BE_MEMORY]
4677
    if constants.BE_VCPUS in self.be_new:
4678
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
4679
    if self.do_ip or self.do_bridge or self.mac:
4680
      if self.do_ip:
4681
        ip = self.ip
4682
      else:
4683
        ip = self.instance.nics[0].ip
4684
      if self.bridge:
4685
        bridge = self.bridge
4686
      else:
4687
        bridge = self.instance.nics[0].bridge
4688
      if self.mac:
4689
        mac = self.mac
4690
      else:
4691
        mac = self.instance.nics[0].mac
4692
      args['nics'] = [(ip, bridge, mac)]
4693
    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
4694
    nl = [self.cfg.GetMasterNode(),
4695
          self.instance.primary_node] + list(self.instance.secondary_nodes)
4696
    return env, nl, nl
4697

    
4698
  def CheckPrereq(self):
4699
    """Check prerequisites.
4700

4701
    This only checks the instance list against the existing names.
4702

4703
    """
4704
    # FIXME: all the parameters could be checked before, in ExpandNames, or in
4705
    # a separate CheckArguments function, if we implement one, so the operation
4706
    # can be aborted without waiting for any lock, should it have an error...
4707
    self.ip = getattr(self.op, "ip", None)
4708
    self.mac = getattr(self.op, "mac", None)
4709
    self.bridge = getattr(self.op, "bridge", None)
4710
    self.kernel_path = getattr(self.op, "kernel_path", None)
4711
    self.initrd_path = getattr(self.op, "initrd_path", None)
4712
    self.force = getattr(self.op, "force", None)
4713
    all_parms = [self.ip, self.bridge, self.mac]
4714
    if (all_parms.count(None) == len(all_parms) and
4715
        not self.op.hvparams and
4716
        not self.op.beparams):
4717
      raise errors.OpPrereqError("No changes submitted")
4718
    for item in (constants.BE_MEMORY, constants.BE_VCPUS):
4719
      val = self.op.beparams.get(item, None)
4720
      if val is not None:
4721
        try:
4722
          val = int(val)
4723
        except ValueError, err:
4724
          raise errors.OpPrereqError("Invalid %s size: %s" % (item, str(err)))
4725
        self.op.beparams[item] = val
4726
    if self.ip is not None:
4727
      self.do_ip = True
4728
      if self.ip.lower() == "none":
4729
        self.ip = None
4730
      else:
4731
        if not utils.IsValidIP(self.ip):
4732
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4733
    else:
4734
      self.do_ip = False
4735
    self.do_bridge = (self.bridge is not None)
4736
    if self.mac is not None:
4737
      if self.cfg.IsMacInUse(self.mac):
4738
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4739
                                   self.mac)
4740
      if not utils.IsValidMac(self.mac):
4741
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4742

    
4743
    # checking the new params on the primary/secondary nodes
4744

    
4745
    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4746
    assert self.instance is not None, \
4747
      "Cannot retrieve locked instance %s" % self.op.instance_name
4748
    pnode = self.instance.primary_node
4749
    nodelist = [pnode]
4750
    nodelist.extend(instance.secondary_nodes)
4751

    
4752
    # hvparams processing
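    # The merge below is expected to work like dict.update(): the cluster
    # defaults for this hypervisor are overlaid with the instance-level
    # values, e.g. (illustrative) defaults {'acpi': True} plus an instance
    # override {'acpi': False} yield {'acpi': False}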
4753
    if self.op.hvparams:
4754
      i_hvdict = copy.deepcopy(instance.hvparams)
4755
      for key, val in self.op.hvparams.iteritems():
4756
        if val is None:
4757
          try:
4758
            del i_hvdict[key]
4759
          except KeyError:
4760
            pass
4761
        else:
4762
          i_hvdict[key] = val
4763
      cluster = self.cfg.GetClusterInfo()
4764
      hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
4765
                                i_hvdict)
4766
      # local check
4767
      hypervisor.GetHypervisor(
4768
        instance.hypervisor).CheckParameterSyntax(hv_new)
4769
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
4770
      self.hv_new = hv_new # the new actual values
4771
      self.hv_inst = i_hvdict # the new dict (without defaults)
4772
    else:
4773
      self.hv_new = self.hv_inst = {}
4774

    
4775
    # beparams processing
4776
    if self.op.beparams:
4777
      i_bedict = copy.deepcopy(instance.beparams)
4778
      for key, val in self.op.beparams.iteritems():
4779
        if val is None:
4780
          try:
4781
            del i_bedict[key]
4782
          except KeyError:
4783
            pass
4784
        else:
4785
          i_bedict[key] = val
4786
      cluster = self.cfg.GetClusterInfo()
4787
      be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4788
                                i_bedict)
4789
      self.be_new = be_new # the new actual values
4790
      self.be_inst = i_bedict # the new dict (without defaults)
4791
    else:
4792
      self.be_new = self.be_inst = {}
4793

    
4794
    self.warn = []
4795

    
4796
    if constants.BE_MEMORY in self.op.beparams and not self.force:
4797
      mem_check_list = [pnode]
4798
      if be_new[constants.BE_AUTO_BALANCE]:
4799
        # either we changed auto_balance to yes or it was from before
4800
        mem_check_list.extend(instance.secondary_nodes)
4801
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
4802
                                                  instance.hypervisor)
4803
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
4804
                                         instance.hypervisor)
4805

    
4806
      if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
4807
        # Assume the primary node is unreachable and go ahead
4808
        self.warn.append("Can't get info from primary node %s" % pnode)
4809
      else:
4810
        if instance_info:
4811
          current_mem = instance_info['memory']
4812
        else:
4813
          # Assume instance not running
4814
          # (there is a slight race condition here, but it's not very probable,
4815
          # and we have no other way to check)
4816
          current_mem = 0
4817
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
4818
                    nodeinfo[pnode]['memory_free'])
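        # e.g. (illustrative) raising memory to 2048 MiB for an instance
        # currently using 512 MiB on a node with 1024 MiB free gives
        # miss_mem = 2048 - 512 - 1024 = 512 > 0, so the change is refused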
4819
        if miss_mem > 0:
4820
          raise errors.OpPrereqError("This change will prevent the instance"
4821
                                     " from starting, due to %d MB of memory"
4822
                                     " missing on its primary node" % miss_mem)
4823

    
4824
      if be_new[constants.BE_AUTO_BALANCE]:
4825
        for node in instance.secondary_nodes:
4826
          if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
4827
            self.warn.append("Can't get info from secondary node %s" % node)
4828
          elif be_new[constants.BE_MEMORY] > nodeinfo[node]['memory_free']:
4829
            self.warn.append("Not enough memory to failover instance to"
4830
                             " secondary node %s" % node)
4831

    
4832
    return
4833

    
4834
  def Exec(self, feedback_fn):
4835
    """Modifies an instance.
4836

4837
    All parameters take effect only at the next restart of the instance.
4838
    """
4839
    # Process here the warnings from CheckPrereq, as we don't have a
4840
    # feedback_fn there.
4841
    for warn in self.warn:
4842
      feedback_fn("WARNING: %s" % warn)
4843

    
4844
    result = []
4845
    instance = self.instance
4846
    if self.do_ip:
4847
      instance.nics[0].ip = self.ip
4848
      result.append(("ip", self.ip))
4849
    if self.bridge:
4850
      instance.nics[0].bridge = self.bridge
4851
      result.append(("bridge", self.bridge))
4852
    if self.mac:
4853
      instance.nics[0].mac = self.mac
4854
      result.append(("mac", self.mac))
4855
    if self.op.hvparams:
4856
      instance.hvparams = self.hv_new
4857
      for key, val in self.op.hvparams.iteritems():
4858
        result.append(("hv/%s" % key, val))
4859
    if self.op.beparams:
4860
      instance.beparams = self.be_inst
4861
      for key, val in self.op.beparams.iteritems():
4862
        result.append(("be/%s" % key, val))
4863

    
4864
    self.cfg.Update(instance)
4865

    
4866
    return result
4867

    
4868

    
4869
class LUQueryExports(NoHooksLU):
4870
  """Query the exports list
4871

4872
  """
4873
  _OP_REQP = ['nodes']
4874
  REQ_BGL = False
4875

    
4876
  def ExpandNames(self):
4877
    self.needed_locks = {}
4878
    self.share_locks[locking.LEVEL_NODE] = 1
4879
    if not self.op.nodes:
4880
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4881
    else:
4882
      self.needed_locks[locking.LEVEL_NODE] = \
4883
        _GetWantedNodes(self, self.op.nodes)
4884

    
4885
  def CheckPrereq(self):
4886
    """Check prerequisites.
4887

4888
    """
4889
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
4890

    
4891
  def Exec(self, feedback_fn):
4892
    """Compute the list of all the exported system images.
4893

4894
    @rtype: dict
4895
    @return: a dictionary with the structure node->(export-list)
4896
        where export-list is a list of the instances exported on
4897
        that node.
4898

4899
    """
4900
    return self.rpc.call_export_list(self.nodes)
4901

    
4902

    
4903
class LUExportInstance(LogicalUnit):
4904
  """Export an instance to an image in the cluster.
4905

4906
  """
4907
  HPATH = "instance-export"
4908
  HTYPE = constants.HTYPE_INSTANCE
4909
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
4910
  REQ_BGL = False
4911

    
4912
  def ExpandNames(self):
4913
    self._ExpandAndLockInstance()
4914
    # FIXME: lock only instance primary and destination node
4915
    #
4916
    # Sad but true, for now we have to lock all nodes, as we don't know where
4917
    # the previous export might be, and in this LU we search for it and
4918
    # remove it from its current node. In the future we could fix this by:
4919
    #  - making a tasklet to search (share-lock all), then create the new one,
4920
    #    then one to remove, after
4921
    #  - removing the removal operation altogether
4922
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4923

    
4924
  def DeclareLocks(self, level):
4925
    """Last minute lock declaration."""
4926
    # All nodes are locked anyway, so nothing to do here.
4927

    
4928
  def BuildHooksEnv(self):
4929
    """Build hooks env.
4930

4931
    This will run on the master, primary node and target node.
4932

4933
    """
4934
    env = {
4935
      "EXPORT_NODE": self.op.target_node,
4936
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4937
      }
4938
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4939
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
4940
          self.op.target_node]
4941
    return env, nl, nl
4942

    
4943
  def CheckPrereq(self):
4944
    """Check prerequisites.
4945

4946
    This checks that the instance and node names are valid.
4947

4948
    """
4949
    instance_name = self.op.instance_name
4950
    self.instance = self.cfg.GetInstanceInfo(instance_name)
4951
    assert self.instance is not None, \
4952
          "Cannot retrieve locked instance %s" % self.op.instance_name
4953

    
4954
    self.dst_node = self.cfg.GetNodeInfo(
4955
      self.cfg.ExpandNodeName(self.op.target_node))
4956

    
4957
    assert self.dst_node is not None, \
4958
          "Cannot retrieve locked node %s" % self.op.target_node
4959

    
4960
    # instance disk type verification
4961
    for disk in self.instance.disks:
4962
      if disk.dev_type == constants.LD_FILE:
4963
        raise errors.OpPrereqError("Export not supported for instances with"
4964
                                   " file-based disks")
4965

    
4966
  def Exec(self, feedback_fn):
4967
    """Export an instance to an image in the cluster.
4968

4969
    """
4970
    instance = self.instance
4971
    dst_node = self.dst_node
4972
    src_node = instance.primary_node
4973
    if self.op.shutdown:
4974
      # shutdown the instance, but not the disks
4975
      if not self.rpc.call_instance_shutdown(src_node, instance):
4976
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4977
                                 (instance.name, src_node))
4978

    
4979
    vgname = self.cfg.GetVGName()
4980

    
4981
    snap_disks = []
4982

    
4983
    try:
4984
      for disk in instance.disks:
4985
        # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4986
        new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
4987

    
4988
        if not new_dev_name:
4989
          self.LogWarning("Could not snapshot block device %s on node %s",
4990
                          disk.logical_id[1], src_node)
4991
          snap_disks.append(False)
4992
        else:
4993
          new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4994
                                 logical_id=(vgname, new_dev_name),
4995
                                 physical_id=(vgname, new_dev_name),
4996
                                 iv_name=disk.iv_name)
4997
          snap_disks.append(new_dev)
4998

    
4999
    finally:
5000
      if self.op.shutdown and instance.status == "up":
5001
        if not self.rpc.call_instance_start(src_node, instance, None):
5002
          _ShutdownInstanceDisks(self, instance)
5003
          raise errors.OpExecError("Could not start instance")
5004

    
5005
    # TODO: check for size
5006

    
5007
    cluster_name = self.cfg.GetClusterName()
5008
    for idx, dev in enumerate(snap_disks):
5009
      if dev:
5010
        if not self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
5011
                                             instance, cluster_name, idx):
5012
          self.LogWarning("Could not export block device %s from node %s to"
5013
                          " node %s", dev.logical_id[1], src_node,
5014
                          dst_node.name)
5015
        if not self.rpc.call_blockdev_remove(src_node, dev):
5016
          self.LogWarning("Could not remove snapshot block device %s from node"
5017
                          " %s", dev.logical_id[1], src_node)
5018

    
5019
    if not self.rpc.call_finalize_export(dst_node.name, instance, snap_disks):
5020
      self.LogWarning("Could not finalize export for instance %s on node %s",
5021
                      instance.name, dst_node.name)
5022

    
5023
    nodelist = self.cfg.GetNodeList()
5024
    nodelist.remove(dst_node.name)
5025

    
5026
    # on one-node clusters nodelist will be empty after the removal;
5027
    # if we proceed, the backup would be removed because OpQueryExports
5028
    # substitutes an empty list with the full cluster node list.
5029
    if nodelist:
5030
      exportlist = self.rpc.call_export_list(nodelist)
5031
      for node in exportlist:
5032
        if instance.name in exportlist[node]:
5033
          if not self.rpc.call_export_remove(node, instance.name):
5034
            self.LogWarning("Could not remove older export for instance %s"
5035
                            " on node %s", instance.name, node)
5036

    
5037

    
5038
class LURemoveExport(NoHooksLU):
5039
  """Remove exports related to the named instance.
5040

5041
  """
5042
  _OP_REQP = ["instance_name"]
5043
  REQ_BGL = False
5044

    
5045
  def ExpandNames(self):
5046
    self.needed_locks = {}
5047
    # We need all nodes to be locked in order for RemoveExport to work, but we
5048
    # don't need to lock the instance itself, as nothing will happen to it (and
5049
    # we can remove exports also for a removed instance)
5050
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5051

    
5052
  def CheckPrereq(self):
5053
    """Check prerequisites.
5054
    """
5055
    pass
5056

    
5057
  def Exec(self, feedback_fn):
5058
    """Remove any export.
5059

5060
    """
5061
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5062
    # If the instance was not found we'll try with the name that was passed in.
5063
    # This will only work if it was an FQDN, though.
5064
    fqdn_warn = False
5065
    if not instance_name:
5066
      fqdn_warn = True
5067
      instance_name = self.op.instance_name
5068

    
5069
    exportlist = self.rpc.call_export_list(self.acquired_locks[
5070
      locking.LEVEL_NODE])
5071
    found = False
5072
    for node in exportlist:
5073
      if instance_name in exportlist[node]:
5074
        found = True
5075
        if not self.rpc.call_export_remove(node, instance_name):
5076
          logging.error("Could not remove export for instance %s"
5077
                        " on node %s", instance_name, node)
5078

    
5079
    if fqdn_warn and not found:
5080
      feedback_fn("Export not found. If trying to remove an export belonging"
5081
                  " to a deleted instance please use its Fully Qualified"
5082
                  " Domain Name.")
5083

    
5084

    
5085
class TagsLU(NoHooksLU):
5086
  """Generic tags LU.
5087

5088
  This is an abstract class which is the parent of all the other tags LUs.
5089

5090
  """
5091

    
5092
  def ExpandNames(self):
5093
    self.needed_locks = {}
5094
    if self.op.kind == constants.TAG_NODE:
5095
      name = self.cfg.ExpandNodeName(self.op.name)
5096
      if name is None:
5097
        raise errors.OpPrereqError("Invalid node name (%s)" %
5098
                                   (self.op.name,))
5099
      self.op.name = name
5100
      self.needed_locks[locking.LEVEL_NODE] = name
5101
    elif self.op.kind == constants.TAG_INSTANCE:
5102
      name = self.cfg.ExpandInstanceName(self.op.name)
5103
      if name is None:
5104
        raise errors.OpPrereqError("Invalid instance name (%s)" %
5105
                                   (self.op.name,))
5106
      self.op.name = name
5107
      self.needed_locks[locking.LEVEL_INSTANCE] = name
5108

    
5109
  def CheckPrereq(self):
5110
    """Check prerequisites.
5111

5112
    """
5113
    if self.op.kind == constants.TAG_CLUSTER:
5114
      self.target = self.cfg.GetClusterInfo()
5115
    elif self.op.kind == constants.TAG_NODE:
5116
      self.target = self.cfg.GetNodeInfo(self.op.name)
5117
    elif self.op.kind == constants.TAG_INSTANCE:
5118
      self.target = self.cfg.GetInstanceInfo(self.op.name)
5119
    else:
5120
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
5121
                                 str(self.op.kind))
5122

    
5123

    
5124
class LUGetTags(TagsLU):
5125
  """Returns the tags of a given object.
5126

5127
  """
5128
  _OP_REQP = ["kind", "name"]
5129
  REQ_BGL = False
5130

    
5131
  def Exec(self, feedback_fn):
5132
    """Returns the tag list.
5133

5134
    """
5135
    return list(self.target.GetTags())
5136

    
5137

    
5138
class LUSearchTags(NoHooksLU):
5139
  """Searches the tags for a given pattern.
5140

5141
  """
5142
  _OP_REQP = ["pattern"]
5143
  REQ_BGL = False
5144

    
5145
  def ExpandNames(self):
5146
    self.needed_locks = {}
5147

    
5148
  def CheckPrereq(self):
5149
    """Check prerequisites.
5150

5151
    This checks the pattern passed for validity by compiling it.
5152

5153
    """
5154
    try:
5155
      self.re = re.compile(self.op.pattern)
5156
    except re.error, err:
5157
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5158
                                 (self.op.pattern, err))
5159

    
5160
  def Exec(self, feedback_fn):
5161
    """Returns the tag list.
5162

5163
    """
5164
    cfg = self.cfg
5165
    tgts = [("/cluster", cfg.GetClusterInfo())]
5166
    ilist = cfg.GetAllInstancesInfo().values()
5167
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5168
    nlist = cfg.GetAllNodesInfo().values()
5169
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5170
    results = []
5171
    for path, target in tgts:
5172
      for tag in target.GetTags():
5173
        if self.re.search(tag):
5174
          results.append((path, tag))
5175
    return results
5176

    
5177

    
5178
class LUAddTags(TagsLU):
5179
  """Sets a tag on a given object.
5180

5181
  """
5182
  _OP_REQP = ["kind", "name", "tags"]
5183
  REQ_BGL = False
5184

    
5185
  def CheckPrereq(self):
5186
    """Check prerequisites.
5187

5188
    This checks the type and length of the tag name and value.
5189

5190
    """
5191
    TagsLU.CheckPrereq(self)
5192
    for tag in self.op.tags:
5193
      objects.TaggableObject.ValidateTag(tag)
5194

    
5195
  def Exec(self, feedback_fn):
5196
    """Sets the tag.
5197

5198
    """
5199
    try:
5200
      for tag in self.op.tags:
5201
        self.target.AddTag(tag)
5202
    except errors.TagError, err:
5203
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
5204
    try:
5205
      self.cfg.Update(self.target)
5206
    except errors.ConfigurationError:
5207
      raise errors.OpRetryError("There has been a modification to the"
5208
                                " config file and the operation has been"
5209
                                " aborted. Please retry.")
5210

    
5211

    
5212
class LUDelTags(TagsLU):
5213
  """Delete a list of tags from a given object.
5214

5215
  """
5216
  _OP_REQP = ["kind", "name", "tags"]
5217
  REQ_BGL = False
5218

    
5219
  def CheckPrereq(self):
5220
    """Check prerequisites.
5221

5222
    This checks that we have the given tag.
5223

5224
    """
5225
    TagsLU.CheckPrereq(self)
5226
    for tag in self.op.tags:
5227
      objects.TaggableObject.ValidateTag(tag)
5228
    del_tags = frozenset(self.op.tags)
5229
    cur_tags = self.target.GetTags()
5230
    if not del_tags <= cur_tags:
5231
      diff_tags = del_tags - cur_tags
5232
      diff_names = ["'%s'" % tag for tag in diff_tags]
5233
      diff_names.sort()
5234
      raise errors.OpPrereqError("Tag(s) %s not found" %
5235
                                 (",".join(diff_names)))
5236

    
5237
  def Exec(self, feedback_fn):
5238
    """Remove the tag from the object.
5239

5240
    """
5241
    for tag in self.op.tags:
5242
      self.target.RemoveTag(tag)
5243
    try:
5244
      self.cfg.Update(self.target)
5245
    except errors.ConfigurationError:
5246
      raise errors.OpRetryError("There has been a modification to the"
5247
                                " config file and the operation has been"
5248
                                " aborted. Please retry.")
5249

    
5250

    
5251
class LUTestDelay(NoHooksLU):
5252
  """Sleep for a specified amount of time.
5253

5254
  This LU sleeps on the master and/or nodes for a specified amount of
5255
  time.
5256

5257
  """
5258
  _OP_REQP = ["duration", "on_master", "on_nodes"]
5259
  REQ_BGL = False
5260

    
5261
  def ExpandNames(self):
5262
    """Expand names and set required locks.
5263

5264
    This expands the node list, if any.
5265

5266
    """
5267
    self.needed_locks = {}
5268
    if self.op.on_nodes:
5269
      # _GetWantedNodes can be used here, but it is not always appropriate
5270
      # to use it this way in ExpandNames. See the LogicalUnit.ExpandNames
5271
      # docstring for more information.
5272
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5273
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
5274

    
5275
  def CheckPrereq(self):
5276
    """Check prerequisites.
5277

5278
    """
5279

    
5280
  def Exec(self, feedback_fn):
5281
    """Do the actual sleep.
5282

5283
    """
5284
    if self.op.on_master:
5285
      if not utils.TestDelay(self.op.duration):
5286
        raise errors.OpExecError("Error during master delay test")
5287
    if self.op.on_nodes:
5288
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5289
      if not result:
5290
        raise errors.OpExecError("Complete failure from rpc call")
5291
      for node, node_result in result.items():
5292
        if not node_result:
5293
          raise errors.OpExecError("Failure during rpc call to node %s,"
5294
                                   " result: %s" % (node, node_result))
5295

    
5296

    
5297
class IAllocator(object):
5298
  """IAllocator framework.
5299

5300
  An IAllocator instance has four sets of attributes:
5301
    - cfg that is needed to query the cluster
5302
    - input data (all members of the _KEYS class attribute are required)
5303
    - four buffer attributes (in|out_data|text), that represent the
5304
      input (to the external script) in text and data structure format,
5305
      and the output from it, again in two formats
5306
    - the result variables from the script (success, info, nodes) for
5307
      easy usage
5308

5309
  """
5310
  _ALLO_KEYS = [
5311
    "mem_size", "disks", "disk_template",
5312
    "os", "tags", "nics", "vcpus",
5313
    ]
5314
  _RELO_KEYS = [
5315
    "relocate_from",
5316
    ]
5317

    
5318
  def __init__(self, lu, mode, name, **kwargs):
5319
    self.lu = lu
5320
    # init buffer variables
5321
    self.in_text = self.out_text = self.in_data = self.out_data = None
5322
    # init all input fields so that pylint is happy
5323
    self.mode = mode
5324
    self.name = name
5325
    self.mem_size = self.disks = self.disk_template = None
5326
    self.os = self.tags = self.nics = self.vcpus = None
5327
    self.relocate_from = None
5328
    # computed fields
5329
    self.required_nodes = None
5330
    # init result fields
5331
    self.success = self.info = self.nodes = None
5332
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5333
      keyset = self._ALLO_KEYS
5334
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5335
      keyset = self._RELO_KEYS
5336
    else:
5337
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5338
                                   " IAllocator" % self.mode)
5339
    for key in kwargs:
5340
      if key not in keyset:
5341
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
5342
                                     " IAllocator" % key)
5343
      setattr(self, key, kwargs[key])
5344
    for key in keyset:
5345
      if key not in kwargs:
5346
        raise errors.ProgrammerError("Missing input parameter '%s' to"
5347
                                     " IAllocator" % key)
5348
    self._BuildInputData()
5349

    
5350
  def _ComputeClusterData(self):
5351
    """Compute the generic allocator input data.
5352

5353
    This is the data that is independent of the actual operation.
5354

5355
    """
5356
    cfg = self.lu.cfg
5357
    cluster_info = cfg.GetClusterInfo()
5358
    # cluster data
5359
    data = {
5360
      "version": 1,
5361
      "cluster_name": cfg.GetClusterName(),
5362
      "cluster_tags": list(cluster_info.GetTags()),
5363
      "enable_hypervisors": list(cluster_info.enabled_hypervisors),
5364
      # we don't have job IDs
5365
      }
5366
    iinfo = cfg.GetAllInstancesInfo().values()
5367
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
5368

    
5369
    # node data
5370
    node_results = {}
5371
    node_list = cfg.GetNodeList()
5372
    # FIXME: here we have information for only one hypervisor, but
5373
    # instances can belong to different hypervisors
5374
    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
5375
                                           cfg.GetHypervisorType())
5376
    for nname in node_list:
5377
      ninfo = cfg.GetNodeInfo(nname)
5378
      if nname not in node_data or not isinstance(node_data[nname], dict):
5379
        raise errors.OpExecError("Can't get data for node %s" % nname)
5380
      remote_info = node_data[nname]
5381
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
5382
                   'vg_size', 'vg_free', 'cpu_total']:
5383
        if attr not in remote_info:
5384
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5385
                                   (nname, attr))
5386
        try:
5387
          remote_info[attr] = int(remote_info[attr])
5388
        except ValueError, err:
5389
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5390
                                   " %s" % (nname, attr, str(err)))
5391
      # compute memory used by primary instances
5392
      i_p_mem = i_p_up_mem = 0
5393
      for iinfo, beinfo in i_list:
5394
        if iinfo.primary_node == nname:
5395
          i_p_mem += beinfo[constants.BE_MEMORY]
5396
          if iinfo.status == "up":
5397
            i_p_up_mem += beinfo[constants.BE_MEMORY]
5398

    
5399
      # build the result dictionary for this node
5400
      pnr = {
5401
        "tags": list(ninfo.GetTags()),
5402
        "total_memory": remote_info['memory_total'],
5403
        "reserved_memory": remote_info['memory_dom0'],
5404
        "free_memory": remote_info['memory_free'],
5405
        "i_pri_memory": i_p_mem,
5406
        "i_pri_up_memory": i_p_up_mem,
5407
        "total_disk": remote_info['vg_size'],
5408
        "free_disk": remote_info['vg_free'],
5409
        "primary_ip": ninfo.primary_ip,
5410
        "secondary_ip": ninfo.secondary_ip,
5411
        "total_cpus": remote_info['cpu_total'],
5412
        }
5413
      node_results[nname] = pnr
5414
    data["nodes"] = node_results
5415

    
5416
    # instance data
5417
    instance_data = {}
5418
    for iinfo, beinfo in i_list:
5419
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5420
                  for n in iinfo.nics]
5421
      pir = {
5422
        "tags": list(iinfo.GetTags()),
5423
        "should_run": iinfo.status == "up",
5424
        "vcpus": beinfo[constants.BE_VCPUS],
5425
        "memory": beinfo[constants.BE_MEMORY],
5426
        "os": iinfo.os,
5427
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5428
        "nics": nic_data,
5429
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5430
        "disk_template": iinfo.disk_template,
5431
        "hypervisor": iinfo.hypervisor,
5432
        }
5433
      instance_data[iinfo.name] = pir
5434

    
5435
    data["instances"] = instance_data
5436

    
5437
    self.in_data = data
5438

    
5439
  def _AddNewInstance(self):
5440
    """Add new instance data to allocator structure.
5441

5442
    This in combination with _ComputeClusterData will create the
5443
    correct structure needed as input for the allocator.
5444

5445
    The checks for the completeness of the opcode must have already been
5446
    done.
5447

5448
    """
5449
    data = self.in_data
5450
    if len(self.disks) != 2:
5451
      raise errors.OpExecError("Only two-disk configurations supported")
5452

    
5453
    disk_space = _ComputeDiskSize(self.disk_template, self.disks)
5454

    
5455
    if self.disk_template in constants.DTS_NET_MIRROR:
5456
      self.required_nodes = 2
5457
    else:
5458
      self.required_nodes = 1
5459
    request = {
5460
      "type": "allocate",
5461
      "name": self.name,
5462
      "disk_template": self.disk_template,
5463
      "tags": self.tags,
5464
      "os": self.os,
5465
      "vcpus": self.vcpus,
5466
      "memory": self.mem_size,
5467
      "disks": self.disks,
5468
      "disk_space_total": disk_space,
5469
      "nics": self.nics,
5470
      "required_nodes": self.required_nodes,
5471
      }
5472
    data["request"] = request
5473

    
5474
  def _AddRelocateInstance(self):
5475
    """Add relocate instance data to allocator structure.
5476

5477
    This in combination with _ComputeClusterData will create the
5478
    correct structure needed as input for the allocator.
5479

5480
    The checks for the completeness of the opcode must have already been
5481
    done.
5482

5483
    """
5484
    instance = self.lu.cfg.GetInstanceInfo(self.name)
5485
    if instance is None:
5486
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
5487
                                   " IAllocator" % self.name)
5488

    
5489
    if instance.disk_template not in constants.DTS_NET_MIRROR:
5490
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5491

    
5492
    if len(instance.secondary_nodes) != 1:
5493
      raise errors.OpPrereqError("Instance has not exactly one secondary node")
5494

    
5495
    self.required_nodes = 1
5496
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
5497
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
5498

    
5499
    request = {
5500
      "type": "relocate",
5501
      "name": self.name,
5502
      "disk_space_total": disk_space,
5503
      "required_nodes": self.required_nodes,
5504
      "relocate_from": self.relocate_from,
5505
      }
5506
    self.in_data["request"] = request
5507

    
5508
  def _BuildInputData(self):
5509
    """Build input data structures.
5510

5511
    """
5512
    self._ComputeClusterData()
5513

    
5514
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5515
      self._AddNewInstance()
5516
    else:
5517
      self._AddRelocateInstance()
5518

    
5519
    self.in_text = serializer.Dump(self.in_data)
5520

    
5521
  def Run(self, name, validate=True, call_fn=None):
5522
    """Run an instance allocator and return the results.
5523

5524
    """
5525
    if call_fn is None:
5526
      call_fn = self.lu.rpc.call_iallocator_runner
5527
    data = self.in_text
5528

    
5529
    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
5530

    
5531
    if not isinstance(result, (list, tuple)) or len(result) != 4:
5532
      raise errors.OpExecError("Invalid result from master iallocator runner")
5533

    
5534
    rcode, stdout, stderr, fail = result
5535

    
5536
    if rcode == constants.IARUN_NOTFOUND:
5537
      raise errors.OpExecError("Can't find allocator '%s'" % name)
5538
    elif rcode == constants.IARUN_FAILURE:
5539
      raise errors.OpExecError("Instance allocator call failed: %s,"
5540
                               " output: %s" % (fail, stdout+stderr))
5541
    self.out_text = stdout
5542
    if validate:
5543
      self._ValidateResult()
5544

    
5545
  def _ValidateResult(self):
5546
    """Process the allocator results.
5547

5548
    This will process and if successful save the result in
5549
    self.out_data and the other parameters.
5550

5551
    """
5552
    try:
5553
      rdict = serializer.Load(self.out_text)
5554
    except Exception, err:
5555
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5556

    
5557
    if not isinstance(rdict, dict):
5558
      raise errors.OpExecError("Can't parse iallocator results: not a dict")
5559

    
5560
    for key in "success", "info", "nodes":
5561
      if key not in rdict:
5562
        raise errors.OpExecError("Can't parse iallocator results:"
5563
                                 " missing key '%s'" % key)
5564
      setattr(self, key, rdict[key])
5565

    
5566
    if not isinstance(rdict["nodes"], list):
5567
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5568
                               " is not a list")
5569
    self.out_data = rdict
5570

    
5571

    
5572
class LUTestAllocator(NoHooksLU):
5573
  """Run allocator tests.
5574

5575
  This LU runs the allocator tests
5576

5577
  """
5578
  _OP_REQP = ["direction", "mode", "name"]
5579

    
5580
  def CheckPrereq(self):
5581
    """Check prerequisites.
5582

5583
    This checks the opcode parameters depending on the direction and mode of the test.
5584

5585
    """
5586
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5587
      for attr in ["name", "mem_size", "disks", "disk_template",
5588
                   "os", "tags", "nics", "vcpus"]:
5589
        if not hasattr(self.op, attr):
5590
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5591
                                     attr)
5592
      iname = self.cfg.ExpandInstanceName(self.op.name)
5593
      if iname is not None:
5594
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5595
                                   iname)
5596
      if not isinstance(self.op.nics, list):
5597
        raise errors.OpPrereqError("Invalid parameter 'nics'")
5598
      for row in self.op.nics:
5599
        if (not isinstance(row, dict) or
5600
            "mac" not in row or
5601
            "ip" not in row or
5602
            "bridge" not in row):
5603
          raise errors.OpPrereqError("Invalid contents of the"
5604
                                     " 'nics' parameter")
5605
      if not isinstance(self.op.disks, list):
5606
        raise errors.OpPrereqError("Invalid parameter 'disks'")
5607
      if len(self.op.disks) != 2:
5608
        raise errors.OpPrereqError("Only two-disk configurations supported")
5609
      for row in self.op.disks:
5610
        if (not isinstance(row, dict) or
5611
            "size" not in row or
5612
            not isinstance(row["size"], int) or
5613
            "mode" not in row or
5614
            row["mode"] not in ['r', 'w']):
5615
          raise errors.OpPrereqError("Invalid contents of the"
5616
                                     " 'disks' parameter")
5617
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5618
      if not hasattr(self.op, "name"):
5619
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5620
      fname = self.cfg.ExpandInstanceName(self.op.name)
5621
      if fname is None:
5622
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5623
                                   self.op.name)
5624
      self.op.name = fname
5625
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5626
    else:
5627
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5628
                                 self.op.mode)
5629

    
5630
    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5631
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
5632
        raise errors.OpPrereqError("Missing allocator name")
5633
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5634
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
5635
                                 self.op.direction)
5636

    
5637
  def Exec(self, feedback_fn):
5638
    """Run the allocator test.
5639

5640
    """
5641
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5642
      ial = IAllocator(self,
5643
                       mode=self.op.mode,
5644
                       name=self.op.name,
5645
                       mem_size=self.op.mem_size,
5646
                       disks=self.op.disks,
5647
                       disk_template=self.op.disk_template,
5648
                       os=self.op.os,
5649
                       tags=self.op.tags,
5650
                       nics=self.op.nics,
5651
                       vcpus=self.op.vcpus,
5652
                       )
5653
    else:
5654
      ial = IAllocator(self,
5655
                       mode=self.op.mode,
5656
                       name=self.op.name,
5657
                       relocate_from=list(self.relocate_from),
5658
                       )
5659

    
5660
    if self.op.direction == constants.IALLOCATOR_DIR_IN:
5661
      result = ial.in_text
5662
    else:
5663
      ial.Run(self.op.allocator, validate=False)
5664
      result = ial.out_text
5665
    return result