#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform
import logging
import copy

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import serializer

class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    self.CheckArguments()

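  # Editorial sketch (not part of the original source): the _OP_REQP check in
  # __init__ above means a subclass only has to list the attribute names its
  # opcode must carry, e.g. a hypothetical
  #
  #   class LUExampleStartInstance(LogicalUnit):
  #     _OP_REQP = ["instance_name", "force"]
  #
  # would refuse to instantiate if op.instance_name or op.force is missing
  # (None), raising errors.OpPrereqError before any locking takes place.
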
  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensuring the
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods can no longer worry about missing parameters.

    """
    pass

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level.

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not be prefixed with 'GANETI_', as this
    will be handled in the hooks runner. Also note that additional keys
    will be added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    If there are no nodes to return, use an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged, but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hook results

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primary or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we really have been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


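# Illustrative sketch (not part of the original module, and not wired into the
# opcode dispatcher): a minimal concurrent LU following the contract described
# in LogicalUnit above. The class name is hypothetical and exists only to
# document the ExpandNames/CheckPrereq/Exec and locking conventions.
class _ExampleQueryLU(NoHooksLU):
  """Example LU that acquires all node locks in shared mode and does nothing.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    # Declare the locks we need; value 1 marks the node level as shared
    self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}
    self.share_locks[locking.LEVEL_NODE] = 1

  def CheckPrereq(self):
    """No prerequisites for this example.

    """

  def Exec(self, feedback_fn):
    """Do nothing, successfully.

    """
    return True

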
def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.OpPrereqError: if the nodes parameter is of the wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is of the wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta))

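# Usage sketch for _CheckOutputFields (editorial, hedged; it mirrors how the
# query LUs below call it, but the field names here are only examples):
#
#   _CheckOutputFields(static=utils.FieldSet("name", "pip"),
#                      dynamic=utils.FieldSet("mfree"),
#                      selected=self.op.output_fields)
#
# Any selected field outside the union of the two sets raises
# errors.OpPrereqError listing the unknown names.
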
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance-related env variables for hooks.

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: string
  @param status: the desired status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, bridge, mac) representing
      the NICs the instance has
  @rtype: dict
  @return: the hook environment for this instance

  """
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env

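# Example of the environment produced by _BuildInstanceHookEnv for a
# hypothetical one-NIC instance (names and values are illustrative only):
#
#   {
#     "OP_TARGET": "inst1.example.com",
#     "INSTANCE_NAME": "inst1.example.com",
#     "INSTANCE_PRIMARY": "node1.example.com",
#     "INSTANCE_SECONDARIES": "node2.example.com",
#     "INSTANCE_OS_TYPE": "debian-etch",
#     "INSTANCE_STATUS": "up",
#     "INSTANCE_MEMORY": 128,
#     "INSTANCE_VCPUS": 1,
#     "INSTANCE_NIC0_IP": "",
#     "INSTANCE_NIC0_BRIDGE": "xen-br0",
#     "INSTANCE_NIC0_HWADDR": "aa:00:00:00:00:01",
#     "INSTANCE_NIC_COUNT": 1,
#   }
#
# As noted in BuildHooksEnv, the hooks runner later adds the "GANETI_" prefix
# to every key.
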
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance-related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _CheckInstanceBridgesExist(lu, instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("one or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    if not self.rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list::

      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    @type node: string
    @param node: the name of the node to check
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @type vglist: dict
    @param vglist: dictionary of volume group names and their size
    @param node_result: the results from the node
    @param remote_version: the RPC version from the remote node
    @param feedback_fn: function used to accumulate results

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    if not node_result:
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
      return True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    if 'node-net-test' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result['node-net-test']:
        bad = True
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result['node-net-test'][node]))

    hyp_result = node_result.get('hypervisor', None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if instanceconfig.status != 'down':
      if (node_current not in node_instance or
          not instance in node_instance[node_current]):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if node != node_current:
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to, should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

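  # Worked example for the N+1 check above (editorial, numbers hypothetical):
  # if node C has mfree=1024 MB and is secondary for instances whose primary
  # is node A needing 512 MB and 768 MB (both auto-balanced), then
  # sinst-by-pnode[A] requires 1280 MB > 1024 MB, and the cluster is flagged
  # as not N+1 compliant for a failure of node A.
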
  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks run only in the post phase; if they fail, their
    output is logged in the verify output and the verification fails.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = []
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
    all_vglist = self.rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': hypervisors,
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
                        for node in nodeinfo]
      }
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())
    all_rversion = self.rpc.call_version(nodelist)
    all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
                                        self.cfg.GetHypervisorType())

    cluster = self.cfg.GetClusterInfo()
    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  This is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyzes the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res is False or not isinstance(res, list):
            feedback_fn("    Communication failure")
            lu_result = 1
            continue
          for script, hkr, output in res:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_volume_list(nodes, vg_name)

    to_act = set()
    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logging.warning("Connection to node %s failed or invalid data"
                        " returned", node)
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


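# Editorial note on the LUVerifyDisks result above: Exec returns a 4-tuple
# (res_nodes, res_nlvm, res_instances, res_missing), i.e. the nodes whose
# volume query failed, per-node LVM error strings, instances with at least
# one non-online LV, and a per-instance list of (node, volume) pairs that
# were expected but not reported. A hypothetical caller could unpack it as:
#
#   nodes, nlvm, instances, missing = lu.Exec(feedback_fn)
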
class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    if not self.rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      # TODO: sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logging.debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = self.rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            self.LogWarning("Copy of file %s to node %s failed",
                            fname, to_node)
    finally:
      if not self.rpc.call_node_start_master(master, False):
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  @type disk: L{objects.Disk}
  @param disk: the disk to check
  @rtype: boolean
  @return: boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    # FIXME: This only works because there is only one parameter that can be
    # changed or removed.
    if self.op.vg_name is not None and not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    node_list = self.acquired_locks[locking.LEVEL_NODE]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # beparams changes do not need validation (we can't validate?),
    # but we still process here
    if self.op.beparams:
      self.new_beparams = cluster.FillDict(
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)

    # hypervisor list/parameters
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      if not isinstance(self.op.hvparams, dict):
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      if self.op.vg_name != self.cfg.GetVGName():
        self.cfg.SetVGName(self.op.vg_name)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.enabled_hypervisors is not None:
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
    self.cfg.Update(self.cluster)


def _WaitForSync(lu, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    lu.cfg.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      lu.LogWarning("Can't get any data from node %s", node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                           node, instance.disks[i].iv_name)
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded

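# Editorial note on the polling loop above: each per-disk status entry
# returned by call_blockdev_getmirrorstatus is unpacked as
# (perc_done, est_time, is_degraded, ldisk); _WaitForSync ignores the ldisk
# field, treats perc_done is None as "in sync", and keeps polling (up to 60
# seconds between rounds) until every disk reports that state or oneshot is
# set. A hypothetical caller would simply do:
#
#   if not _WaitForSync(self, instance):
#     raise errors.OpExecError("Disk sync did not complete cleanly")
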
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    if not rstats:
      logging.warning("Node %s: disk degraded, not found or node down", node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result

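# Editorial note (an assumption based on the indices used above, not on any
# documented rpc contract): call_blockdev_find is expected to return a status
# tuple in which index 5 holds the overall is_degraded flag and index 6 the
# ldisk (local storage) status, which is why _CheckDiskConsistency picks
# idx = 5 or 6. Sketch of use by a hypothetical caller:
#
#   if not _CheckDiskConsistency(self, dev, target_node, False, ldisk=True):
#     raise errors.OpExecError("Disk %s is degraded on target node" %
#                              dev.iv_name)
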
class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet()
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")

  def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

    @param node_list: a list with the names of all nodes
    @param rlist: a map with node names as keys and OS objects as values

    @rtype: dict
    @return: a dictionary with osnames as keys and as value another map, with
        nodes as keys and list of OS objects as values, e.g.::

          {"debian-etch": {"node1": [<object>,...],
                           "node2": [<object>,]}
          }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if not nr:
        continue
      for os_obj in nr:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in node_list:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.acquired_locks[locking.LEVEL_NODE]
    node_data = self.rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


1463
class LURemoveNode(LogicalUnit):
1464
  """Logical unit for removing a node.
1465

1466
  """
1467
  HPATH = "node-remove"
1468
  HTYPE = constants.HTYPE_NODE
1469
  _OP_REQP = ["node_name"]
1470

    
1471
  def BuildHooksEnv(self):
1472
    """Build hooks env.
1473

1474
    This doesn't run on the target node in the pre phase as a failed
1475
    node would then be impossible to remove.
1476

1477
    """
1478
    env = {
1479
      "OP_TARGET": self.op.node_name,
1480
      "NODE_NAME": self.op.node_name,
1481
      }
1482
    all_nodes = self.cfg.GetNodeList()
1483
    all_nodes.remove(self.op.node_name)
1484
    return env, all_nodes, all_nodes
1485

    
1486
  def CheckPrereq(self):
1487
    """Check prerequisites.
1488

1489
    This checks:
1490
     - the node exists in the configuration
1491
     - it does not have primary or secondary instances
1492
     - it's not the master
1493

1494
    Any errors are signalled by raising errors.OpPrereqError.
1495

1496
    """
1497
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1498
    if node is None:
1499
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1500

    
1501
    instance_list = self.cfg.GetInstanceList()
1502

    
1503
    masternode = self.cfg.GetMasterNode()
1504
    if node.name == masternode:
1505
      raise errors.OpPrereqError("Node is the master node,"
1506
                                 " you need to failover first.")
1507

    
1508
    for instance_name in instance_list:
1509
      instance = self.cfg.GetInstanceInfo(instance_name)
1510
      if node.name == instance.primary_node:
1511
        raise errors.OpPrereqError("Instance %s still running on the node,"
1512
                                   " please remove first." % instance_name)
1513
      if node.name in instance.secondary_nodes:
1514
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1515
                                   " please remove first." % instance_name)
1516
    self.op.node_name = node.name
1517
    self.node = node
1518

    
1519
  def Exec(self, feedback_fn):
1520
    """Removes the node from the cluster.
1521

1522
    """
1523
    node = self.node
1524
    logging.info("Stopping the node daemon and removing configs from node %s",
1525
                 node.name)
1526

    
1527
    self.context.RemoveNode(node.name)
1528

    
1529
    self.rpc.call_node_leave_cluster(node.name)
1530

    
1531

    
1532
class LUQueryNodes(NoHooksLU):
1533
  """Logical unit for querying nodes.
1534

1535
  """
1536
  _OP_REQP = ["output_fields", "names"]
1537
  REQ_BGL = False
1538
  _FIELDS_DYNAMIC = utils.FieldSet(
1539
    "dtotal", "dfree",
1540
    "mtotal", "mnode", "mfree",
1541
    "bootid",
1542
    "ctotal",
1543
    )
1544

    
1545
  _FIELDS_STATIC = utils.FieldSet(
1546
    "name", "pinst_cnt", "sinst_cnt",
1547
    "pinst_list", "sinst_list",
1548
    "pip", "sip", "tags",
1549
    "serial_no",
1550
    )
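  # Note: when only static fields are requested, ExpandNames below leaves the
  # node level unlocked and Exec answers purely from the configuration,
  # without issuing any per-node RPC (see the do_locking handling).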
1551

    
1552
  def ExpandNames(self):
1553
    _CheckOutputFields(static=self._FIELDS_STATIC,
1554
                       dynamic=self._FIELDS_DYNAMIC,
1555
                       selected=self.op.output_fields)
1556

    
1557
    self.needed_locks = {}
1558
    self.share_locks[locking.LEVEL_NODE] = 1
1559

    
1560
    if self.op.names:
1561
      self.wanted = _GetWantedNodes(self, self.op.names)
1562
    else:
1563
      self.wanted = locking.ALL_SET
1564

    
1565
    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1566
    if self.do_locking:
1567
      # if we don't request only static fields, we need to lock the nodes
1568
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
1569

    
1570

    
1571
  def CheckPrereq(self):
1572
    """Check prerequisites.
1573

1574
    """
1575
    # The validation of the node list is done in _GetWantedNodes if the
1576
    # list is not empty; if it is empty, there is no validation to do
1577
    pass
1578

    
1579
  def Exec(self, feedback_fn):
1580
    """Computes the list of nodes and their attributes.
1581

1582
    """
1583
    all_info = self.cfg.GetAllNodesInfo()
1584
    if self.do_locking:
1585
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1586
    elif self.wanted != locking.ALL_SET:
1587
      nodenames = self.wanted
1588
      missing = set(nodenames).difference(all_info.keys())
1589
      if missing:
1590
        raise errors.OpExecError(
1591
          "Some nodes were removed before retrieving their data: %s" % missing)
1592
    else:
1593
      nodenames = all_info.keys()
1594

    
1595
    nodenames = utils.NiceSort(nodenames)
1596
    nodelist = [all_info[name] for name in nodenames]
1597

    
1598
    # begin data gathering
1599

    
1600
    if self.do_locking:
1601
      live_data = {}
1602
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1603
                                          self.cfg.GetHypervisorType())
1604
      for name in nodenames:
1605
        nodeinfo = node_data.get(name, None)
1606
        if nodeinfo:
1607
          live_data[name] = {
1608
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1609
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1610
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1611
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1612
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1613
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1614
            "bootid": nodeinfo['bootid'],
1615
            }
1616
        else:
1617
          live_data[name] = {}
1618
    else:
1619
      live_data = dict.fromkeys(nodenames, {})
1620

    
1621
    node_to_primary = dict([(name, set()) for name in nodenames])
1622
    node_to_secondary = dict([(name, set()) for name in nodenames])
1623

    
1624
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1625
                             "sinst_cnt", "sinst_list"))
1626
    if inst_fields & frozenset(self.op.output_fields):
1627
      instancelist = self.cfg.GetInstanceList()
1628

    
1629
      for instance_name in instancelist:
1630
        inst = self.cfg.GetInstanceInfo(instance_name)
1631
        if inst.primary_node in node_to_primary:
1632
          node_to_primary[inst.primary_node].add(inst.name)
1633
        for secnode in inst.secondary_nodes:
1634
          if secnode in node_to_secondary:
1635
            node_to_secondary[secnode].add(inst.name)
1636

    
1637
    # end data gathering
1638

    
1639
    output = []
1640
    for node in nodelist:
1641
      node_output = []
1642
      for field in self.op.output_fields:
1643
        if field == "name":
1644
          val = node.name
1645
        elif field == "pinst_list":
1646
          val = list(node_to_primary[node.name])
1647
        elif field == "sinst_list":
1648
          val = list(node_to_secondary[node.name])
1649
        elif field == "pinst_cnt":
1650
          val = len(node_to_primary[node.name])
1651
        elif field == "sinst_cnt":
1652
          val = len(node_to_secondary[node.name])
1653
        elif field == "pip":
1654
          val = node.primary_ip
1655
        elif field == "sip":
1656
          val = node.secondary_ip
1657
        elif field == "tags":
1658
          val = list(node.GetTags())
1659
        elif field == "serial_no":
1660
          val = node.serial_no
1661
        elif self._FIELDS_DYNAMIC.Matches(field):
1662
          val = live_data[node.name].get(field, None)
1663
        else:
1664
          raise errors.ParameterError(field)
1665
        node_output.append(val)
1666
      output.append(node_output)
1667

    
1668
    return output
1669

    
1670

    
1671
class LUQueryNodeVolumes(NoHooksLU):
1672
  """Logical unit for getting volumes on node(s).
1673

1674
  """
1675
  _OP_REQP = ["nodes", "output_fields"]
1676
  REQ_BGL = False
1677
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1678
  _FIELDS_STATIC = utils.FieldSet("node")
1679

    
1680
  def ExpandNames(self):
1681
    _CheckOutputFields(static=self._FIELDS_STATIC,
1682
                       dynamic=self._FIELDS_DYNAMIC,
1683
                       selected=self.op.output_fields)
1684

    
1685
    self.needed_locks = {}
1686
    self.share_locks[locking.LEVEL_NODE] = 1
1687
    if not self.op.nodes:
1688
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1689
    else:
1690
      self.needed_locks[locking.LEVEL_NODE] = \
1691
        _GetWantedNodes(self, self.op.nodes)
1692

    
1693
  def CheckPrereq(self):
1694
    """Check prerequisites.
1695

1696
    This checks that the fields required are valid output fields.
1697

1698
    """
1699
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1700

    
1701
  def Exec(self, feedback_fn):
1702
    """Computes the list of nodes and their attributes.
1703

1704
    """
1705
    nodenames = self.nodes
1706
    volumes = self.rpc.call_node_volumes(nodenames)
1707

    
1708
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1709
             in self.cfg.GetInstanceList()]
1710

    
1711
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1712

    
1713
    output = []
1714
    for node in nodenames:
1715
      if node not in volumes or not volumes[node]:
1716
        continue
1717

    
1718
      node_vols = volumes[node][:]
1719
      node_vols.sort(key=lambda vol: vol['dev'])
1720

    
1721
      for vol in node_vols:
1722
        node_output = []
1723
        for field in self.op.output_fields:
1724
          if field == "node":
1725
            val = node
1726
          elif field == "phys":
1727
            val = vol['dev']
1728
          elif field == "vg":
1729
            val = vol['vg']
1730
          elif field == "name":
1731
            val = vol['name']
1732
          elif field == "size":
1733
            val = int(float(vol['size']))
1734
          elif field == "instance":
1735
            for inst in ilist:
1736
              if node not in lv_by_node[inst]:
1737
                continue
1738
              if vol['name'] in lv_by_node[inst][node]:
1739
                val = inst.name
1740
                break
1741
            else:
1742
              val = '-'
1743
          else:
1744
            raise errors.ParameterError(field)
1745
          node_output.append(str(val))
1746

    
1747
        output.append(node_output)
1748

    
1749
    return output
1750

    
1751

    
1752
class LUAddNode(LogicalUnit):
1753
  """Logical unit for adding node to the cluster.
1754

1755
  """
1756
  HPATH = "node-add"
1757
  HTYPE = constants.HTYPE_NODE
1758
  _OP_REQP = ["node_name"]
1759

    
1760
  def BuildHooksEnv(self):
1761
    """Build hooks env.
1762

1763
    This will run on all nodes before, and on all nodes + the new node after.
1764

1765
    """
1766
    env = {
1767
      "OP_TARGET": self.op.node_name,
1768
      "NODE_NAME": self.op.node_name,
1769
      "NODE_PIP": self.op.primary_ip,
1770
      "NODE_SIP": self.op.secondary_ip,
1771
      }
1772
    nodes_0 = self.cfg.GetNodeList()
1773
    nodes_1 = nodes_0 + [self.op.node_name, ]
1774
    return env, nodes_0, nodes_1
1775

    
1776
  def CheckPrereq(self):
1777
    """Check prerequisites.
1778

1779
    This checks:
1780
     - the new node is not already in the config
1781
     - it is resolvable
1782
     - its parameters (single/dual homed) match the cluster
1783

1784
    Any errors are signalled by raising errors.OpPrereqError.
1785

1786
    """
1787
    node_name = self.op.node_name
1788
    cfg = self.cfg
1789

    
1790
    dns_data = utils.HostInfo(node_name)
1791

    
1792
    node = dns_data.name
1793
    primary_ip = self.op.primary_ip = dns_data.ip
1794
    secondary_ip = getattr(self.op, "secondary_ip", None)
1795
    if secondary_ip is None:
1796
      secondary_ip = primary_ip
1797
    if not utils.IsValidIP(secondary_ip):
1798
      raise errors.OpPrereqError("Invalid secondary IP given")
1799
    self.op.secondary_ip = secondary_ip
1800

    
1801
    node_list = cfg.GetNodeList()
1802
    if not self.op.readd and node in node_list:
1803
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1804
                                 node)
1805
    elif self.op.readd and node not in node_list:
1806
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1807

    
1808
    for existing_node_name in node_list:
1809
      existing_node = cfg.GetNodeInfo(existing_node_name)
1810

    
1811
      if self.op.readd and node == existing_node_name:
1812
        if (existing_node.primary_ip != primary_ip or
1813
            existing_node.secondary_ip != secondary_ip):
1814
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1815
                                     " address configuration as before")
1816
        continue
1817

    
1818
      if (existing_node.primary_ip == primary_ip or
1819
          existing_node.secondary_ip == primary_ip or
1820
          existing_node.primary_ip == secondary_ip or
1821
          existing_node.secondary_ip == secondary_ip):
1822
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1823
                                   " existing node %s" % existing_node.name)
1824

    
1825
    # check that the type of the node (single versus dual homed) is the
1826
    # same as for the master
1827
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1828
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1829
    newbie_singlehomed = secondary_ip == primary_ip
1830
    if master_singlehomed != newbie_singlehomed:
1831
      if master_singlehomed:
1832
        raise errors.OpPrereqError("The master has no private ip but the"
1833
                                   " new node has one")
1834
      else:
1835
        raise errors.OpPrereqError("The master has a private ip but the"
1836
                                   " new node doesn't have one")
1837

    
1838
    # check reachability
1839
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1840
      raise errors.OpPrereqError("Node not reachable by ping")
1841

    
1842
    if not newbie_singlehomed:
1843
      # check reachability from my secondary ip to newbie's secondary ip
1844
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1845
                           source=myself.secondary_ip):
1846
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1847
                                   " based ping to noded port")
1848

    
1849
    self.new_node = objects.Node(name=node,
1850
                                 primary_ip=primary_ip,
1851
                                 secondary_ip=secondary_ip)
1852

    
1853
  def Exec(self, feedback_fn):
1854
    """Adds the new node to the cluster.
1855

1856
    """
1857
    new_node = self.new_node
1858
    node = new_node.name
1859

    
1860
    # check connectivity
1861
    result = self.rpc.call_version([node])[node]
1862
    if result:
1863
      if constants.PROTOCOL_VERSION == result:
1864
        logging.info("Communication to node %s fine, sw version %s match",
1865
                     node, result)
1866
      else:
1867
        raise errors.OpExecError("Version mismatch master version %s,"
1868
                                 " node version %s" %
1869
                                 (constants.PROTOCOL_VERSION, result))
1870
    else:
1871
      raise errors.OpExecError("Cannot get version from the new node")
1872

    
1873
    # setup ssh on node
1874
    logging.info("Copy ssh key to node %s", node)
1875
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1876
    keyarray = []
1877
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1878
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1879
                priv_key, pub_key]
1880

    
1881
    for i in keyfiles:
1882
      f = open(i, 'r')
1883
      try:
1884
        keyarray.append(f.read())
1885
      finally:
1886
        f.close()
1887

    
1888
    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
1889
                                    keyarray[2],
1890
                                    keyarray[3], keyarray[4], keyarray[5])
1891

    
1892
    if not result:
1893
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1894

    
1895
    # Add node to our /etc/hosts, and add key to known_hosts
1896
    utils.AddHostToEtcHosts(new_node.name)
1897

    
1898
    if new_node.secondary_ip != new_node.primary_ip:
1899
      if not self.rpc.call_node_has_ip_address(new_node.name,
1900
                                               new_node.secondary_ip):
1901
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1902
                                 " you gave (%s). Please fix and re-run this"
1903
                                 " command." % new_node.secondary_ip)
1904

    
1905
    node_verify_list = [self.cfg.GetMasterNode()]
1906
    node_verify_param = {
1907
      'nodelist': [node],
1908
      # TODO: do a node-net-test as well?
1909
    }
1910

    
1911
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
1912
                                       self.cfg.GetClusterName())
1913
    for verifier in node_verify_list:
1914
      if not result[verifier]:
1915
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
1916
                                 " for remote verification" % verifier)
1917
      if result[verifier]['nodelist']:
1918
        for failed in result[verifier]['nodelist']:
1919
          feedback_fn("ssh/hostname verification failed %s -> %s" %
1920
                      (verifier, result[verifier]['nodelist'][failed]))
1921
        raise errors.OpExecError("ssh/hostname verification failed.")
1922

    
1923
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1924
    # including the node just added
1925
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
1926
    dist_nodes = self.cfg.GetNodeList()
1927
    if not self.op.readd:
1928
      dist_nodes.append(node)
1929
    if myself.name in dist_nodes:
1930
      dist_nodes.remove(myself.name)
1931

    
1932
    logging.debug("Copying hosts and known_hosts to all nodes")
1933
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1934
      result = self.rpc.call_upload_file(dist_nodes, fname)
1935
      for to_node in dist_nodes:
1936
        if not result[to_node]:
1937
          logging.error("Copy of file %s to node %s failed", fname, to_node)
1938

    
1939
    to_copy = []
1940
    if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
1941
      to_copy.append(constants.VNC_PASSWORD_FILE)
1942
    for fname in to_copy:
1943
      result = self.rpc.call_upload_file([node], fname)
1944
      if not result[node]:
1945
        logging.error("Could not copy file %s to node %s", fname, node)
1946

    
1947
    if self.op.readd:
1948
      self.context.ReaddNode(new_node)
1949
    else:
1950
      self.context.AddNode(new_node)
1951

    
1952

    
1953
class LUQueryClusterInfo(NoHooksLU):
1954
  """Query cluster configuration.
1955

1956
  """
1957
  _OP_REQP = []
1958
  REQ_BGL = False
1959

    
1960
  def ExpandNames(self):
1961
    self.needed_locks = {}
1962

    
1963
  def CheckPrereq(self):
1964
    """No prerequsites needed for this LU.
1965

1966
    """
1967
    pass
1968

    
1969
  def Exec(self, feedback_fn):
1970
    """Return cluster config.
1971

1972
    """
1973
    cluster = self.cfg.GetClusterInfo()
1974
    result = {
1975
      "software_version": constants.RELEASE_VERSION,
1976
      "protocol_version": constants.PROTOCOL_VERSION,
1977
      "config_version": constants.CONFIG_VERSION,
1978
      "os_api_version": constants.OS_API_VERSION,
1979
      "export_version": constants.EXPORT_VERSION,
1980
      "architecture": (platform.architecture()[0], platform.machine()),
1981
      "name": cluster.cluster_name,
1982
      "master": cluster.master_node,
1983
      "default_hypervisor": cluster.default_hypervisor,
1984
      "enabled_hypervisors": cluster.enabled_hypervisors,
1985
      "hvparams": cluster.hvparams,
1986
      "beparams": cluster.beparams,
1987
      }
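    # This dictionary is built purely from constants and the ClusterInfo
    # object, so no node RPCs are needed; client-side tools (presumably
    # "gnt-cluster info" and friends) render this mapping directly.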
1988

    
1989
    return result
1990

    
1991

    
1992
class LUQueryConfigValues(NoHooksLU):
1993
  """Return configuration values.
1994

1995
  """
1996
  _OP_REQP = []
1997
  REQ_BGL = False
1998
  _FIELDS_DYNAMIC = utils.FieldSet()
1999
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2000

    
2001
  def ExpandNames(self):
2002
    self.needed_locks = {}
2003

    
2004
    _CheckOutputFields(static=self._FIELDS_STATIC,
2005
                       dynamic=self._FIELDS_DYNAMIC,
2006
                       selected=self.op.output_fields)
2007

    
2008
  def CheckPrereq(self):
2009
    """No prerequisites.
2010

2011
    """
2012
    pass
2013

    
2014
  def Exec(self, feedback_fn):
2015
    """Dump a representation of the cluster config to the standard output.
2016

2017
    """
2018
    values = []
2019
    for field in self.op.output_fields:
2020
      if field == "cluster_name":
2021
        entry = self.cfg.GetClusterName()
2022
      elif field == "master_node":
2023
        entry = self.cfg.GetMasterNode()
2024
      elif field == "drain_flag":
2025
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2026
      else:
2027
        raise errors.ParameterError(field)
2028
      values.append(entry)
2029
    return values
2030

    
2031

    
2032
class LUActivateInstanceDisks(NoHooksLU):
2033
  """Bring up an instance's disks.
2034

2035
  """
2036
  _OP_REQP = ["instance_name"]
2037
  REQ_BGL = False
2038

    
2039
  def ExpandNames(self):
2040
    self._ExpandAndLockInstance()
2041
    self.needed_locks[locking.LEVEL_NODE] = []
2042
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2043

    
2044
  def DeclareLocks(self, level):
2045
    if level == locking.LEVEL_NODE:
2046
      self._LockInstancesNodes()
2047

    
2048
  def CheckPrereq(self):
2049
    """Check prerequisites.
2050

2051
    This checks that the instance is in the cluster.
2052

2053
    """
2054
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2055
    assert self.instance is not None, \
2056
      "Cannot retrieve locked instance %s" % self.op.instance_name
2057

    
2058
  def Exec(self, feedback_fn):
2059
    """Activate the disks.
2060

2061
    """
2062
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2063
    if not disks_ok:
2064
      raise errors.OpExecError("Cannot activate block devices")
2065

    
2066
    return disks_info
2067

    
2068

    
2069
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2070
  """Prepare the block devices for an instance.
2071

2072
  This sets up the block devices on all nodes.
2073

2074
  @type lu: L{LogicalUnit}
2075
  @param lu: the logical unit on whose behalf we execute
2076
  @type instance: L{objects.Instance}
2077
  @param instance: the instance for whose disks we assemble
2078
  @type ignore_secondaries: boolean
2079
  @param ignore_secondaries: if true, errors on secondary nodes
2080
      won't result in an error return from the function
2081
  @return: a tuple (disks_ok, device_info); device_info is a list of
2082
      (host, instance_visible_name, node_visible_name)
2083
      with the mapping from node devices to instance devices
2084

2085
  """
2086
  device_info = []
2087
  disks_ok = True
2088
  iname = instance.name
2089
  # With the two-pass mechanism we try to reduce the window of
2090
  # opportunity for the race condition of switching DRBD to primary
2091
  # before handshaking occurred, but we do not eliminate it
2092

    
2093
  # The proper fix would be to wait (with some limits) until the
2094
  # connection has been made and drbd transitions from WFConnection
2095
  # into any other network-connected state (Connected, SyncTarget,
2096
  # SyncSource, etc.)
2097

    
2098
  # 1st pass, assemble on all nodes in secondary mode
2099
  for inst_disk in instance.disks:
2100
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2101
      lu.cfg.SetDiskID(node_disk, node)
2102
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2103
      if not result:
2104
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2105
                           " (is_primary=False, pass=1)",
2106
                           inst_disk.iv_name, node)
2107
        if not ignore_secondaries:
2108
          disks_ok = False
2109

    
2110
  # FIXME: race condition on drbd migration to primary
2111

    
2112
  # 2nd pass, do only the primary node
2113
  for inst_disk in instance.disks:
2114
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2115
      if node != instance.primary_node:
2116
        continue
2117
      lu.cfg.SetDiskID(node_disk, node)
2118
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2119
      if not result:
2120
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2121
                           " (is_primary=True, pass=2)",
2122
                           inst_disk.iv_name, node)
2123
        disks_ok = False
2124
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
2125

    
2126
  # leave the disks configured for the primary node
2127
  # this is a workaround that would be fixed better by
2128
  # improving the logical/physical id handling
2129
  for disk in instance.disks:
2130
    lu.cfg.SetDiskID(disk, instance.primary_node)
2131

    
2132
  return disks_ok, device_info
2133

    
2134

    
2135
def _StartInstanceDisks(lu, instance, force):
2136
  """Start the disks of an instance.
2137

2138
  """
2139
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2140
                                           ignore_secondaries=force)
2141
  if not disks_ok:
2142
    _ShutdownInstanceDisks(lu, instance)
2143
    if force is not None and not force:
2144
      lu.proc.LogWarning("", hint="If the message above refers to a"
2145
                         " secondary node,"
2146
                         " you can retry the operation using '--force'.")
2147
    raise errors.OpExecError("Disk consistency error")
2148

    
2149

    
2150
class LUDeactivateInstanceDisks(NoHooksLU):
2151
  """Shutdown an instance's disks.
2152

2153
  """
2154
  _OP_REQP = ["instance_name"]
2155
  REQ_BGL = False
2156

    
2157
  def ExpandNames(self):
2158
    self._ExpandAndLockInstance()
2159
    self.needed_locks[locking.LEVEL_NODE] = []
2160
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2161

    
2162
  def DeclareLocks(self, level):
2163
    if level == locking.LEVEL_NODE:
2164
      self._LockInstancesNodes()
2165

    
2166
  def CheckPrereq(self):
2167
    """Check prerequisites.
2168

2169
    This checks that the instance is in the cluster.
2170

2171
    """
2172
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2173
    assert self.instance is not None, \
2174
      "Cannot retrieve locked instance %s" % self.op.instance_name
2175

    
2176
  def Exec(self, feedback_fn):
2177
    """Deactivate the disks
2178

2179
    """
2180
    instance = self.instance
2181
    _SafeShutdownInstanceDisks(self, instance)
2182

    
2183

    
2184
def _SafeShutdownInstanceDisks(lu, instance):
2185
  """Shutdown block devices of an instance.
2186

2187
  This function checks if an instance is running, before calling
2188
  _ShutdownInstanceDisks.
2189

2190
  """
2191
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
2192
                                      [instance.hypervisor])
2193
  ins_l = ins_l[instance.primary_node]
2194
  if type(ins_l) is not list:
2195
    raise errors.OpExecError("Can't contact node '%s'" %
2196
                             instance.primary_node)
2197

    
2198
  if instance.name in ins_l:
2199
    raise errors.OpExecError("Instance is running, can't shutdown"
2200
                             " block devices.")
2201

    
2202
  _ShutdownInstanceDisks(lu, instance)
2203

    
2204

    
2205
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2206
  """Shutdown block devices of an instance.
2207

2208
  This does the shutdown on all nodes of the instance.
2209

2210
  If ignore_primary is true, errors on the primary node are
2211
  ignored.
2212

2213
  """
2214
  result = True
2215
  for disk in instance.disks:
2216
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2217
      lu.cfg.SetDiskID(top_disk, node)
2218
      if not lu.rpc.call_blockdev_shutdown(node, top_disk):
2219
        logging.error("Could not shutdown block device %s on node %s",
2220
                      disk.iv_name, node)
2221
        if not ignore_primary or node != instance.primary_node:
2222
          result = False
2223
  return result
2224

    
2225

    
2226
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2227
  """Checks if a node has enough free memory.
2228

2229
  This function checks if a given node has the needed amount of free
2230
  memory. In case the node has less memory or we cannot get the
2231
  information from the node, this function raises an OpPrereqError
2232
  exception.
2233

2234
  @type lu: C{LogicalUnit}
2235
  @param lu: a logical unit from which we get configuration data
2236
  @type node: C{str}
2237
  @param node: the node to check
2238
  @type reason: C{str}
2239
  @param reason: string to use in the error message
2240
  @type requested: C{int}
2241
  @param requested: the amount of memory in MiB to check for
2242
  @type hypervisor: C{str}
2243
  @param hypervisor: the hypervisor to ask for memory stats
2244
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2245
      we cannot check the node
2246

2247
  """
2248
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2249
  if not nodeinfo or not isinstance(nodeinfo, dict):
2250
    raise errors.OpPrereqError("Could not contact node %s for resource"
2251
                             " information" % (node,))
2252

    
2253
  free_mem = nodeinfo[node].get('memory_free')
2254
  if not isinstance(free_mem, int):
2255
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2256
                             " was '%s'" % (node, free_mem))
2257
  if requested > free_mem:
2258
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2259
                             " needed %s MiB, available %s MiB" %
2260
                             (node, reason, requested, free_mem))
2261

    
2262

    
2263
class LUStartupInstance(LogicalUnit):
2264
  """Starts an instance.
2265

2266
  """
2267
  HPATH = "instance-start"
2268
  HTYPE = constants.HTYPE_INSTANCE
2269
  _OP_REQP = ["instance_name", "force"]
2270
  REQ_BGL = False
2271

    
2272
  def ExpandNames(self):
2273
    self._ExpandAndLockInstance()
2274

    
2275
  def BuildHooksEnv(self):
2276
    """Build hooks env.
2277

2278
    This runs on master, primary and secondary nodes of the instance.
2279

2280
    """
2281
    env = {
2282
      "FORCE": self.op.force,
2283
      }
2284
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2285
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2286
          list(self.instance.secondary_nodes))
2287
    return env, nl, nl
2288

    
2289
  def CheckPrereq(self):
2290
    """Check prerequisites.
2291

2292
    This checks that the instance is in the cluster.
2293

2294
    """
2295
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2296
    assert self.instance is not None, \
2297
      "Cannot retrieve locked instance %s" % self.op.instance_name
2298

    
2299
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2300
    # check that the bridges exist
2301
    _CheckInstanceBridgesExist(self, instance)
2302

    
2303
    _CheckNodeFreeMemory(self, instance.primary_node,
2304
                         "starting instance %s" % instance.name,
2305
                         bep[constants.BE_MEMORY], instance.hypervisor)
2306

    
2307
  def Exec(self, feedback_fn):
2308
    """Start the instance.
2309

2310
    """
2311
    instance = self.instance
2312
    force = self.op.force
2313
    extra_args = getattr(self.op, "extra_args", "")
2314

    
2315
    self.cfg.MarkInstanceUp(instance.name)
2316

    
2317
    node_current = instance.primary_node
2318

    
2319
    _StartInstanceDisks(self, instance, force)
2320

    
2321
    if not self.rpc.call_instance_start(node_current, instance, extra_args):
2322
      _ShutdownInstanceDisks(self, instance)
2323
      raise errors.OpExecError("Could not start instance")
2324

    
2325

    
2326
class LURebootInstance(LogicalUnit):
2327
  """Reboot an instance.
2328

2329
  """
2330
  HPATH = "instance-reboot"
2331
  HTYPE = constants.HTYPE_INSTANCE
2332
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2333
  REQ_BGL = False
2334

    
2335
  def ExpandNames(self):
2336
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2337
                                   constants.INSTANCE_REBOOT_HARD,
2338
                                   constants.INSTANCE_REBOOT_FULL]:
2339
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2340
                                  (constants.INSTANCE_REBOOT_SOFT,
2341
                                   constants.INSTANCE_REBOOT_HARD,
2342
                                   constants.INSTANCE_REBOOT_FULL))
2343
    self._ExpandAndLockInstance()
2344

    
2345
  def BuildHooksEnv(self):
2346
    """Build hooks env.
2347

2348
    This runs on master, primary and secondary nodes of the instance.
2349

2350
    """
2351
    env = {
2352
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2353
      }
2354
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2355
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2356
          list(self.instance.secondary_nodes))
2357
    return env, nl, nl
2358

    
2359
  def CheckPrereq(self):
2360
    """Check prerequisites.
2361

2362
    This checks that the instance is in the cluster.
2363

2364
    """
2365
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2366
    assert self.instance is not None, \
2367
      "Cannot retrieve locked instance %s" % self.op.instance_name
2368

    
2369
    # check that the bridges exist
2370
    _CheckInstanceBridgesExist(self, instance)
2371

    
2372
  def Exec(self, feedback_fn):
2373
    """Reboot the instance.
2374

2375
    """
2376
    instance = self.instance
2377
    ignore_secondaries = self.op.ignore_secondaries
2378
    reboot_type = self.op.reboot_type
2379
    extra_args = getattr(self.op, "extra_args", "")
2380

    
2381
    node_current = instance.primary_node
2382

    
2383
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2384
                       constants.INSTANCE_REBOOT_HARD]:
2385
      if not self.rpc.call_instance_reboot(node_current, instance,
2386
                                           reboot_type, extra_args):
2387
        raise errors.OpExecError("Could not reboot instance")
2388
    else:
2389
      if not self.rpc.call_instance_shutdown(node_current, instance):
2390
        raise errors.OpExecError("could not shutdown instance for full reboot")
2391
      _ShutdownInstanceDisks(self, instance)
2392
      _StartInstanceDisks(self, instance, ignore_secondaries)
2393
      if not self.rpc.call_instance_start(node_current, instance, extra_args):
2394
        _ShutdownInstanceDisks(self, instance)
2395
        raise errors.OpExecError("Could not start instance for full reboot")
2396

    
2397
    self.cfg.MarkInstanceUp(instance.name)
2398

    
2399

    
2400
class LUShutdownInstance(LogicalUnit):
2401
  """Shutdown an instance.
2402

2403
  """
2404
  HPATH = "instance-stop"
2405
  HTYPE = constants.HTYPE_INSTANCE
2406
  _OP_REQP = ["instance_name"]
2407
  REQ_BGL = False
2408

    
2409
  def ExpandNames(self):
2410
    self._ExpandAndLockInstance()
2411

    
2412
  def BuildHooksEnv(self):
2413
    """Build hooks env.
2414

2415
    This runs on master, primary and secondary nodes of the instance.
2416

2417
    """
2418
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2419
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2420
          list(self.instance.secondary_nodes))
2421
    return env, nl, nl
2422

    
2423
  def CheckPrereq(self):
2424
    """Check prerequisites.
2425

2426
    This checks that the instance is in the cluster.
2427

2428
    """
2429
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2430
    assert self.instance is not None, \
2431
      "Cannot retrieve locked instance %s" % self.op.instance_name
2432

    
2433
  def Exec(self, feedback_fn):
2434
    """Shutdown the instance.
2435

2436
    """
2437
    instance = self.instance
2438
    node_current = instance.primary_node
2439
    self.cfg.MarkInstanceDown(instance.name)
2440
    if not self.rpc.call_instance_shutdown(node_current, instance):
2441
      self.proc.LogWarning("Could not shutdown instance")
2442

    
2443
    _ShutdownInstanceDisks(self, instance)
2444

    
2445

    
2446
class LUReinstallInstance(LogicalUnit):
2447
  """Reinstall an instance.
2448

2449
  """
2450
  HPATH = "instance-reinstall"
2451
  HTYPE = constants.HTYPE_INSTANCE
2452
  _OP_REQP = ["instance_name"]
2453
  REQ_BGL = False
2454

    
2455
  def ExpandNames(self):
2456
    self._ExpandAndLockInstance()
2457

    
2458
  def BuildHooksEnv(self):
2459
    """Build hooks env.
2460

2461
    This runs on master, primary and secondary nodes of the instance.
2462

2463
    """
2464
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2465
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2466
          list(self.instance.secondary_nodes))
2467
    return env, nl, nl
2468

    
2469
  def CheckPrereq(self):
2470
    """Check prerequisites.
2471

2472
    This checks that the instance is in the cluster and is not running.
2473

2474
    """
2475
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2476
    assert instance is not None, \
2477
      "Cannot retrieve locked instance %s" % self.op.instance_name
2478

    
2479
    if instance.disk_template == constants.DT_DISKLESS:
2480
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2481
                                 self.op.instance_name)
2482
    if instance.status != "down":
2483
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2484
                                 self.op.instance_name)
2485
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2486
                                              instance.name,
2487
                                              instance.hypervisor)
2488
    if remote_info:
2489
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2490
                                 (self.op.instance_name,
2491
                                  instance.primary_node))
2492

    
2493
    self.op.os_type = getattr(self.op, "os_type", None)
2494
    if self.op.os_type is not None:
2495
      # OS verification
2496
      pnode = self.cfg.GetNodeInfo(
2497
        self.cfg.ExpandNodeName(instance.primary_node))
2498
      if pnode is None:
2499
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2500
                                   instance.primary_node)
2501
      os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
2502
      if not os_obj:
2503
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2504
                                   " primary node"  % self.op.os_type)
2505

    
2506
    self.instance = instance
2507

    
2508
  def Exec(self, feedback_fn):
2509
    """Reinstall the instance.
2510

2511
    """
2512
    inst = self.instance
2513

    
2514
    if self.op.os_type is not None:
2515
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2516
      inst.os = self.op.os_type
2517
      self.cfg.Update(inst)
2518

    
2519
    _StartInstanceDisks(self, inst, None)
2520
    try:
2521
      feedback_fn("Running the instance OS create scripts...")
2522
      if not self.rpc.call_instance_os_add(inst.primary_node, inst):
2523
        raise errors.OpExecError("Could not install OS for instance %s"
2524
                                 " on node %s" %
2525
                                 (inst.name, inst.primary_node))
2526
    finally:
2527
      _ShutdownInstanceDisks(self, inst)
2528

    
2529

    
2530
class LURenameInstance(LogicalUnit):
2531
  """Rename an instance.
2532

2533
  """
2534
  HPATH = "instance-rename"
2535
  HTYPE = constants.HTYPE_INSTANCE
2536
  _OP_REQP = ["instance_name", "new_name"]
2537

    
2538
  def BuildHooksEnv(self):
2539
    """Build hooks env.
2540

2541
    This runs on master, primary and secondary nodes of the instance.
2542

2543
    """
2544
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2545
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2546
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2547
          list(self.instance.secondary_nodes))
2548
    return env, nl, nl
2549

    
2550
  def CheckPrereq(self):
2551
    """Check prerequisites.
2552

2553
    This checks that the instance is in the cluster and is not running.
2554

2555
    """
2556
    instance = self.cfg.GetInstanceInfo(
2557
      self.cfg.ExpandInstanceName(self.op.instance_name))
2558
    if instance is None:
2559
      raise errors.OpPrereqError("Instance '%s' not known" %
2560
                                 self.op.instance_name)
2561
    if instance.status != "down":
2562
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2563
                                 self.op.instance_name)
2564
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2565
                                              instance.name,
2566
                                              instance.hypervisor)
2567
    if remote_info:
2568
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2569
                                 (self.op.instance_name,
2570
                                  instance.primary_node))
2571
    self.instance = instance
2572

    
2573
    # new name verification
2574
    name_info = utils.HostInfo(self.op.new_name)
2575

    
2576
    self.op.new_name = new_name = name_info.name
2577
    instance_list = self.cfg.GetInstanceList()
2578
    if new_name in instance_list:
2579
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2580
                                 new_name)
2581

    
2582
    if not getattr(self.op, "ignore_ip", False):
2583
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2584
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2585
                                   (name_info.ip, new_name))
2586

    
2587

    
2588
  def Exec(self, feedback_fn):
2589
    """Reinstall the instance.
2590

2591
    """
2592
    inst = self.instance
2593
    old_name = inst.name
2594

    
2595
    if inst.disk_template == constants.DT_FILE:
2596
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2597

    
2598
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2599
    # Change the instance lock. This is definitely safe while we hold the BGL
2600
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2601
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2602

    
2603
    # re-read the instance from the configuration after rename
2604
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2605

    
2606
    if inst.disk_template == constants.DT_FILE:
2607
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2608
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2609
                                                     old_file_storage_dir,
2610
                                                     new_file_storage_dir)
2611

    
2612
      if not result:
2613
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2614
                                 " directory '%s' to '%s' (but the instance"
2615
                                 " has been renamed in Ganeti)" % (
2616
                                 inst.primary_node, old_file_storage_dir,
2617
                                 new_file_storage_dir))
2618

    
2619
      if not result[0]:
2620
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2621
                                 " (but the instance has been renamed in"
2622
                                 " Ganeti)" % (old_file_storage_dir,
2623
                                               new_file_storage_dir))
2624

    
2625
    _StartInstanceDisks(self, inst, None)
2626
    try:
2627
      if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
2628
                                               old_name):
2629
        msg = ("Could not run OS rename script for instance %s on node %s"
2630
               " (but the instance has been renamed in Ganeti)" %
2631
               (inst.name, inst.primary_node))
2632
        self.proc.LogWarning(msg)
2633
    finally:
2634
      _ShutdownInstanceDisks(self, inst)
2635

    
2636

    
2637
class LURemoveInstance(LogicalUnit):
2638
  """Remove an instance.
2639

2640
  """
2641
  HPATH = "instance-remove"
2642
  HTYPE = constants.HTYPE_INSTANCE
2643
  _OP_REQP = ["instance_name", "ignore_failures"]
2644
  REQ_BGL = False
2645

    
2646
  def ExpandNames(self):
2647
    self._ExpandAndLockInstance()
2648
    self.needed_locks[locking.LEVEL_NODE] = []
2649
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2650

    
2651
  def DeclareLocks(self, level):
2652
    if level == locking.LEVEL_NODE:
2653
      self._LockInstancesNodes()
2654

    
2655
  def BuildHooksEnv(self):
2656
    """Build hooks env.
2657

2658
    This runs on master, primary and secondary nodes of the instance.
2659

2660
    """
2661
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2662
    nl = [self.cfg.GetMasterNode()]
2663
    return env, nl, nl
2664

    
2665
  def CheckPrereq(self):
2666
    """Check prerequisites.
2667

2668
    This checks that the instance is in the cluster.
2669

2670
    """
2671
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2672
    assert self.instance is not None, \
2673
      "Cannot retrieve locked instance %s" % self.op.instance_name
2674

    
2675
  def Exec(self, feedback_fn):
2676
    """Remove the instance.
2677

2678
    """
2679
    instance = self.instance
2680
    logging.info("Shutting down instance %s on node %s",
2681
                 instance.name, instance.primary_node)
2682

    
2683
    if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
2684
      if self.op.ignore_failures:
2685
        feedback_fn("Warning: can't shutdown instance")
2686
      else:
2687
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2688
                                 (instance.name, instance.primary_node))
2689

    
2690
    logging.info("Removing block devices for instance %s", instance.name)
2691

    
2692
    if not _RemoveDisks(self, instance):
2693
      if self.op.ignore_failures:
2694
        feedback_fn("Warning: can't remove instance's disks")
2695
      else:
2696
        raise errors.OpExecError("Can't remove instance's disks")
2697

    
2698
    logging.info("Removing instance %s out of cluster config", instance.name)
2699

    
2700
    self.cfg.RemoveInstance(instance.name)
2701
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2702

    
2703

    
2704
class LUQueryInstances(NoHooksLU):
2705
  """Logical unit for querying instances.
2706

2707
  """
2708
  _OP_REQP = ["output_fields", "names"]
2709
  REQ_BGL = False
2710
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
2711
                                    "admin_state", "admin_ram",
2712
                                    "disk_template", "ip", "mac", "bridge",
2713
                                    "sda_size", "sdb_size", "vcpus", "tags",
2714
                                    "network_port", "beparams",
2715
                                    "(disk).(size)/([0-9]+)",
2716
                                    "(disk).(sizes)",
2717
                                    "(nic).(mac|ip|bridge)/([0-9]+)",
2718
                                    "(nic).(macs|ips|bridges)",
2719
                                    "(disk|nic).(count)",
2720
                                    "serial_no", "hypervisor", "hvparams",] +
2721
                                  ["hv/%s" % name
2722
                                   for name in constants.HVS_PARAMETERS] +
2723
                                  ["be/%s" % name
2724
                                   for name in constants.BES_PARAMETERS])
2725
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
2726

    
2727

    
2728
  def ExpandNames(self):
2729
    _CheckOutputFields(static=self._FIELDS_STATIC,
2730
                       dynamic=self._FIELDS_DYNAMIC,
2731
                       selected=self.op.output_fields)
2732

    
2733
    self.needed_locks = {}
2734
    self.share_locks[locking.LEVEL_INSTANCE] = 1
2735
    self.share_locks[locking.LEVEL_NODE] = 1
2736

    
2737
    if self.op.names:
2738
      self.wanted = _GetWantedInstances(self, self.op.names)
2739
    else:
2740
      self.wanted = locking.ALL_SET
2741

    
2742
    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2743
    if self.do_locking:
2744
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2745
      self.needed_locks[locking.LEVEL_NODE] = []
2746
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2747

    
2748
  def DeclareLocks(self, level):
2749
    if level == locking.LEVEL_NODE and self.do_locking:
2750
      self._LockInstancesNodes()
2751

    
2752
  def CheckPrereq(self):
2753
    """Check prerequisites.
2754

2755
    """
2756
    pass
2757

    
2758
  def Exec(self, feedback_fn):
2759
    """Computes the list of nodes and their attributes.
2760

2761
    """
2762
    all_info = self.cfg.GetAllInstancesInfo()
2763
    if self.do_locking:
2764
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2765
    elif self.wanted != locking.ALL_SET:
2766
      instance_names = self.wanted
2767
      missing = set(instance_names).difference(all_info.keys())
2768
      if missing:
2769
        raise errors.OpExecError(
2770
          "Some instances were removed before retrieving their data: %s"
2771
          % missing)
2772
    else:
2773
      instance_names = all_info.keys()
2774

    
2775
    instance_names = utils.NiceSort(instance_names)
2776
    instance_list = [all_info[iname] for iname in instance_names]
2777

    
2778
    # begin data gathering
2779

    
2780
    nodes = frozenset([inst.primary_node for inst in instance_list])
2781
    hv_list = list(set([inst.hypervisor for inst in instance_list]))
2782

    
2783
    bad_nodes = []
2784
    if self.do_locking:
2785
      live_data = {}
2786
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
2787
      for name in nodes:
2788
        result = node_data[name]
2789
        if result:
2790
          live_data.update(result)
2791
        elif result == False:
2792
          bad_nodes.append(name)
2793
        # else no instance is alive
2794
    else:
2795
      live_data = dict([(name, {}) for name in instance_names])
2796

    
2797
    # end data gathering
2798

    
2799
    HVPREFIX = "hv/"
2800
    BEPREFIX = "be/"
2801
    output = []
2802
    for instance in instance_list:
2803
      iout = []
2804
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
2805
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
2806
      for field in self.op.output_fields:
2807
        st_match = self._FIELDS_STATIC.Matches(field)
2808
        if field == "name":
2809
          val = instance.name
2810
        elif field == "os":
2811
          val = instance.os
2812
        elif field == "pnode":
2813
          val = instance.primary_node
2814
        elif field == "snodes":
2815
          val = list(instance.secondary_nodes)
2816
        elif field == "admin_state":
2817
          val = (instance.status != "down")
2818
        elif field == "oper_state":
2819
          if instance.primary_node in bad_nodes:
2820
            val = None
2821
          else:
2822
            val = bool(live_data.get(instance.name))
2823
        elif field == "status":
2824
          if instance.primary_node in bad_nodes:
2825
            val = "ERROR_nodedown"
2826
          else:
2827
            running = bool(live_data.get(instance.name))
2828
            if running:
2829
              if instance.status != "down":
2830
                val = "running"
2831
              else:
2832
                val = "ERROR_up"
2833
            else:
2834
              if instance.status != "down":
2835
                val = "ERROR_down"
2836
              else:
2837
                val = "ADMIN_down"
2838
        elif field == "oper_ram":
2839
          if instance.primary_node in bad_nodes:
2840
            val = None
2841
          elif instance.name in live_data:
2842
            val = live_data[instance.name].get("memory", "?")
2843
          else:
2844
            val = "-"
2845
        elif field == "disk_template":
2846
          val = instance.disk_template
2847
        elif field == "ip":
2848
          val = instance.nics[0].ip
2849
        elif field == "bridge":
2850
          val = instance.nics[0].bridge
2851
        elif field == "mac":
2852
          val = instance.nics[0].mac
2853
        elif field == "sda_size" or field == "sdb_size":
2854
          idx = ord(field[2]) - ord('a')
2855
          try:
2856
            val = instance.FindDisk(idx).size
2857
          except errors.OpPrereqError:
2858
            val = None
2859
        elif field == "tags":
2860
          val = list(instance.GetTags())
2861
        elif field == "serial_no":
2862
          val = instance.serial_no
2863
        elif field == "network_port":
2864
          val = instance.network_port
2865
        elif field == "hypervisor":
2866
          val = instance.hypervisor
2867
        elif field == "hvparams":
2868
          val = i_hv
2869
        elif (field.startswith(HVPREFIX) and
2870
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
2871
          val = i_hv.get(field[len(HVPREFIX):], None)
2872
        elif field == "beparams":
2873
          val = i_be
2874
        elif (field.startswith(BEPREFIX) and
2875
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
2876
          val = i_be.get(field[len(BEPREFIX):], None)
2877
        elif st_match and st_match.groups():
2878
          # matches a variable list
2879
          st_groups = st_match.groups()
2880
          if st_groups and st_groups[0] == "disk":
2881
            if st_groups[1] == "count":
2882
              val = len(instance.disks)
2883
            elif st_groups[1] == "sizes":
2884
              val = [disk.size for disk in instance.disks]
2885
            elif st_groups[1] == "size":
2886
              try:
2887
                val = instance.FindDisk(st_groups[2]).size
2888
              except errors.OpPrereqError:
2889
                val = None
2890
            else:
2891
              assert False, "Unhandled disk parameter"
2892
          elif st_groups[0] == "nic":
2893
            if st_groups[1] == "count":
2894
              val = len(instance.nics)
2895
            elif st_groups[1] == "macs":
2896
              val = [nic.mac for nic in instance.nics]
2897
            elif st_groups[1] == "ips":
2898
              val = [nic.ip for nic in instance.nics]
2899
            elif st_groups[1] == "bridges":
2900
              val = [nic.bridge for nic in instance.nics]
2901
            else:
2902
              # index-based item
2903
              nic_idx = int(st_groups[2])
2904
              if nic_idx >= len(instance.nics):
2905
                val = None
2906
              else:
2907
                if st_groups[1] == "mac":
2908
                  val = instance.nics[nic_idx].mac
2909
                elif st_groups[1] == "ip":
2910
                  val = instance.nics[nic_idx].ip
2911
                elif st_groups[1] == "bridge":
2912
                  val = instance.nics[nic_idx].bridge
2913
                else:
2914
                  assert False, "Unhandled NIC parameter"
2915
          else:
2916
            assert False, "Unhandled variable parameter"
2917
        else:
2918
          raise errors.ParameterError(field)
2919
        iout.append(val)
2920
      output.append(iout)
2921

    
2922
    return output
2923

    
2924

    
2925
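
# Editor's note (illustrative example, not part of the original module): each
# row appended to "output" above holds the values for one instance, in the
# same order as the requested fields.  For a hypothetical query of
# ["disk_template", "oper_ram", "nic.count"] against a running, single-NIC,
# LVM-based instance the row could look like ["plain", 512, 1]; fields whose
# node data could not be fetched come back as None or "-".

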
class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
                         instance.name, bep[constants.BE_MEMORY],
                         instance.hypervisor)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not self.rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s do not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self, dev, target_node, False):
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logging.info("Shutting down instance %s on node %s",
                 instance.name, source_node)

    if not self.rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
                             " Proceeding"
                             " anyway. Please make sure node %s is down",
                             instance.name, source_node, source_node)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logging.info("Starting instance %s on node %s",
                   instance.name, target_node)

      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not self.rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))


def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
        return False

  lu.cfg.SetDiskID(device, node)
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, True, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(lu, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  lu.cfg.SetDiskID(device, node)
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, False, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _GenerateUniqueNames(lu, exts):
  """Generate a suitable LV name.

  This will generate a logical volume name for the given instance.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results
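
# Editor's note (illustrative example, not part of the original module): the
# helper above simply prefixes each requested extension with a fresh unique
# ID, so a call such as
#
#   _GenerateUniqueNames(lu, [".disk0_data", ".disk0_meta"])
#
# returns something like
#
#   ["<unique-id>.disk0_data", "<unique-id>.disk0_meta"]
#
# where <unique-id> is whatever lu.cfg.GenerateUniqueID() produced.

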
def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
                         p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  port = lu.cfg.AllocatePort()
  vgname = lu.cfg.GetVGName()
  shared_secret = lu.cfg.GenerateDRBDSecret()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev
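
# Editor's note (illustrative sketch, not part of the original module): the
# object returned above is a two-level tree; for a 1024 MB disk it looks
# roughly like
#
#   Disk(LD_DRBD8, size=1024,
#        logical_id=(primary, secondary, port, p_minor, s_minor, secret),
#        iv_name="disk/0",
#        children=[Disk(LD_LV, size=1024, logical_id=(vg, names[0])),   # data
#                  Disk(LD_LV, size=128,  logical_id=(vg, names[1]))])  # meta
#
# i.e. a DRBD8 device stacked on one data LV and one 128 MB metadata LV.

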
def _GenerateDiskTemplate(lu, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_info,
                          file_storage_dir, file_driver,
                          base_index):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []
  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(lu, [".disk%d" % i
                                      for i in range(disk_count)])
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
                              logical_id=(vgname, names[idx]),
                              iv_name="disk/%d" % disk_index)
      disks.append(disk_dev)
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node, remote_node] * len(disk_info), instance_name)

    names = _GenerateUniqueNames(lu,
                                 [".disk%d_%s" % (i, s)
                                  for i in range(disk_count)
                                  for s in ("data", "meta")
                                  ])
    for idx, disk in enumerate(disk_info):
      # the index must be computed in this branch too, otherwise the
      # iv_name below would reference an undefined name
      disk_index = idx + base_index
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                      disk["size"], names[idx*2:idx*2+2],
                                      "disk/%d" % disk_index,
                                      minors[idx*2], minors[idx*2+1])
      disks.append(disk_dev)
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
                              iv_name="disk/%d" % disk_index,
                              logical_id=(file_driver,
                                          "%s/disk%d" % (file_storage_dir,
                                                         idx)))
      disks.append(disk_dev)
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks
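
# Editor's note (illustrative example, not part of the original module): for a
# hypothetical call
#
#   _GenerateDiskTemplate(lu, constants.DT_PLAIN, "inst1", "node1", [],
#                         [{"size": 1024}], None, None, 0)
#
# the result is a single LD_LV Disk of 1024 MB with iv_name "disk/0" and
# logical_id (<cluster VG>, "<unique-id>.disk0"); the DT_DRBD8 branch instead
# returns one DRBD8 tree per disk, as sketched after _GenerateDRBD8Branch
# above.

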
def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name
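
# Editor's note (illustrative example, not part of the original module): for a
# hypothetical instance named "web1.example.com" the helper above returns the
# metadata string "originstname+web1.example.com".

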
def _CreateDisks(lu, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @rtype: boolean
  @return: the success of the creation

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
                                                 file_storage_dir)

    if not result:
      logging.error("Could not connect to node '%s'", instance.primary_node)
      return False

    if not result[0]:
      logging.error("Failed to create directory '%s'", file_storage_dir)
      return False

  for device in instance.disks:
    logging.info("Creating volume %s for instance %s",
                 device.iv_name, instance.name)
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
                                        device, False, info):
        logging.error("Failed to create volume %s (%s) on secondary node %s!",
                      device.iv_name, device, secondary_node)
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
                                    instance, device, info):
      logging.error("Failed to create volume %s on primary!", device.iv_name)
      return False

  return True


def _RemoveDisks(lu, instance):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should remove
  @rtype: boolean
  @return: the success of the removal

  """
  logging.info("Removing block devices for instance %s", instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(disk, node)
      if not lu.rpc.call_blockdev_remove(node, disk):
        lu.proc.LogWarning("Could not remove block device %s on node %s,"
                           " continuing anyway", device.iv_name, node)
        result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
                                               file_storage_dir):
      logging.error("Could not remove directory '%s'", file_storage_dir)
      result = False

  return result


def _ComputeDiskSize(disk_template, disks):
  """Compute disk size requirements in the volume group

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: sum(d["size"] for d in disks),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]
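
# Editor's note (worked example, not part of the original module): for two
# hypothetical disks of 1024 MB and 2048 MB the helper above yields
#
#   _ComputeDiskSize(constants.DT_PLAIN, disks)  -> 3072
#   _ComputeDiskSize(constants.DT_DRBD8, disks)  -> 3328  # +128 MB of metadata
#                                                         #  per disk
#   _ComputeDiskSize(constants.DT_FILE, disks)   -> None  # no VG space needed
#
# which is the amount of free space later checked against each node's
# "vg_free" value.

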
def _CheckHVParams(lu, nodenames, hvname, hvparams):
  """Hypervisor parameter validation.

  This function abstracts the hypervisor parameter validation to be
  used in both instance create and instance modify.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type nodenames: list
  @param nodenames: the list of nodes on which we should check
  @type hvname: string
  @param hvname: the name of the hypervisor we should use
  @type hvparams: dict
  @param hvparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
                                                  hvname,
                                                  hvparams)
  for node in nodenames:
    info = hvinfo.get(node, None)
    if not info or not isinstance(info, (tuple, list)):
      raise errors.OpPrereqError("Cannot get current information"
                                 " from node '%s' (%s)" % (node, info))
    if not info[0]:
      raise errors.OpPrereqError("Hypervisor parameter validation failed:"
                                 " %s" % info[1])
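
# Editor's note (illustrative sketch, not part of the original module): as the
# checks above imply, the per-node RPC result is expected to be a
# (status, message) pair, e.g. a hypothetical
#
#   {"node1.example.com": (True, None),
#    "node2.example.com": (False, "hypervisor rejected the parameters")}
#
# would make _CheckHVParams raise OpPrereqError for node2 while accepting
# node1.

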
class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disks", "disk_template",
              "mode", "start",
              "wait_for_sync", "ip_check", "nics",
              "hvparams", "beparams"]
  REQ_BGL = False

  def _ExpandNode(self, node):
    """Expands and checks one node name.

    """
    node_full = self.cfg.ExpandNodeName(node)
    if node_full is None:
      raise errors.OpPrereqError("Unknown node %s" % node)
    return node_full

  def ExpandNames(self):
    """ExpandNames for CreateInstance.

    Figure out the right locks for instance creation.

    """
    self.needed_locks = {}

    # set optional parameters to none if they don't exist
    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    # cheap checks, mostly valid constants given

    # verify creation mode
    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    if self.op.hypervisor is None:
      self.op.hypervisor = self.cfg.GetHypervisorType()

    cluster = self.cfg.GetClusterInfo()
    enabled_hvs = cluster.enabled_hypervisors
    if self.op.hypervisor not in enabled_hvs:
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
                                 " cluster (%s)" % (self.op.hypervisor,
                                  ",".join(enabled_hvs)))

    # check hypervisor parameter syntax (locally)

    filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
                                  self.op.hvparams)
    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
    hv_type.CheckParameterSyntax(filled_hvp)

    # fill and remember the beparams dict
    self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
                                    self.op.beparams)

    #### instance parameters check

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)
    self.op.instance_name = instance_name = hostname1.name

    # this is just a preventive check, but someone might still add this
    # instance in the meantime, and creation will fail at lock-add time
    if instance_name in self.cfg.GetInstanceList():
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    self.add_locks[locking.LEVEL_INSTANCE] = instance_name

    # NIC buildup
    self.nics = []
    for nic in self.op.nics:
      # ip validity checks
      ip = nic.get("ip", None)
      if ip is None or ip.lower() == "none":
        nic_ip = None
      elif ip.lower() == constants.VALUE_AUTO:
        nic_ip = hostname1.ip
      else:
        if not utils.IsValidIP(ip):
          raise errors.OpPrereqError("Given IP address '%s' doesn't look"
                                     " like a valid IP" % ip)
        nic_ip = ip

      # MAC address verification
      mac = nic.get("mac", constants.VALUE_AUTO)
      if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
        if not utils.IsValidMac(mac.lower()):
          raise errors.OpPrereqError("Invalid MAC address specified: %s" %
                                     mac)
      # bridge verification
      bridge = nic.get("bridge", self.cfg.GetDefBridge())
      self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))

    # disk checks/pre-build
    self.disks = []
    for disk in self.op.disks:
      mode = disk.get("mode", constants.DISK_RDWR)
      if mode not in constants.DISK_ACCESS_SET:
        raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                   mode)
      size = disk.get("size", None)
      if size is None:
        raise errors.OpPrereqError("Missing disk size")
      try:
        size = int(size)
      except ValueError:
        raise errors.OpPrereqError("Invalid disk size '%s'" % size)
      self.disks.append({"size": size, "mode": mode})

    # used in CheckPrereq for ip ping check
    self.check_ip = hostname1.ip

    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory path not absolute")

    ### Node/iallocator related checks
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.op.pnode = self._ExpandNode(self.op.pnode)
      nodelist = [self.op.pnode]
      if self.op.snode is not None:
        self.op.snode = self._ExpandNode(self.op.snode)
        nodelist.append(self.op.snode)
      self.needed_locks[locking.LEVEL_NODE] = nodelist

    # in case of import lock the source node too
    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)

      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      self.op.src_node = src_node = self._ExpandNode(src_node)
      if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
        self.needed_locks[locking.LEVEL_NODE].append(src_node)

    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")
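
  # Editor's note (illustrative sketch, not part of the original module): the
  # NIC and disk checks above operate on lists of plain dicts supplied in the
  # opcode; a hypothetical minimal opcode payload they would accept is
  #
  #   nics=[{"ip": constants.VALUE_AUTO, "mac": constants.VALUE_AUTO,
  #          "bridge": "br0"}]
  #   disks=[{"size": 1024, "mode": constants.DISK_RDWR}]
  #
  # where an "auto" IP resolves to the instance's own address and an "auto"
  # MAC is generated later in Exec().
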
  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    nics = [n.ToDict() for n in self.nics]
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.be_full[constants.BE_VCPUS],
                     mem_size=self.be_full[constants.BE_MEMORY],
                     disks=self.disks,
                     nics=nics,
                     hypervisor=self.op.hypervisor,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 ", ".join(ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGES"] = self.src_images

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.be_full[constants.BE_MEMORY],
      vcpus=self.be_full[constants.BE_VCPUS],
      nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
    ))

    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl


  def CheckPrereq(self):
    """Check prerequisites.

    """
    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")


    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = self.op.src_node
      src_path = self.op.src_path

      export_info = self.rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      # Check that the new instance doesn't have less disks than the export
      instance_disks = len(self.disks)
      export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
      if instance_disks < export_disks:
        raise errors.OpPrereqError("Not enough disks to import."
                                   " (instance: %d, export: %d)" %
                                   (instance_disks, export_disks))

      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      disk_images = []
      for idx in range(export_disks):
        option = 'disk%d_dump' % idx
        if export_info.has_option(constants.INISECT_INS, option):
          # FIXME: are the old os-es, disk sizes, etc. useful?
          export_name = export_info.get(constants.INISECT_INS, option)
          image = os.path.join(src_path, export_name)
          disk_images.append(image)
        else:
          disk_images.append(False)

      self.src_images = disk_images

      old_name = export_info.get(constants.INISECT_INS, 'name')
      # FIXME: int() here could throw a ValueError on broken exports
      exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
      if self.op.instance_name == old_name:
        for idx, nic in enumerate(self.nics):
          if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
            nic_mac_ini = 'nic%d_mac' % idx
            nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)

    # ip ping checks (we use the same ip that was resolved in ExpandNames)
    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (self.check_ip, self.op.instance_name))

    #### allocator run

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
    assert self.pnode is not None, \
      "Cannot retrieve locked node %s" % self.op.pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if self.op.snode is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")
      if self.op.snode == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(self.op.snode)

    nodenames = [pnode.name] + self.secondaries

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.disks)

    # Check lv size requirements
    if req_size is not None:
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                         self.op.hypervisor)
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)

    # os verification
    os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node" % self.op.os_type)

    # bridge check on primary node
    bridges = [n.bridge for n in self.nics]
    if not self.rpc.call_bridges_exist(self.pnode.name, bridges):
      raise errors.OpPrereqError("one of the target bridges '%s' does not"
                                 " exist on"
                                 " destination node '%s'" %
                                 (",".join(bridges), pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.be_full[constants.BE_MEMORY],
                           self.op.hypervisor)

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    for nic in self.nics:
      if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
        nic.mac = self.cfg.GenerateMAC()

    ht_kind = self.op.hypervisor
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    ##if self.op.vnc_bind_address is None:
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.cfg.GetFileStorageDir(),
                                        string_file_storage_dir, instance))


    disks = _GenerateDiskTemplate(self,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries,
                                  self.disks,
                                  file_storage_dir,
                                  self.op.file_driver,
                                  0)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            nics=self.nics, disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            beparams=self.op.beparams,
                            hvparams=self.op.hvparams,
                            hypervisor=self.op.hypervisor,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self, iobj):
      _RemoveDisks(self, iobj)
      self.cfg.ReleaseDRBDMinors(instance)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Declare that we don't want to remove the instance lock anymore, as we've
    # added the instance to the config
    del self.remove_locks[locking.LEVEL_INSTANCE]
    # Remove the temporary assignments for the instance's drbds
    self.cfg.ReleaseDRBDMinors(instance)
    # Unlock all the nodes
    self.context.glm.release(locking.LEVEL_NODE)
    del self.acquired_locks[locking.LEVEL_NODE]

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, iobj)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(self, iobj)
      self.cfg.RemoveInstance(iobj.name)
      # Make sure the instance lock gets removed
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not self.rpc.call_instance_os_add(pnode_name, iobj):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_images = self.src_images
        cluster_name = self.cfg.GetClusterName()
        import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
                                                         src_node, src_images,
                                                         cluster_name)
        for idx, result in enumerate(import_result):
          if not result:
            self.LogWarning("Could not import image %s for instance %s,"
                            " disk %d, on node %s" % (src_images[idx],
                                                      instance, idx,
                                                      pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logging.info("Starting instance %s on node %s", instance, pnode_name)
      feedback_fn("* starting instance...")
      if not self.rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = self.rpc.call_instance_list([node],
                                             [instance.hypervisor])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logging.debug("Connecting to console of %s on %s", instance.name, node)

    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    console_cmd = hyper.GetShellCommandForConsole(instance)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    elif self.op.remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.op.remote_node = remote_node
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    # If we're not already locking all nodes in the set we have to declare the
    # instance's primary/secondary nodes.
    if (level == locking.LEVEL_NODE and
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
      self._LockInstancesNodes()

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    self.LogInfo("Selected new secondary for the instance: %s",
                 self.op.remote_node)

3951
    """Build hooks env.
3952

3953
    This runs on the master, the primary and all the secondaries.
3954

3955
    """
3956
    env = {
3957
      "MODE": self.op.mode,
3958
      "NEW_SECONDARY": self.op.remote_node,
3959
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3960
      }
3961
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3962
    nl = [
3963
      self.cfg.GetMasterNode(),
3964
      self.instance.primary_node,
3965
      ]
3966
    if self.op.remote_node is not None:
3967
      nl.append(self.op.remote_node)
3968
    return env, nl, nl
3969

    
3970
  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = instance

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    if not self.op.disks:
      self.op.disks = range(len(instance.disks))

    for disk_idx in self.op.disks:
      instance.FindDisk(disk_idx)
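
  # Editor's note (summary added for clarity, not part of the original
  # module): for DT_DRBD8 the mode selection above boils down to
  #
  #   REPLACE_DISK_PRI -> tgt_node = primary,       oth_node = secondary
  #   REPLACE_DISK_SEC -> tgt_node = old secondary, oth_node = primary,
  #                       new_node = the optional replacement secondary
  #
  # and these attributes drive _ExecD8DiskOnly / _ExecD8Secondary below.
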
  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:

      1. for each disk to be replaced:

        1. create new LVs on the target node with unique names
        1. detach old LVs from the drbd device
        1. rename old LVs to name_replaced.<time_t>
        1. rename new LVs to old LVs
        1. attach the new LVs (with the old names now) to the drbd device

      1. wait for sync across all devices

      1. for each modified disk:

        1. remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = self.rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking disk/%d on %s" % (idx, node))
        cfg.SetDiskID(dev, node)
        if not self.rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find disk/%d on node %s" %
                                   (idx, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      info("checking disk/%d consistency on %s" % (idx, oth_node))
      if not _CheckDiskConsistency(self, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".disk%d_%s" % (idx, suf)
                  for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(self, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not self.rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not self.rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not self.rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(self, instance, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = self.rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not self.rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue
  def _ExecD8Secondary(self, feedback_fn):
4220
    """Replace the secondary node for drbd8.
4221

4222
    The algorithm for replace is quite complicated:
4223
      - for all disks of the instance:
4224
        - create new LVs on the new node with same names
4225
        - shutdown the drbd device on the old secondary
4226
        - disconnect the drbd network on the primary
4227
        - create the drbd device on the new secondary
4228
        - network attach the drbd on the primary, using an artifice:
4229
          the drbd code for Attach() will connect to the network if it
4230
          finds a device which is connected to the good local disks but
4231
          not network enabled
4232
      - wait for sync across all devices
4233
      - remove all disks from the old secondary
4234

4235
    Failures are not very well handled.
4236

4237
    """
4238
    steps_total = 6
4239
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4240
    instance = self.instance
4241
    iv_names = {}
4242
    vgname = self.cfg.GetVGName()
4243
    # start of work
4244
    cfg = self.cfg
4245
    old_node = self.tgt_node
4246
    new_node = self.new_node
4247
    pri_node = instance.primary_node
4248

    
4249
    # Step: check device activation
4250
    self.proc.LogStep(1, steps_total, "check device existence")
4251
    info("checking volume groups")
4252
    my_vg = cfg.GetVGName()
4253
    results = self.rpc.call_vg_list([pri_node, new_node])
4254
    if not results:
4255
      raise errors.OpExecError("Can't list volume groups on the nodes")
4256
    for node in pri_node, new_node:
4257
      res = results.get(node, False)
4258
      if not res or my_vg not in res:
4259
        raise errors.OpExecError("Volume group '%s' not found on %s" %
4260
                                 (my_vg, node))
4261
    for idx, dev in enumerate(instance.disks):
4262
      if idx not in self.op.disks:
4263
        continue
4264
      info("checking disk/%d on %s" % (idx, pri_node))
4265
      cfg.SetDiskID(dev, pri_node)
4266
      if not self.rpc.call_blockdev_find(pri_node, dev):
4267
        raise errors.OpExecError("Can't find disk/%d on node %s" %
4268
                                 (idx, pri_node))
4269

    
4270
    # Step: check other node consistency
4271
    self.proc.LogStep(2, steps_total, "check peer consistency")
4272
    for idx, dev in enumerate(instance.disks):
4273
      if idx not in self.op.disks:
4274
        continue
4275
      info("checking disk/%d consistency on %s" % (idx, pri_node))
4276
      if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4277
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
4278
                                 " unsafe to replace the secondary" %
4279
                                 pri_node)
4280

    
4281
    # Step: create new storage
4282
    self.proc.LogStep(3, steps_total, "allocate new storage")
4283
    for idx, dev in enumerate(instance.disks):
4284
      size = dev.size
4285
      info("adding new local storage on %s for disk/%d" %
4286
           (new_node, idx))
4287
      # since we *always* want to create this LV, we use the
4288
      # _Create...OnPrimary (which forces the creation), even if we
4289
      # are talking about the secondary node
4290
      for new_lv in dev.children:
4291
        if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4292
                                        _GetInstanceInfoText(instance)):
4293
          raise errors.OpExecError("Failed to create new LV named '%s' on"
4294
                                   " node '%s'" %
4295
                                   (new_lv.logical_id[1], new_node))
4296

    
4297
    # Step 4: drbd minors and drbd setup changes
4298
    # after this, we must manually remove the drbd minors on both the
4299
    # error and the success paths
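    # For reference (layout inferred from the index handling below rather than
    # from a spec): a DRBD8 logical_id is assumed to be a 6-tuple of the form
    #   (node_a, node_b, port, minor_a, minor_b, secret)
    # and the swap below replaces only the old secondary node and its minor,
    # leaving the shared entries at indices 2 and 5 untouched.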
4300
    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4301
                                   instance.name)
4302
    logging.debug("Allocated minors %s" % (minors,))
4303
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
4304
    for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
4305
      size = dev.size
4306
      info("activating a new drbd on %s for disk/%d" % (new_node, idx))
4307
      # create new devices on new_node
4308
      if pri_node == dev.logical_id[0]:
4309
        new_logical_id = (pri_node, new_node,
4310
                          dev.logical_id[2], dev.logical_id[3], new_minor,
4311
                          dev.logical_id[5])
4312
      else:
4313
        new_logical_id = (new_node, pri_node,
4314
                          dev.logical_id[2], new_minor, dev.logical_id[4],
4315
                          dev.logical_id[5])
4316
      iv_names[idx] = (dev, dev.children, new_logical_id)
4317
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4318
                    new_logical_id)
4319
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4320
                              logical_id=new_logical_id,
4321
                              children=dev.children)
4322
      if not _CreateBlockDevOnSecondary(self, new_node, instance,
4323
                                        new_drbd, False,
4324
                                        _GetInstanceInfoText(instance)):
4325
        self.cfg.ReleaseDRBDMinors(instance.name)
4326
        raise errors.OpExecError("Failed to create new DRBD on"
4327
                                 " node '%s'" % new_node)
4328

    
4329
    for idx, dev in enumerate(instance.disks):
4330
      # we have new devices, shutdown the drbd on the old secondary
4331
      info("shutting down drbd for disk/%d on old node" % idx)
4332
      cfg.SetDiskID(dev, old_node)
4333
      if not self.rpc.call_blockdev_shutdown(old_node, dev):
4334
        warning("Failed to shutdown drbd for disk/%d on old node" % idx,
4335
                hint="Please cleanup this device manually as soon as possible")
4336

    
4337
    info("detaching primary drbds from the network (=> standalone)")
4338
    done = 0
4339
    for idx, dev in enumerate(instance.disks):
4340
      cfg.SetDiskID(dev, pri_node)
4341
      # set the network part of the physical (unique in bdev terms) id
4342
      # to None, meaning detach from network
4343
      dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4344
      # and 'find' the device, which will 'fix' it to match the
4345
      # standalone state
4346
      if self.rpc.call_blockdev_find(pri_node, dev):
4347
        done += 1
4348
      else:
4349
        warning("Failed to detach drbd disk/%d from network, unusual case" %
4350
                idx)
4351

    
4352
    if not done:
4353
      # no detaches succeeded (very unlikely)
4354
      self.cfg.ReleaseDRBDMinors(instance.name)
4355
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
4356

    
4357
    # if we managed to detach at least one, we update all the disks of
4358
    # the instance to point to the new secondary
4359
    info("updating instance configuration")
4360
    for dev, _, new_logical_id in iv_names.itervalues():
4361
      dev.logical_id = new_logical_id
4362
      cfg.SetDiskID(dev, pri_node)
4363
    cfg.Update(instance)
4364
    # we can remove now the temp minors as now the new values are
4365
    # written to the config file (and therefore stable)
4366
    self.cfg.ReleaseDRBDMinors(instance.name)
4367

    
4368
    # and now perform the drbd attach
4369
    info("attaching primary drbds to new secondary (standalone => connected)")
4370
    failures = []
4371
    for idx, dev in enumerate(instance.disks):
4372
      info("attaching primary drbd for disk/%d to new secondary node" % idx)
4373
      # since the attach is smart, it's enough to 'find' the device,
4374
      # it will automatically activate the network, if the physical_id
4375
      # is correct
4376
      cfg.SetDiskID(dev, pri_node)
4377
      logging.debug("Disk to attach: %s", dev)
4378
      if not self.rpc.call_blockdev_find(pri_node, dev):
4379
        warning("can't attach drbd disk/%d to new secondary!" % idx,
4380
                "please do a gnt-instance info to see the status of disks")
4381

    
4382
    # this can fail as the old devices are degraded and _WaitForSync
4383
    # does a combined result over all disks, so we don't check its
4384
    # return value
4385
    self.proc.LogStep(5, steps_total, "sync devices")
4386
    _WaitForSync(self, instance, unlock=True)
4387

    
4388
    # so check manually all the devices
4389
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
4390
      cfg.SetDiskID(dev, pri_node)
4391
      is_degr = self.rpc.call_blockdev_find(pri_node, dev)[5]
4392
      if is_degr:
4393
        raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
4394

    
4395
    self.proc.LogStep(6, steps_total, "removing old storage")
4396
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
4397
      info("remove logical volumes for disk/%d" % idx)
4398
      for lv in old_lvs:
4399
        cfg.SetDiskID(lv, old_node)
4400
        if not self.rpc.call_blockdev_remove(old_node, lv):
4401
          warning("Can't remove LV on old secondary",
4402
                  hint="Cleanup stale volumes by hand")
4403

    
4404
  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance

    # Activate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      _StartInstanceDisks(self, instance, True)

    if instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")

    ret = fn(feedback_fn)

    # Deactivate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      _SafeShutdownInstanceDisks(self, instance)

    return ret


class LUGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    self.instance = instance

    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing.")

    self.disk = instance.FindDisk(self.op.disk)

    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                       instance.hypervisor)
    for node in nodenames:
      info = nodeinfo.get(node, None)
      if not info:
        raise errors.OpPrereqError("Cannot get current information"
                                   " from node '%s'" % node)
      vg_free = info.get('vg_free', None)
      if not isinstance(vg_free, int):
        raise errors.OpPrereqError("Can't compute free disk space on"
                                   " node %s" % node)
      if self.op.amount > info['vg_free']:
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
                                   " %d MiB available, %d MiB required" %
                                   (node, info['vg_free'], self.op.amount))

  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    instance = self.instance
    disk = self.disk
    for node in (instance.secondary_nodes + (instance.primary_node,)):
      self.cfg.SetDiskID(disk, node)
      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
      if (not result or not isinstance(result, (list, tuple)) or
          len(result) != 2):
        raise errors.OpExecError("grow request failed to node %s" % node)
      elif not result[0]:
        raise errors.OpExecError("grow request failed to node %s: %s" %
                                 (node, result[1]))
    disk.RecordGrow(self.op.amount)
    self.cfg.Update(instance)
    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, instance)
      if disk_abort:
        self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
                             " status.\nPlease check the instance.")


class LUQueryInstanceData(NoHooksLU):
4528
  """Query runtime instance data.
4529

4530
  """
4531
  _OP_REQP = ["instances", "static"]
4532
  REQ_BGL = False
4533

    
4534
  def ExpandNames(self):
4535
    self.needed_locks = {}
4536
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4537

    
4538
    if not isinstance(self.op.instances, list):
4539
      raise errors.OpPrereqError("Invalid argument type 'instances'")
4540

    
4541
    if self.op.instances:
4542
      self.wanted_names = []
4543
      for name in self.op.instances:
4544
        full_name = self.cfg.ExpandInstanceName(name)
4545
        if full_name is None:
4546
          raise errors.OpPrereqError("Instance '%s' not known" %
4547
                                     self.op.instance_name)
4548
        self.wanted_names.append(full_name)
4549
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4550
    else:
4551
      self.wanted_names = None
4552
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4553

    
4554
    self.needed_locks[locking.LEVEL_NODE] = []
4555
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4556

    
4557
  def DeclareLocks(self, level):
4558
    if level == locking.LEVEL_NODE:
4559
      self._LockInstancesNodes()
4560

    
4561
  def CheckPrereq(self):
4562
    """Check prerequisites.
4563

4564
    This only checks the optional instance list against the existing names.
4565

4566
    """
4567
    if self.wanted_names is None:
4568
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4569

    
4570
    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4571
                             in self.wanted_names]
4572
    return
4573

    
4574
  def _ComputeDiskStatus(self, instance, snode, dev):
4575
    """Compute block device status.
4576

4577
    """
4578
    static = self.op.static
4579
    if not static:
4580
      self.cfg.SetDiskID(dev, instance.primary_node)
4581
      dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
4582
    else:
4583
      dev_pstatus = None
4584

    
4585
    if dev.dev_type in constants.LDS_DRBD:
4586
      # we change the snode then (otherwise we use the one passed in)
4587
      if dev.logical_id[0] == instance.primary_node:
4588
        snode = dev.logical_id[1]
4589
      else:
4590
        snode = dev.logical_id[0]
4591

    
4592
    if snode and not static:
4593
      self.cfg.SetDiskID(dev, snode)
4594
      dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
4595
    else:
4596
      dev_sstatus = None
4597

    
4598
    if dev.children:
4599
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
4600
                      for child in dev.children]
4601
    else:
4602
      dev_children = []
4603

    
4604
    data = {
4605
      "iv_name": dev.iv_name,
4606
      "dev_type": dev.dev_type,
4607
      "logical_id": dev.logical_id,
4608
      "physical_id": dev.physical_id,
4609
      "pstatus": dev_pstatus,
4610
      "sstatus": dev_sstatus,
4611
      "children": dev_children,
4612
      "mode": dev.mode,
4613
      }
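    # Roughly, for a DRBD disk the result nests one level: the top-level dict
    # describes the DRBD device itself and "children" holds the same structure
    # for its backing LVs (an illustrative sketch, not an exact dump):
    #   {"iv_name": "disk/0", "dev_type": ..., "pstatus": ..., "sstatus": ...,
    #    "children": [{...lv...}, {...lv...}], "mode": ...}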
4614

    
4615
    return data
4616

    
4617
  def Exec(self, feedback_fn):
4618
    """Gather and return data"""
4619
    result = {}
4620

    
4621
    cluster = self.cfg.GetClusterInfo()
4622

    
4623
    for instance in self.wanted_instances:
4624
      if not self.op.static:
4625
        remote_info = self.rpc.call_instance_info(instance.primary_node,
4626
                                                  instance.name,
4627
                                                  instance.hypervisor)
4628
        if remote_info and "state" in remote_info:
4629
          remote_state = "up"
4630
        else:
4631
          remote_state = "down"
4632
      else:
4633
        remote_state = None
4634
      if instance.status == "down":
4635
        config_state = "down"
4636
      else:
4637
        config_state = "up"
4638

    
4639
      disks = [self._ComputeDiskStatus(instance, None, device)
4640
               for device in instance.disks]
4641

    
4642
      idict = {
4643
        "name": instance.name,
4644
        "config_state": config_state,
4645
        "run_state": remote_state,
4646
        "pnode": instance.primary_node,
4647
        "snodes": instance.secondary_nodes,
4648
        "os": instance.os,
4649
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4650
        "disks": disks,
4651
        "hypervisor": instance.hypervisor,
4652
        "network_port": instance.network_port,
4653
        "hv_instance": instance.hvparams,
4654
        "hv_actual": cluster.FillHV(instance),
4655
        "be_instance": instance.beparams,
4656
        "be_actual": cluster.FillBE(instance),
4657
        }
4658

    
4659
      result[instance.name] = idict
4660

    
4661
    return result
4662

    
4663

    
4664
class LUSetInstanceParams(LogicalUnit):
4665
  """Modifies an instances's parameters.
4666

4667
  """
4668
  HPATH = "instance-modify"
4669
  HTYPE = constants.HTYPE_INSTANCE
4670
  _OP_REQP = ["instance_name", "hvparams"]
4671
  REQ_BGL = False
4672

    
4673
  def ExpandNames(self):
4674
    self._ExpandAndLockInstance()
4675
    self.needed_locks[locking.LEVEL_NODE] = []
4676
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4677

    
4678

    
4679
  def DeclareLocks(self, level):
4680
    if level == locking.LEVEL_NODE:
4681
      self._LockInstancesNodes()
4682

    
4683
  def BuildHooksEnv(self):
4684
    """Build hooks env.
4685

4686
    This runs on the master, primary and secondaries.
4687

4688
    """
4689
    args = dict()
4690
    if constants.BE_MEMORY in self.be_new:
4691
      args['memory'] = self.be_new[constants.BE_MEMORY]
4692
    if constants.BE_VCPUS in self.be_new:
4693
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
4694
    if self.do_ip or self.do_bridge or self.mac:
4695
      if self.do_ip:
4696
        ip = self.ip
4697
      else:
4698
        ip = self.instance.nics[0].ip
4699
      if self.bridge:
4700
        bridge = self.bridge
4701
      else:
4702
        bridge = self.instance.nics[0].bridge
4703
      if self.mac:
4704
        mac = self.mac
4705
      else:
4706
        mac = self.instance.nics[0].mac
4707
      args['nics'] = [(ip, bridge, mac)]
4708
    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
4709
    nl = [self.cfg.GetMasterNode(),
4710
          self.instance.primary_node] + list(self.instance.secondary_nodes)
4711
    return env, nl, nl
4712

    
4713
  def CheckPrereq(self):
4714
    """Check prerequisites.
4715

4716
    This only checks the instance list against the existing names.
4717

4718
    """
4719
    # FIXME: all the parameters could be checked before, in ExpandNames, or in
4720
    # a separate CheckArguments function, if we implement one, so the operation
4721
    # can be aborted without waiting for any lock, should it have an error...
4722
    self.ip = getattr(self.op, "ip", None)
4723
    self.mac = getattr(self.op, "mac", None)
4724
    self.bridge = getattr(self.op, "bridge", None)
4725
    self.kernel_path = getattr(self.op, "kernel_path", None)
4726
    self.initrd_path = getattr(self.op, "initrd_path", None)
4727
    self.force = getattr(self.op, "force", None)
4728
    all_parms = [self.ip, self.bridge, self.mac]
4729
    if (all_parms.count(None) == len(all_parms) and
4730
        not self.op.hvparams and
4731
        not self.op.beparams):
4732
      raise errors.OpPrereqError("No changes submitted")
4733
    for item in (constants.BE_MEMORY, constants.BE_VCPUS):
4734
      val = self.op.beparams.get(item, None)
4735
      if val is not None:
4736
        try:
4737
          val = int(val)
4738
        except ValueError, err:
4739
          raise errors.OpPrereqError("Invalid %s size: %s" % (item, str(err)))
4740
        self.op.beparams[item] = val
4741
    if self.ip is not None:
4742
      self.do_ip = True
4743
      if self.ip.lower() == "none":
4744
        self.ip = None
4745
      else:
4746
        if not utils.IsValidIP(self.ip):
4747
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4748
    else:
4749
      self.do_ip = False
4750
    self.do_bridge = (self.bridge is not None)
4751
    if self.mac is not None:
4752
      if self.cfg.IsMacInUse(self.mac):
4753
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4754
                                   self.mac)
4755
      if not utils.IsValidMac(self.mac):
4756
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4757

    
4758
    # checking the new params on the primary/secondary nodes
4759

    
4760
    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4761
    assert self.instance is not None, \
4762
      "Cannot retrieve locked instance %s" % self.op.instance_name
4763
    pnode = self.instance.primary_node
4764
    nodelist = [pnode]
4765
    nodelist.extend(instance.secondary_nodes)
4766

    
4767
    # hvparams processing
4768
    if self.op.hvparams:
4769
      i_hvdict = copy.deepcopy(instance.hvparams)
4770
      for key, val in self.op.hvparams.iteritems():
4771
        if val is None:
4772
          try:
4773
            del i_hvdict[key]
4774
          except KeyError:
4775
            pass
4776
        else:
4777
          i_hvdict[key] = val
4778
      cluster = self.cfg.GetClusterInfo()
4779
      hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
4780
                                i_hvdict)
4781
      # local check
4782
      hypervisor.GetHypervisor(
4783
        instance.hypervisor).CheckParameterSyntax(hv_new)
4784
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
4785
      self.hv_new = hv_new # the new actual values
4786
      self.hv_inst = i_hvdict # the new dict (without defaults)
4787
    else:
4788
      self.hv_new = self.hv_inst = {}
4789

    
4790
    # beparams processing
4791
    if self.op.beparams:
4792
      i_bedict = copy.deepcopy(instance.beparams)
4793
      for key, val in self.op.beparams.iteritems():
4794
        if val is None:
4795
          try:
4796
            del i_bedict[key]
4797
          except KeyError:
4798
            pass
4799
        else:
4800
          i_bedict[key] = val
4801
      cluster = self.cfg.GetClusterInfo()
4802
      be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4803
                                i_bedict)
4804
      self.be_new = be_new # the new actual values
4805
      self.be_inst = i_bedict # the new dict (without defaults)
4806
    else:
4807
      self.be_new = self.be_inst = {}
4808

    
4809
    self.warn = []
4810

    
4811
    if constants.BE_MEMORY in self.op.beparams and not self.force:
4812
      mem_check_list = [pnode]
4813
      if be_new[constants.BE_AUTO_BALANCE]:
4814
        # either we changed auto_balance to yes or it was from before
4815
        mem_check_list.extend(instance.secondary_nodes)
4816
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
4817
                                                  instance.hypervisor)
4818
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
4819
                                         instance.hypervisor)
4820

    
4821
      if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
4822
        # Assume the primary node is unreachable and go ahead
4823
        self.warn.append("Can't get info from primary node %s" % pnode)
4824
      else:
4825
        if instance_info:
4826
          current_mem = instance_info['memory']
4827
        else:
4828
          # Assume instance not running
4829
          # (there is a slight race condition here, but it's not very probable,
4830
          # and we have no other way to check)
4831
          current_mem = 0
4832
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
4833
                    nodeinfo[pnode]['memory_free'])
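        # miss_mem is positive exactly when the requested memory exceeds the
        # instance's current allocation plus the node's reported free memory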
4834
        if miss_mem > 0:
4835
          raise errors.OpPrereqError("This change will prevent the instance"
4836
                                     " from starting, due to %d MB of memory"
4837
                                     " missing on its primary node" % miss_mem)
4838

    
4839
      if be_new[constants.BE_AUTO_BALANCE]:
4840
        for node in instance.secondary_nodes:
4841
          if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
4842
            self.warn.append("Can't get info from secondary node %s" % node)
4843
          elif be_new[constants.BE_MEMORY] > nodeinfo[node]['memory_free']:
4844
            self.warn.append("Not enough memory to failover instance to"
4845
                             " secondary node %s" % node)
4846

    
4847
    return
4848

    
4849
  def Exec(self, feedback_fn):
4850
    """Modifies an instance.
4851

4852
    All parameters take effect only at the next restart of the instance.
4853
    """
4854
    # Process here the warnings from CheckPrereq, as we don't have a
4855
    # feedback_fn there.
4856
    for warn in self.warn:
4857
      feedback_fn("WARNING: %s" % warn)
4858

    
4859
    result = []
4860
    instance = self.instance
4861
    if self.do_ip:
4862
      instance.nics[0].ip = self.ip
4863
      result.append(("ip", self.ip))
4864
    if self.bridge:
4865
      instance.nics[0].bridge = self.bridge
4866
      result.append(("bridge", self.bridge))
4867
    if self.mac:
4868
      instance.nics[0].mac = self.mac
4869
      result.append(("mac", self.mac))
4870
    if self.op.hvparams:
4871
      instance.hvparams = self.hv_new
4872
      for key, val in self.op.hvparams.iteritems():
4873
        result.append(("hv/%s" % key, val))
4874
    if self.op.beparams:
4875
      instance.beparams = self.be_inst
4876
      for key, val in self.op.beparams.iteritems():
4877
        result.append(("be/%s" % key, val))
4878

    
4879
    self.cfg.Update(instance)
4880

    
4881
    return result
4882

    
4883

    
4884
class LUQueryExports(NoHooksLU):
4885
  """Query the exports list
4886

4887
  """
4888
  _OP_REQP = ['nodes']
4889
  REQ_BGL = False
4890

    
4891
  def ExpandNames(self):
4892
    self.needed_locks = {}
4893
    self.share_locks[locking.LEVEL_NODE] = 1
4894
    if not self.op.nodes:
4895
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4896
    else:
4897
      self.needed_locks[locking.LEVEL_NODE] = \
4898
        _GetWantedNodes(self, self.op.nodes)
4899

    
4900
  def CheckPrereq(self):
4901
    """Check prerequisites.
4902

4903
    """
4904
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
4905

    
4906
  def Exec(self, feedback_fn):
4907
    """Compute the list of all the exported system images.
4908

4909
    @rtype: dict
4910
    @return: a dictionary with the structure node->(export-list)
4911
        where export-list is a list of the instances exported on
4912
        that node.
4913

4914
    """
4915
    return self.rpc.call_export_list(self.nodes)
4916

    
4917

    
4918
class LUExportInstance(LogicalUnit):
4919
  """Export an instance to an image in the cluster.
4920

4921
  """
4922
  HPATH = "instance-export"
4923
  HTYPE = constants.HTYPE_INSTANCE
4924
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
4925
  REQ_BGL = False
4926

    
4927
  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # FIXME: lock only instance primary and destination node
    #
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def DeclareLocks(self, level):
4940
    """Last minute lock declaration."""
4941
    # All nodes are locked anyway, so nothing to do here.
4942

    
4943
  def BuildHooksEnv(self):
4944
    """Build hooks env.
4945

4946
    This will run on the master, primary node and target node.
4947

4948
    """
4949
    env = {
4950
      "EXPORT_NODE": self.op.target_node,
4951
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4952
      }
4953
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4954
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
4955
          self.op.target_node]
4956
    return env, nl, nl
4957

    
4958
  def CheckPrereq(self):
4959
    """Check prerequisites.
4960

4961
    This checks that the instance and node names are valid.
4962

4963
    """
4964
    instance_name = self.op.instance_name
4965
    self.instance = self.cfg.GetInstanceInfo(instance_name)
4966
    assert self.instance is not None, \
4967
          "Cannot retrieve locked instance %s" % self.op.instance_name
4968

    
4969
    self.dst_node = self.cfg.GetNodeInfo(
4970
      self.cfg.ExpandNodeName(self.op.target_node))
4971

    
4972
    assert self.dst_node is not None, \
4973
          "Cannot retrieve locked node %s" % self.op.target_node
4974

    
4975
    # instance disk type verification
4976
    for disk in self.instance.disks:
4977
      if disk.dev_type == constants.LD_FILE:
4978
        raise errors.OpPrereqError("Export not supported for instances with"
4979
                                   " file-based disks")
4980

    
4981
  def Exec(self, feedback_fn):
4982
    """Export an instance to an image in the cluster.
4983

4984
    """
4985
    instance = self.instance
4986
    dst_node = self.dst_node
4987
    src_node = instance.primary_node
4988
    if self.op.shutdown:
4989
      # shutdown the instance, but not the disks
4990
      if not self.rpc.call_instance_shutdown(src_node, instance):
4991
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4992
                                 (instance.name, src_node))
4993

    
4994
    vgname = self.cfg.GetVGName()
4995

    
4996
    snap_disks = []
4997

    
4998
    try:
4999
      for disk in instance.disks:
5000
        # new_dev_name will be a snapshot of an lvm leaf of the one we passed
5001
        new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
5002

    
5003
        if not new_dev_name:
5004
          self.LogWarning("Could not snapshot block device %s on node %s",
5005
                          disk.logical_id[1], src_node)
5006
          snap_disks.append(False)
5007
        else:
5008
          new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
5009
                                 logical_id=(vgname, new_dev_name),
5010
                                 physical_id=(vgname, new_dev_name),
5011
                                 iv_name=disk.iv_name)
5012
          snap_disks.append(new_dev)
5013

    
5014
    finally:
5015
      if self.op.shutdown and instance.status == "up":
5016
        if not self.rpc.call_instance_start(src_node, instance, None):
5017
          _ShutdownInstanceDisks(self, instance)
5018
          raise errors.OpExecError("Could not start instance")
5019

    
5020
    # TODO: check for size
5021

    
5022
    cluster_name = self.cfg.GetClusterName()
5023
    for idx, dev in enumerate(snap_disks):
5024
      if dev:
5025
        if not self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
5026
                                             instance, cluster_name, idx):
5027
          self.LogWarning("Could not export block device %s from node %s to"
5028
                          " node %s", dev.logical_id[1], src_node,
5029
                          dst_node.name)
5030
        if not self.rpc.call_blockdev_remove(src_node, dev):
5031
          self.LogWarning("Could not remove snapshot block device %s from node"
5032
                          " %s", dev.logical_id[1], src_node)
5033

    
5034
    if not self.rpc.call_finalize_export(dst_node.name, instance, snap_disks):
5035
      self.LogWarning("Could not finalize export for instance %s on node %s",
5036
                      instance.name, dst_node.name)
5037

    
5038
    nodelist = self.cfg.GetNodeList()
5039
    nodelist.remove(dst_node.name)
5040

    
5041
    # on one-node clusters nodelist will be empty after the removal
5042
    # if we proceed the backup would be removed because OpQueryExports
5043
    # substitutes an empty list with the full cluster node list.
5044
    if nodelist:
5045
      exportlist = self.rpc.call_export_list(nodelist)
5046
      for node in exportlist:
5047
        if instance.name in exportlist[node]:
5048
          if not self.rpc.call_export_remove(node, instance.name):
5049
            self.LogWarning("Could not remove older export for instance %s"
5050
                            " on node %s", instance.name, node)
5051

    
5052

    
5053
class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can also remove exports for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.
    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    exportlist = self.rpc.call_export_list(self.acquired_locks[
      locking.LEVEL_NODE])
    found = False
    for node in exportlist:
      if instance_name in exportlist[node]:
        found = True
        if not self.rpc.call_export_remove(node, instance_name):
          logging.error("Could not remove export for instance %s"
                        " on node %s", instance_name, node)

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance, please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
5101
  """Generic tags LU.
5102

5103
  This is an abstract class which is the parent of all the other tags LUs.
5104

5105
  """
5106

    
5107
  def ExpandNames(self):
5108
    self.needed_locks = {}
5109
    if self.op.kind == constants.TAG_NODE:
5110
      name = self.cfg.ExpandNodeName(self.op.name)
5111
      if name is None:
5112
        raise errors.OpPrereqError("Invalid node name (%s)" %
5113
                                   (self.op.name,))
5114
      self.op.name = name
5115
      self.needed_locks[locking.LEVEL_NODE] = name
5116
    elif self.op.kind == constants.TAG_INSTANCE:
5117
      name = self.cfg.ExpandInstanceName(self.op.name)
5118
      if name is None:
5119
        raise errors.OpPrereqError("Invalid instance name (%s)" %
5120
                                   (self.op.name,))
5121
      self.op.name = name
5122
      self.needed_locks[locking.LEVEL_INSTANCE] = name
5123

    
5124
  def CheckPrereq(self):
5125
    """Check prerequisites.
5126

5127
    """
5128
    if self.op.kind == constants.TAG_CLUSTER:
5129
      self.target = self.cfg.GetClusterInfo()
5130
    elif self.op.kind == constants.TAG_NODE:
5131
      self.target = self.cfg.GetNodeInfo(self.op.name)
5132
    elif self.op.kind == constants.TAG_INSTANCE:
5133
      self.target = self.cfg.GetInstanceInfo(self.op.name)
5134
    else:
5135
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
5136
                                 str(self.op.kind))
5137

    
5138

    
5139
class LUGetTags(TagsLU):
5140
  """Returns the tags of a given object.
5141

5142
  """
5143
  _OP_REQP = ["kind", "name"]
5144
  REQ_BGL = False
5145

    
5146
  def Exec(self, feedback_fn):
5147
    """Returns the tag list.
5148

5149
    """
5150
    return list(self.target.GetTags())
5151

    
5152

    
5153
class LUSearchTags(NoHooksLU):
5154
  """Searches the tags for a given pattern.
5155

5156
  """
5157
  _OP_REQP = ["pattern"]
5158
  REQ_BGL = False
5159

    
5160
  def ExpandNames(self):
5161
    self.needed_locks = {}
5162

    
5163
  def CheckPrereq(self):
5164
    """Check prerequisites.
5165

5166
    This checks the pattern passed for validity by compiling it.
5167

5168
    """
5169
    try:
5170
      self.re = re.compile(self.op.pattern)
5171
    except re.error, err:
5172
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5173
                                 (self.op.pattern, err))
5174

    
5175
  def Exec(self, feedback_fn):
5176
    """Returns the tag list.
5177

5178
    """
5179
    cfg = self.cfg
5180
    tgts = [("/cluster", cfg.GetClusterInfo())]
5181
    ilist = cfg.GetAllInstancesInfo().values()
5182
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5183
    nlist = cfg.GetAllNodesInfo().values()
5184
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5185
    results = []
5186
    for path, target in tgts:
5187
      for tag in target.GetTags():
5188
        if self.re.search(tag):
5189
          results.append((path, tag))
5190
    return results
5191

    
5192

    
5193
class LUAddTags(TagsLU):
5194
  """Sets a tag on a given object.
5195

5196
  """
5197
  _OP_REQP = ["kind", "name", "tags"]
5198
  REQ_BGL = False
5199

    
5200
  def CheckPrereq(self):
5201
    """Check prerequisites.
5202

5203
    This checks the type and length of the tag name and value.
5204

5205
    """
5206
    TagsLU.CheckPrereq(self)
5207
    for tag in self.op.tags:
5208
      objects.TaggableObject.ValidateTag(tag)
5209

    
5210
  def Exec(self, feedback_fn):
5211
    """Sets the tag.
5212

5213
    """
5214
    try:
5215
      for tag in self.op.tags:
5216
        self.target.AddTag(tag)
5217
    except errors.TagError, err:
5218
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
5219
    try:
5220
      self.cfg.Update(self.target)
5221
    except errors.ConfigurationError:
5222
      raise errors.OpRetryError("There has been a modification to the"
5223
                                " config file and the operation has been"
5224
                                " aborted. Please retry.")
5225

    
5226

    
5227
class LUDelTags(TagsLU):
5228
  """Delete a list of tags from a given object.
5229

5230
  """
5231
  _OP_REQP = ["kind", "name", "tags"]
5232
  REQ_BGL = False
5233

    
5234
  def CheckPrereq(self):
5235
    """Check prerequisites.
5236

5237
    This checks that we have the given tag.
5238

5239
    """
5240
    TagsLU.CheckPrereq(self)
5241
    for tag in self.op.tags:
5242
      objects.TaggableObject.ValidateTag(tag)
5243
    del_tags = frozenset(self.op.tags)
5244
    cur_tags = self.target.GetTags()
5245
    if not del_tags <= cur_tags:
5246
      diff_tags = del_tags - cur_tags
5247
      diff_names = ["'%s'" % tag for tag in diff_tags]
5248
      diff_names.sort()
5249
      raise errors.OpPrereqError("Tag(s) %s not found" %
5250
                                 (",".join(diff_names)))
5251

    
5252
  def Exec(self, feedback_fn):
5253
    """Remove the tag from the object.
5254

5255
    """
5256
    for tag in self.op.tags:
5257
      self.target.RemoveTag(tag)
5258
    try:
5259
      self.cfg.Update(self.target)
5260
    except errors.ConfigurationError:
5261
      raise errors.OpRetryError("There has been a modification to the"
5262
                                " config file and the operation has been"
5263
                                " aborted. Please retry.")
5264

    
5265

    
5266
class LUTestDelay(NoHooksLU):
5267
  """Sleep for a specified amount of time.
5268

5269
  This LU sleeps on the master and/or nodes for a specified amount of
5270
  time.
5271

5272
  """
5273
  _OP_REQP = ["duration", "on_master", "on_nodes"]
5274
  REQ_BGL = False
5275

    
5276
  def ExpandNames(self):
5277
    """Expand names and set required locks.
5278

5279
    This expands the node list, if any.
5280

5281
    """
5282
    self.needed_locks = {}
5283
    if self.op.on_nodes:
5284
      # _GetWantedNodes can be used here, but is not always appropriate to use
5285
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
5286
      # more information.
5287
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5288
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
5289

    
5290
  def CheckPrereq(self):
5291
    """Check prerequisites.
5292

5293
    """
5294

    
5295
  def Exec(self, feedback_fn):
5296
    """Do the actual sleep.
5297

5298
    """
5299
    if self.op.on_master:
5300
      if not utils.TestDelay(self.op.duration):
5301
        raise errors.OpExecError("Error during master delay test")
5302
    if self.op.on_nodes:
5303
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5304
      if not result:
5305
        raise errors.OpExecError("Complete failure from rpc call")
5306
      for node, node_result in result.items():
5307
        if not node_result:
5308
          raise errors.OpExecError("Failure during rpc call to node %s,"
5309
                                   " result: %s" % (node, node_result))
5310

    
5311

    
5312
class IAllocator(object):
5313
  """IAllocator framework.
5314

5315
  An IAllocator instance has several sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the mode-specific _ALLO_KEYS or _RELO_KEYS
      class attribute are required)
    - four buffer attributes (in|out_data|text), which represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
5321
    - the result variables from the script (success, info, nodes) for
5322
      easy usage
5323

5324
  """
5325
  _ALLO_KEYS = [
5326
    "mem_size", "disks", "disk_template",
5327
    "os", "tags", "nics", "vcpus", "hypervisor",
5328
    ]
5329
  _RELO_KEYS = [
5330
    "relocate_from",
5331
    ]
5332

    
5333
  def __init__(self, lu, mode, name, **kwargs):
5334
    self.lu = lu
5335
    # init buffer variables
5336
    self.in_text = self.out_text = self.in_data = self.out_data = None
5337
    # init all input fields so that pylint is happy
5338
    self.mode = mode
5339
    self.name = name
5340
    self.mem_size = self.disks = self.disk_template = None
5341
    self.os = self.tags = self.nics = self.vcpus = None
5342
    self.relocate_from = None
5343
    # computed fields
5344
    self.required_nodes = None
5345
    # init result fields
5346
    self.success = self.info = self.nodes = None
5347
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5348
      keyset = self._ALLO_KEYS
5349
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5350
      keyset = self._RELO_KEYS
5351
    else:
5352
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5353
                                   " IAllocator" % self.mode)
5354
    for key in kwargs:
5355
      if key not in keyset:
5356
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
5357
                                     " IAllocator" % key)
5358
      setattr(self, key, kwargs[key])
5359
    for key in keyset:
5360
      if key not in kwargs:
5361
        raise errors.ProgrammerError("Missing input parameter '%s' to"
5362
                                     " IAllocator" % key)
5363
    self._BuildInputData()
5364

    
5365
  def _ComputeClusterData(self):
5366
    """Compute the generic allocator input data.
5367

5368
    This is the data that is independent of the actual operation.
5369

5370
    """
5371
    cfg = self.lu.cfg
5372
    cluster_info = cfg.GetClusterInfo()
5373
    # cluster data
5374
    data = {
5375
      "version": 1,
5376
      "cluster_name": cfg.GetClusterName(),
5377
      "cluster_tags": list(cluster_info.GetTags()),
5378
      "enable_hypervisors": list(cluster_info.enabled_hypervisors),
5379
      # we don't have job IDs
5380
      }
5381
    iinfo = cfg.GetAllInstancesInfo().values()
5382
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
5383

    
5384
    # node data
5385
    node_results = {}
5386
    node_list = cfg.GetNodeList()
5387

    
5388
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5389
      hypervisor = self.hypervisor
5390
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5391
      hypervisor = cfg.GetInstanceInfo(self.name).hypervisor
5392

    
5393
    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
5394
                                           hypervisor)
5395
    node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
5396
                       cluster_info.enabled_hypervisors)
5397
    for nname in node_list:
5398
      ninfo = cfg.GetNodeInfo(nname)
5399
      if nname not in node_data or not isinstance(node_data[nname], dict):
5400
        raise errors.OpExecError("Can't get data for node %s" % nname)
5401
      remote_info = node_data[nname]
5402
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
5403
                   'vg_size', 'vg_free', 'cpu_total']:
5404
        if attr not in remote_info:
5405
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5406
                                   (nname, attr))
5407
        try:
5408
          remote_info[attr] = int(remote_info[attr])
5409
        except ValueError, err:
5410
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5411
                                   " %s" % (nname, attr, str(err)))
5412
      # compute memory used by primary instances
5413
      i_p_mem = i_p_up_mem = 0
5414
      for iinfo, beinfo in i_list:
5415
        if iinfo.primary_node == nname:
5416
          i_p_mem += beinfo[constants.BE_MEMORY]
5417
          if iinfo.name not in node_iinfo[nname]:
5418
            i_used_mem = 0
5419
          else:
5420
            i_used_mem = int(node_iinfo[nname][iinfo.name]['memory'])
5421
          i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
5422
          remote_info['memory_free'] -= max(0, i_mem_diff)
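          # i.e. memory that a primary instance is entitled to but not
          # currently using is counted as reserved rather than free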
5423

    
5424
          if iinfo.status == "up":
5425
            i_p_up_mem += beinfo[constants.BE_MEMORY]
5426

    
5427
      # build the per-node result dictionary
5428
      pnr = {
5429
        "tags": list(ninfo.GetTags()),
5430
        "total_memory": remote_info['memory_total'],
5431
        "reserved_memory": remote_info['memory_dom0'],
5432
        "free_memory": remote_info['memory_free'],
5433
        "i_pri_memory": i_p_mem,
5434
        "i_pri_up_memory": i_p_up_mem,
5435
        "total_disk": remote_info['vg_size'],
5436
        "free_disk": remote_info['vg_free'],
5437
        "primary_ip": ninfo.primary_ip,
5438
        "secondary_ip": ninfo.secondary_ip,
5439
        "total_cpus": remote_info['cpu_total'],
5440
        }
5441
      node_results[nname] = pnr
5442
    data["nodes"] = node_results
5443

    
5444
    # instance data
5445
    instance_data = {}
5446
    for iinfo, beinfo in i_list:
5447
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5448
                  for n in iinfo.nics]
5449
      pir = {
5450
        "tags": list(iinfo.GetTags()),
5451
        "should_run": iinfo.status == "up",
5452
        "vcpus": beinfo[constants.BE_VCPUS],
5453
        "memory": beinfo[constants.BE_MEMORY],
5454
        "os": iinfo.os,
5455
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5456
        "nics": nic_data,
5457
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5458
        "disk_template": iinfo.disk_template,
5459
        "hypervisor": iinfo.hypervisor,
5460
        }
5461
      instance_data[iinfo.name] = pir
5462

    
5463
    data["instances"] = instance_data
5464

    
5465
    self.in_data = data
5466

    
5467
  def _AddNewInstance(self):
5468
    """Add new instance data to allocator structure.
5469

5470
    This in combination with _AllocatorGetClusterData will create the
5471
    correct structure needed as input for the allocator.
5472

5473
    The checks for the completeness of the opcode must have already been
5474
    done.
5475

5476
    """
5477
    data = self.in_data
5478
    if len(self.disks) != 2:
5479
      raise errors.OpExecError("Only two-disk configurations supported")
5480

    
5481
    disk_space = _ComputeDiskSize(self.disk_template, self.disks)
5482

    
5483
    if self.disk_template in constants.DTS_NET_MIRROR:
5484
      self.required_nodes = 2
5485
    else:
5486
      self.required_nodes = 1
5487
    request = {
5488
      "type": "allocate",
5489
      "name": self.name,
5490
      "disk_template": self.disk_template,
5491
      "tags": self.tags,
5492
      "os": self.os,
5493
      "vcpus": self.vcpus,
5494
      "memory": self.mem_size,
5495
      "disks": self.disks,
5496
      "disk_space_total": disk_space,
5497
      "nics": self.nics,
5498
      "required_nodes": self.required_nodes,
5499
      }
5500
    data["request"] = request
5501

    
5502
  def _AddRelocateInstance(self):
5503
    """Add relocate instance data to allocator structure.
5504

5505
    This in combination with _IAllocatorGetClusterData will create the
5506
    correct structure needed as input for the allocator.
5507

5508
    The checks for the completeness of the opcode must have already been
5509
    done.
5510

5511
    """
5512
    instance = self.lu.cfg.GetInstanceInfo(self.name)
5513
    if instance is None:
5514
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
5515
                                   " IAllocator" % self.name)
5516

    
5517
    if instance.disk_template not in constants.DTS_NET_MIRROR:
5518
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5519

    
5520
    if len(instance.secondary_nodes) != 1:
5521
      raise errors.OpPrereqError("Instance has not exactly one secondary node")
5522

    
5523
    self.required_nodes = 1
5524
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
5525
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
5526

    
5527
    request = {
5528
      "type": "relocate",
5529
      "name": self.name,
5530
      "disk_space_total": disk_space,
5531
      "required_nodes": self.required_nodes,
5532
      "relocate_from": self.relocate_from,
5533
      }
5534
    self.in_data["request"] = request
5535

    
5536
  def _BuildInputData(self):
5537
    """Build input data structures.
5538

5539
    """
5540
    self._ComputeClusterData()
5541

    
5542
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5543
      self._AddNewInstance()
5544
    else:
5545
      self._AddRelocateInstance()
5546

    
5547
    self.in_text = serializer.Dump(self.in_data)
5548

    
5549
  def Run(self, name, validate=True, call_fn=None):
5550
    """Run an instance allocator and return the results.
5551

5552
    """
5553
    if call_fn is None:
5554
      call_fn = self.lu.rpc.call_iallocator_runner
5555
    data = self.in_text
5556

    
5557
    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
5558

    
5559
    if not isinstance(result, (list, tuple)) or len(result) != 4:
5560
      raise errors.OpExecError("Invalid result from master iallocator runner")
5561

    
5562
    rcode, stdout, stderr, fail = result
5563

    
5564
    if rcode == constants.IARUN_NOTFOUND:
5565
      raise errors.OpExecError("Can't find allocator '%s'" % name)
5566
    elif rcode == constants.IARUN_FAILURE:
5567
      raise errors.OpExecError("Instance allocator call failed: %s,"
5568
                               " output: %s" % (fail, stdout+stderr))
5569
    self.out_text = stdout
5570
    if validate:
5571
      self._ValidateResult()
5572

    
5573
  def _ValidateResult(self):
5574
    """Process the allocator results.
5575

5576
    This will process and if successful save the result in
5577
    self.out_data and the other parameters.
5578

5579
    """
5580
    try:
5581
      rdict = serializer.Load(self.out_text)
5582
    except Exception, err:
5583
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5584

    
5585
    if not isinstance(rdict, dict):
5586
      raise errors.OpExecError("Can't parse iallocator results: not a dict")
5587

    
5588
    for key in "success", "info", "nodes":
5589
      if key not in rdict:
5590
        raise errors.OpExecError("Can't parse iallocator results:"
5591
                                 " missing key '%s'" % key)
5592
      setattr(self, key, rdict[key])
5593

    
5594
    if not isinstance(rdict["nodes"], list):
5595
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5596
                               " is not a list")
5597
    self.out_data = rdict
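    # A reply that passes these checks therefore looks roughly like:
    #   {"success": true, "info": "allocation successful",
    #    "nodes": ["node1.example.com", "node2.example.com"]}
    # (node names and info text are examples only)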
5598

    
5599

    
5600
class LUTestAllocator(NoHooksLU):
5601
  """Run allocator tests.
5602

5603
  This LU runs the allocator tests
5604

5605
  """
5606
  _OP_REQP = ["direction", "mode", "name"]
5607

    
5608
  def CheckPrereq(self):
5609
    """Check prerequisites.
5610

5611
    This checks the opcode parameters depending on the direction and mode of
    the test.
5612

5613
    """
5614
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
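      # Illustrative example (values invented): parameters accepted by the
      # checks above would be, e.g.,
      #   nics=[{"mac": "auto", "ip": None, "bridge": "xen-br0"}]
      #   disks=[{"size": 1024, "mode": "w"}, {"size": 4096, "mode": "r"}]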
      if self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

    
  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       hypervisor=self.op.hypervisor,
                       )
    else:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )
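    # In allocation mode the full instance specification from the opcode is
    # forwarded to IAllocator; in relocation mode only the instance name and
    # its current secondary nodes (looked up in CheckPrereq) are needed.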

    
    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result
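
# Illustrative sketch only: one way to exercise this LU, assuming the matching
# opcode in opcodes.py is called OpTestAllocator and accepts the attributes
# checked in CheckPrereq above (all concrete values, including the disk
# template constant, are invented for the example):
#
#   op = opcodes.OpTestAllocator(direction=constants.IALLOCATOR_DIR_IN,
#                                mode=constants.IALLOCATOR_MODE_ALLOC,
#                                name="inst1.example.com",
#                                mem_size=512, vcpus=1, os="debian-etch",
#                                tags=[], disk_template=constants.DT_DRBD8,
#                                disks=[{"size": 1024, "mode": "w"},
#                                       {"size": 1024, "mode": "w"}],
#                                nics=[{"mac": "auto", "ip": None,
#                                       "bridge": "xen-br0"}],
#                                allocator=None, hypervisor=None)
#
# With IALLOCATOR_DIR_IN only the generated input text is returned; with
# IALLOCATOR_DIR_OUT the named allocator is actually run and its output
# returned (see Exec above).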