#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform
import logging
import copy

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import serializer


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    self.CheckArguments()

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensuring the
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods no longer need to worry about missing parameters.

    """
    pass

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level.

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not be prefixed with 'GANETI_' as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    If the hook should run on no nodes, an empty list (and not None)
    should be returned.

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged, but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hook results

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we really have been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


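# Illustrative sketch: a minimal concurrent LU wired up according to the
# LogicalUnit contract documented above. The class name and the "node_name"
# opcode attribute are hypothetical and exist only to show how _OP_REQP,
# REQ_BGL, ExpandNames, CheckPrereq and Exec fit together; the real LUs
# further down (e.g. LUQueryNodes) follow the same pattern.
class _LUSketchPingNode(NoHooksLU):
  """Hypothetical LU (example only): canonicalizes and reports one node."""
  _OP_REQP = ["node_name"]    # presence is verified by LogicalUnit.__init__
  REQ_BGL = False             # concurrent LU, so ExpandNames must set locks

  def ExpandNames(self):
    # canonicalize the name and declare a shared lock on that node only
    full_name = self.cfg.ExpandNodeName(self.op.node_name)
    if full_name is None:
      raise errors.OpPrereqError("No such node name '%s'" % self.op.node_name)
    self.op.node_name = full_name
    self.needed_locks = {locking.LEVEL_NODE: [full_name]}
    self.share_locks[locking.LEVEL_NODE] = 1

  def CheckPrereq(self):
    # no cluster-state prerequisites to check in this illustration
    pass

  def Exec(self, feedback_fn):
    feedback_fn("* node %s is known to the cluster" % self.op.node_name)
    return self.op.node_name

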
def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.OpPrereqError: if the nodes parameter is wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta))


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: string
  @param status: the desired status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, bridge, mac) representing
      the NICs the instance has
  @rtype: dict
  @return: the hook environment for this instance

  """
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env


def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


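# Illustration with made-up values: a call such as
#   _BuildInstanceHookEnv("inst1.example.tld", "node1.example.tld",
#                         ["node2.example.tld"], "debian-etch", "up",
#                         128, 1,
#                         [("192.0.2.10", "xen-br0", "aa:00:00:00:00:01")])
# returns a dict of the form
#   {
#     "OP_TARGET": "inst1.example.tld",
#     "INSTANCE_NAME": "inst1.example.tld",
#     "INSTANCE_PRIMARY": "node1.example.tld",
#     "INSTANCE_SECONDARIES": "node2.example.tld",
#     "INSTANCE_OS_TYPE": "debian-etch",
#     "INSTANCE_STATUS": "up",
#     "INSTANCE_MEMORY": 128,
#     "INSTANCE_VCPUS": 1,
#     "INSTANCE_NIC0_IP": "192.0.2.10",
#     "INSTANCE_NIC0_BRIDGE": "xen-br0",
#     "INSTANCE_NIC0_HWADDR": "aa:00:00:00:00:01",
#     "INSTANCE_NIC_COUNT": 1,
#   }
# whose keys are later prefixed with "GANETI_" by the hooks runner (see the
# BuildHooksEnv docstring above).

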
def _CheckInstanceBridgesExist(lu, instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("one or more target bridges %s does not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    if not self.rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list::

      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    @type node: string
    @param node: the name of the node to check
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @type vglist: dict
    @param vglist: dictionary of volume group names and their size
    @param node_result: the results from the node
    @param remote_version: the RPC version from the remote node
    @param feedback_fn: function used to accumulate results

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    if not node_result:
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
      return True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    if 'node-net-test' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result['node-net-test']:
        bad = True
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result['node-net-test'][node]))

    hyp_result = node_result.get('hypervisor', None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if instanceconfig.status != 'down':
      if (node_current not in node_instance or
          instance not in node_instance[node_current]):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if node != node_current:
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as a secondary
      # has enough memory to host all instances it is supposed to take over,
      # should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase; their failure makes
    their output be logged in the verify output and the verification fail.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = []
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
    all_vglist = self.rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': hypervisors,
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
                        for node in nodeinfo]
      }
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())
    all_rversion = self.rpc.call_version(nodelist)
    all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
                                        self.cfg.GetHypervisorType())

    cluster = self.cfg.GetClusterInfo()
    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary. This is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res is False or not isinstance(res, list):
            feedback_fn("    Communication failure")
            lu_result = 1
            continue
          for script, hkr, output in res:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_volume_list(nodes, vg_name)

    to_act = set()
    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logging.warning("Connection to node %s failed or invalid data"
                        " returned", node)
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    if not self.rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      # TODO: sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logging.debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = self.rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            self.LogWarning("Copy of file %s to node %s failed",
                            fname, to_node)
    finally:
      if not self.rpc.call_node_start_master(master, False):
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  @type disk: L{objects.Disk}
  @param disk: the disk to check
  @rtype: boolean
  @return: boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    # FIXME: This only works because there is only one parameter that can be
    # changed or removed.
    if self.op.vg_name is not None and not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    node_list = self.acquired_locks[locking.LEVEL_NODE]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # beparams changes do not need validation (we can't validate?),
    # but we still process here
    if self.op.beparams:
      self.new_beparams = cluster.FillDict(
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)

    # hypervisor list/parameters
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      if not isinstance(self.op.hvparams, dict):
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      if self.op.vg_name != self.cfg.GetVGName():
        self.cfg.SetVGName(self.op.vg_name)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.enabled_hypervisors is not None:
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
    self.cfg.Update(self.cluster)


def _WaitForSync(lu, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disks to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    lu.cfg.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      lu.LogWarning("Can't get any data from node %s", node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, instance.disks[i].iv_name)
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    if not rstats:
      logging.warning("Node %s: disk degraded, not found or node down", node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result


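# Reader's note on the status data used above: _WaitForSync unpacks each
# entry of call_blockdev_getmirrorstatus as (perc_done, est_time,
# is_degraded, ldisk) and keeps polling while any device still reports a
# non-None perc_done, while _CheckDiskConsistency indexes the
# call_blockdev_find result, where (per its docstring) position 5 holds the
# overall is_degraded flag and position 6 the ldisk (local storage) status.

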
class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet()
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")

  def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary.

    @param node_list: a list with the names of all nodes
    @param rlist: a map with node names as keys and OS objects as values

    @rtype: dict
    @return: a dictionary with osnames as keys and as value another map, with
        nodes as keys and list of OS objects as values, eg::

          {"debian-etch": {"node1": [<object>,...],
                           "node2": [<object>,]}
          }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if not nr:
        continue
      for os_obj in nr:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in node_list:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.acquired_locks[locking.LEVEL_NODE]
    node_data = self.rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


class LURemoveNode(LogicalUnit):
1464
  """Logical unit for removing a node.
1465

1466
  """
1467
  HPATH = "node-remove"
1468
  HTYPE = constants.HTYPE_NODE
1469
  _OP_REQP = ["node_name"]
1470

    
1471
  def BuildHooksEnv(self):
1472
    """Build hooks env.
1473

1474
    This doesn't run on the target node in the pre phase as a failed
1475
    node would then be impossible to remove.
1476

1477
    """
1478
    env = {
1479
      "OP_TARGET": self.op.node_name,
1480
      "NODE_NAME": self.op.node_name,
1481
      }
1482
    all_nodes = self.cfg.GetNodeList()
1483
    all_nodes.remove(self.op.node_name)
1484
    return env, all_nodes, all_nodes
1485

    
1486
  def CheckPrereq(self):
1487
    """Check prerequisites.
1488

1489
    This checks:
1490
     - the node exists in the configuration
1491
     - it does not have primary or secondary instances
1492
     - it's not the master
1493

1494
    Any errors are signalled by raising errors.OpPrereqError.
1495

1496
    """
1497
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1498
    if node is None:
1499
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1500

    
1501
    instance_list = self.cfg.GetInstanceList()
1502

    
1503
    masternode = self.cfg.GetMasterNode()
1504
    if node.name == masternode:
1505
      raise errors.OpPrereqError("Node is the master node,"
1506
                                 " you need to failover first.")
1507

    
1508
    for instance_name in instance_list:
1509
      instance = self.cfg.GetInstanceInfo(instance_name)
1510
      if node.name == instance.primary_node:
1511
        raise errors.OpPrereqError("Instance %s still running on the node,"
1512
                                   " please remove first." % instance_name)
1513
      if node.name in instance.secondary_nodes:
1514
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1515
                                   " please remove first." % instance_name)
1516
    self.op.node_name = node.name
1517
    self.node = node
1518

    
1519
  def Exec(self, feedback_fn):
1520
    """Removes the node from the cluster.
1521

1522
    """
1523
    node = self.node
1524
    logging.info("Stopping the node daemon and removing configs from node %s",
1525
                 node.name)
1526

    
1527
    self.context.RemoveNode(node.name)
1528

    
1529
    self.rpc.call_node_leave_cluster(node.name)
1530

    
1531

    
1532
class LUQueryNodes(NoHooksLU):
1533
  """Logical unit for querying nodes.
1534

1535
  """
1536
  _OP_REQP = ["output_fields", "names"]
1537
  REQ_BGL = False
1538
  _FIELDS_DYNAMIC = utils.FieldSet(
1539
    "dtotal", "dfree",
1540
    "mtotal", "mnode", "mfree",
1541
    "bootid",
1542
    "ctotal",
1543
    )
1544

    
1545
  _FIELDS_STATIC = utils.FieldSet(
1546
    "name", "pinst_cnt", "sinst_cnt",
1547
    "pinst_list", "sinst_list",
1548
    "pip", "sip", "tags",
1549
    "serial_no",
1550
    )
1551

    
1552
  def ExpandNames(self):
1553
    _CheckOutputFields(static=self._FIELDS_STATIC,
1554
                       dynamic=self._FIELDS_DYNAMIC,
1555
                       selected=self.op.output_fields)
1556

    
1557
    self.needed_locks = {}
1558
    self.share_locks[locking.LEVEL_NODE] = 1
1559

    
1560
    if self.op.names:
1561
      self.wanted = _GetWantedNodes(self, self.op.names)
1562
    else:
1563
      self.wanted = locking.ALL_SET
1564

    
1565
    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1566
    if self.do_locking:
1567
      # if we don't request only static fields, we need to lock the nodes
1568
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
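
    # Illustrative examples (hypothetical queries, not executed here):
    #   output_fields = ["name", "pinst_cnt"]  -> all static, so
    #       do_locking is False and no node locks are taken; the data
    #       comes purely from the configuration.
    #   output_fields = ["name", "mfree"]      -> "mfree" is dynamic, so
    #       do_locking is True and the wanted nodes are locked (shared)
    #       before their live data is fetched over RPC in Exec().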
1569

    
1570

    
1571
  def CheckPrereq(self):
1572
    """Check prerequisites.
1573

1574
    """
1575
    # The validation of the node list is done in _GetWantedNodes if the
    # list is not empty; an empty list needs no validation
1577
    pass
1578

    
1579
  def Exec(self, feedback_fn):
1580
    """Computes the list of nodes and their attributes.
1581

1582
    """
1583
    all_info = self.cfg.GetAllNodesInfo()
1584
    if self.do_locking:
1585
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1586
    elif self.wanted != locking.ALL_SET:
1587
      nodenames = self.wanted
1588
      missing = set(nodenames).difference(all_info.keys())
1589
      if missing:
1590
        raise errors.OpExecError(
1591
          "Some nodes were removed before retrieving their data: %s" % missing)
1592
    else:
1593
      nodenames = all_info.keys()
1594

    
1595
    nodenames = utils.NiceSort(nodenames)
1596
    nodelist = [all_info[name] for name in nodenames]
1597

    
1598
    # begin data gathering
1599

    
1600
    if self.do_locking:
1601
      live_data = {}
1602
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1603
                                          self.cfg.GetHypervisorType())
1604
      for name in nodenames:
1605
        nodeinfo = node_data.get(name, None)
1606
        if nodeinfo:
1607
          live_data[name] = {
1608
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1609
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1610
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1611
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1612
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1613
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1614
            "bootid": nodeinfo['bootid'],
1615
            }
1616
        else:
1617
          live_data[name] = {}
1618
    else:
1619
      live_data = dict.fromkeys(nodenames, {})
1620

    
1621
    node_to_primary = dict([(name, set()) for name in nodenames])
1622
    node_to_secondary = dict([(name, set()) for name in nodenames])
1623

    
1624
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1625
                             "sinst_cnt", "sinst_list"))
1626
    if inst_fields & frozenset(self.op.output_fields):
1627
      instancelist = self.cfg.GetInstanceList()
1628

    
1629
      for instance_name in instancelist:
1630
        inst = self.cfg.GetInstanceInfo(instance_name)
1631
        if inst.primary_node in node_to_primary:
1632
          node_to_primary[inst.primary_node].add(inst.name)
1633
        for secnode in inst.secondary_nodes:
1634
          if secnode in node_to_secondary:
1635
            node_to_secondary[secnode].add(inst.name)
1636

    
1637
    # end data gathering
1638

    
1639
    output = []
1640
    for node in nodelist:
1641
      node_output = []
1642
      for field in self.op.output_fields:
1643
        if field == "name":
1644
          val = node.name
1645
        elif field == "pinst_list":
1646
          val = list(node_to_primary[node.name])
1647
        elif field == "sinst_list":
1648
          val = list(node_to_secondary[node.name])
1649
        elif field == "pinst_cnt":
1650
          val = len(node_to_primary[node.name])
1651
        elif field == "sinst_cnt":
1652
          val = len(node_to_secondary[node.name])
1653
        elif field == "pip":
1654
          val = node.primary_ip
1655
        elif field == "sip":
1656
          val = node.secondary_ip
1657
        elif field == "tags":
1658
          val = list(node.GetTags())
1659
        elif field == "serial_no":
1660
          val = node.serial_no
1661
        elif self._FIELDS_DYNAMIC.Matches(field):
1662
          val = live_data[node.name].get(field, None)
1663
        else:
1664
          raise errors.ParameterError(field)
1665
        node_output.append(val)
1666
      output.append(node_output)
1667

    
1668
    return output
1669

    
1670

    
1671
class LUQueryNodeVolumes(NoHooksLU):
1672
  """Logical unit for getting volumes on node(s).
1673

1674
  """
1675
  _OP_REQP = ["nodes", "output_fields"]
1676
  REQ_BGL = False
1677
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1678
  _FIELDS_STATIC = utils.FieldSet("node")
1679

    
1680
  def ExpandNames(self):
1681
    _CheckOutputFields(static=self._FIELDS_STATIC,
1682
                       dynamic=self._FIELDS_DYNAMIC,
1683
                       selected=self.op.output_fields)
1684

    
1685
    self.needed_locks = {}
1686
    self.share_locks[locking.LEVEL_NODE] = 1
1687
    if not self.op.nodes:
1688
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1689
    else:
1690
      self.needed_locks[locking.LEVEL_NODE] = \
1691
        _GetWantedNodes(self, self.op.nodes)
1692

    
1693
  def CheckPrereq(self):
1694
    """Check prerequisites.
1695

1696
    This only builds the list of nodes from the locks acquired in
    ExpandNames; the output fields were already validated there.
1697

1698
    """
1699
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1700

    
1701
  def Exec(self, feedback_fn):
1702
    """Computes the list of nodes and their attributes.
1703

1704
    """
1705
    nodenames = self.nodes
1706
    volumes = self.rpc.call_node_volumes(nodenames)
1707

    
1708
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1709
             in self.cfg.GetInstanceList()]
1710

    
1711
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1712

    
1713
    output = []
1714
    for node in nodenames:
1715
      if node not in volumes or not volumes[node]:
1716
        continue
1717

    
1718
      node_vols = volumes[node][:]
1719
      node_vols.sort(key=lambda vol: vol['dev'])
1720

    
1721
      for vol in node_vols:
1722
        node_output = []
1723
        for field in self.op.output_fields:
1724
          if field == "node":
1725
            val = node
1726
          elif field == "phys":
1727
            val = vol['dev']
1728
          elif field == "vg":
1729
            val = vol['vg']
1730
          elif field == "name":
1731
            val = vol['name']
1732
          elif field == "size":
1733
            val = int(float(vol['size']))
1734
          elif field == "instance":
1735
            for inst in ilist:
1736
              if node not in lv_by_node[inst]:
1737
                continue
1738
              if vol['name'] in lv_by_node[inst][node]:
1739
                val = inst.name
1740
                break
1741
            else:
1742
              val = '-'
1743
          else:
1744
            raise errors.ParameterError(field)
1745
          node_output.append(str(val))
1746

    
1747
        output.append(node_output)
1748

    
1749
    return output
1750

    
1751

    
1752
class LUAddNode(LogicalUnit):
1753
  """Logical unit for adding node to the cluster.
1754

1755
  """
1756
  HPATH = "node-add"
1757
  HTYPE = constants.HTYPE_NODE
1758
  _OP_REQP = ["node_name"]
1759

    
1760
  def BuildHooksEnv(self):
1761
    """Build hooks env.
1762

1763
    This will run on all nodes before, and on all nodes + the new node after.
1764

1765
    """
1766
    env = {
1767
      "OP_TARGET": self.op.node_name,
1768
      "NODE_NAME": self.op.node_name,
1769
      "NODE_PIP": self.op.primary_ip,
1770
      "NODE_SIP": self.op.secondary_ip,
1771
      }
1772
    nodes_0 = self.cfg.GetNodeList()
1773
    nodes_1 = nodes_0 + [self.op.node_name, ]
1774
    return env, nodes_0, nodes_1
1775

    
1776
  def CheckPrereq(self):
1777
    """Check prerequisites.
1778

1779
    This checks:
1780
     - the new node is not already in the config
1781
     - it is resolvable
1782
     - its parameters (single/dual homed) match the cluster
1783

1784
    Any errors are signalled by raising errors.OpPrereqError.
1785

1786
    """
1787
    node_name = self.op.node_name
1788
    cfg = self.cfg
1789

    
1790
    dns_data = utils.HostInfo(node_name)
1791

    
1792
    node = dns_data.name
1793
    primary_ip = self.op.primary_ip = dns_data.ip
1794
    secondary_ip = getattr(self.op, "secondary_ip", None)
1795
    if secondary_ip is None:
1796
      secondary_ip = primary_ip
1797
    if not utils.IsValidIP(secondary_ip):
1798
      raise errors.OpPrereqError("Invalid secondary IP given")
1799
    self.op.secondary_ip = secondary_ip
1800

    
1801
    node_list = cfg.GetNodeList()
1802
    if not self.op.readd and node in node_list:
1803
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1804
                                 node)
1805
    elif self.op.readd and node not in node_list:
1806
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1807

    
1808
    for existing_node_name in node_list:
1809
      existing_node = cfg.GetNodeInfo(existing_node_name)
1810

    
1811
      if self.op.readd and node == existing_node_name:
1812
        if (existing_node.primary_ip != primary_ip or
1813
            existing_node.secondary_ip != secondary_ip):
1814
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1815
                                     " address configuration as before")
1816
        continue
1817

    
1818
      if (existing_node.primary_ip == primary_ip or
1819
          existing_node.secondary_ip == primary_ip or
1820
          existing_node.primary_ip == secondary_ip or
1821
          existing_node.secondary_ip == secondary_ip):
1822
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1823
                                   " existing node %s" % existing_node.name)
1824

    
1825
    # check that the type of the node (single versus dual homed) is the
1826
    # same as for the master
1827
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1828
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1829
    newbie_singlehomed = secondary_ip == primary_ip
1830
    if master_singlehomed != newbie_singlehomed:
1831
      if master_singlehomed:
1832
        raise errors.OpPrereqError("The master has no private ip but the"
1833
                                   " new node has one")
1834
      else:
1835
        raise errors.OpPrereqError("The master has a private ip but the"
1836
                                   " new node doesn't have one")
1837

    
1838
    # checks reachability
1839
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1840
      raise errors.OpPrereqError("Node not reachable by ping")
1841

    
1842
    if not newbie_singlehomed:
1843
      # check reachability from my secondary ip to newbie's secondary ip
1844
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1845
                           source=myself.secondary_ip):
1846
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1847
                                   " based ping to noded port")
1848

    
1849
    self.new_node = objects.Node(name=node,
1850
                                 primary_ip=primary_ip,
1851
                                 secondary_ip=secondary_ip)
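
    # Hypothetical example of the single/dual-homed check above: if the
    # master has primary_ip 192.0.2.1 and secondary_ip 198.51.100.1
    # (dual-homed), a new node given no explicit secondary IP (so its
    # secondary defaults to its primary) is rejected, and vice versa.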
1852

    
1853
  def Exec(self, feedback_fn):
1854
    """Adds the new node to the cluster.
1855

1856
    """
1857
    new_node = self.new_node
1858
    node = new_node.name
1859

    
1860
    # check connectivity
1861
    result = self.rpc.call_version([node])[node]
1862
    if result:
1863
      if constants.PROTOCOL_VERSION == result:
1864
        logging.info("Communication to node %s fine, sw version %s match",
1865
                     node, result)
1866
      else:
1867
        raise errors.OpExecError("Version mismatch master version %s,"
1868
                                 " node version %s" %
1869
                                 (constants.PROTOCOL_VERSION, result))
1870
    else:
1871
      raise errors.OpExecError("Cannot get version from the new node")
1872

    
1873
    # setup ssh on node
1874
    logging.info("Copy ssh key to node %s", node)
1875
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1876
    keyarray = []
1877
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1878
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1879
                priv_key, pub_key]
1880

    
1881
    for i in keyfiles:
1882
      f = open(i, 'r')
1883
      try:
1884
        keyarray.append(f.read())
1885
      finally:
1886
        f.close()
1887

    
1888
    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
1889
                                    keyarray[2],
1890
                                    keyarray[3], keyarray[4], keyarray[5])
1891

    
1892
    if not result:
1893
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1894

    
1895
    # Add node to our /etc/hosts, and add key to known_hosts
1896
    utils.AddHostToEtcHosts(new_node.name)
1897

    
1898
    if new_node.secondary_ip != new_node.primary_ip:
1899
      if not self.rpc.call_node_has_ip_address(new_node.name,
1900
                                               new_node.secondary_ip):
1901
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1902
                                 " you gave (%s). Please fix and re-run this"
1903
                                 " command." % new_node.secondary_ip)
1904

    
1905
    node_verify_list = [self.cfg.GetMasterNode()]
1906
    node_verify_param = {
1907
      'nodelist': [node],
1908
      # TODO: do a node-net-test as well?
1909
    }
1910

    
1911
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
1912
                                       self.cfg.GetClusterName())
1913
    for verifier in node_verify_list:
1914
      if not result[verifier]:
1915
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
1916
                                 " for remote verification" % verifier)
1917
      if result[verifier]['nodelist']:
1918
        for failed in result[verifier]['nodelist']:
1919
          feedback_fn("ssh/hostname verification failed %s -> %s" %
1920
                      (verifier, result[verifier]['nodelist'][failed]))
1921
        raise errors.OpExecError("ssh/hostname verification failed.")
1922

    
1923
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1924
    # including the node just added
1925
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
1926
    dist_nodes = self.cfg.GetNodeList()
1927
    if not self.op.readd:
1928
      dist_nodes.append(node)
1929
    if myself.name in dist_nodes:
1930
      dist_nodes.remove(myself.name)
1931

    
1932
    logging.debug("Copying hosts and known_hosts to all nodes")
1933
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1934
      result = self.rpc.call_upload_file(dist_nodes, fname)
1935
      for to_node in dist_nodes:
1936
        if not result[to_node]:
1937
          logging.error("Copy of file %s to node %s failed", fname, to_node)
1938

    
1939
    to_copy = []
1940
    if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
1941
      to_copy.append(constants.VNC_PASSWORD_FILE)
1942
    for fname in to_copy:
1943
      result = self.rpc.call_upload_file([node], fname)
1944
      if not result[node]:
1945
        logging.error("Could not copy file %s to node %s", fname, node)
1946

    
1947
    if self.op.readd:
1948
      self.context.ReaddNode(new_node)
1949
    else:
1950
      self.context.AddNode(new_node)
1951

    
1952

    
1953
class LUQueryClusterInfo(NoHooksLU):
1954
  """Query cluster configuration.
1955

1956
  """
1957
  _OP_REQP = []
1958
  REQ_BGL = False
1959

    
1960
  def ExpandNames(self):
1961
    self.needed_locks = {}
1962

    
1963
  def CheckPrereq(self):
    """No prerequisites needed for this LU.
1965

1966
    """
1967
    pass
1968

    
1969
  def Exec(self, feedback_fn):
1970
    """Return cluster config.
1971

1972
    """
1973
    cluster = self.cfg.GetClusterInfo()
1974
    result = {
1975
      "software_version": constants.RELEASE_VERSION,
1976
      "protocol_version": constants.PROTOCOL_VERSION,
1977
      "config_version": constants.CONFIG_VERSION,
1978
      "os_api_version": constants.OS_API_VERSION,
1979
      "export_version": constants.EXPORT_VERSION,
1980
      "architecture": (platform.architecture()[0], platform.machine()),
1981
      "name": cluster.cluster_name,
1982
      "master": cluster.master_node,
1983
      "default_hypervisor": cluster.default_hypervisor,
1984
      "enabled_hypervisors": cluster.enabled_hypervisors,
1985
      "hvparams": cluster.hvparams,
1986
      "beparams": cluster.beparams,
1987
      }
1988

    
1989
    return result
1990

    
1991

    
1992
class LUQueryConfigValues(NoHooksLU):
1993
  """Return configuration values.
1994

1995
  """
1996
  _OP_REQP = []
1997
  REQ_BGL = False
1998
  _FIELDS_DYNAMIC = utils.FieldSet()
1999
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2000

    
2001
  def ExpandNames(self):
2002
    self.needed_locks = {}
2003

    
2004
    _CheckOutputFields(static=self._FIELDS_STATIC,
2005
                       dynamic=self._FIELDS_DYNAMIC,
2006
                       selected=self.op.output_fields)
2007

    
2008
  def CheckPrereq(self):
2009
    """No prerequisites.
2010

2011
    """
2012
    pass
2013

    
2014
  def Exec(self, feedback_fn):
2015
    """Dump a representation of the cluster config to the standard output.
2016

2017
    """
2018
    values = []
2019
    for field in self.op.output_fields:
2020
      if field == "cluster_name":
2021
        entry = self.cfg.GetClusterName()
2022
      elif field == "master_node":
2023
        entry = self.cfg.GetMasterNode()
2024
      elif field == "drain_flag":
2025
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2026
      else:
2027
        raise errors.ParameterError(field)
2028
      values.append(entry)
2029
    return values
2030

    
2031

    
2032
class LUActivateInstanceDisks(NoHooksLU):
2033
  """Bring up an instance's disks.
2034

2035
  """
2036
  _OP_REQP = ["instance_name"]
2037
  REQ_BGL = False
2038

    
2039
  def ExpandNames(self):
2040
    self._ExpandAndLockInstance()
2041
    self.needed_locks[locking.LEVEL_NODE] = []
2042
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2043

    
2044
  def DeclareLocks(self, level):
2045
    if level == locking.LEVEL_NODE:
2046
      self._LockInstancesNodes()
2047

    
2048
  def CheckPrereq(self):
2049
    """Check prerequisites.
2050

2051
    This checks that the instance is in the cluster.
2052

2053
    """
2054
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2055
    assert self.instance is not None, \
2056
      "Cannot retrieve locked instance %s" % self.op.instance_name
2057

    
2058
  def Exec(self, feedback_fn):
2059
    """Activate the disks.
2060

2061
    """
2062
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2063
    if not disks_ok:
2064
      raise errors.OpExecError("Cannot activate block devices")
2065

    
2066
    return disks_info
2067

    
2068

    
2069
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2070
  """Prepare the block devices for an instance.
2071

2072
  This sets up the block devices on all nodes.
2073

2074
  @type lu: L{LogicalUnit}
2075
  @param lu: the logical unit on whose behalf we execute
2076
  @type instance: L{objects.Instance}
2077
  @param instance: the instance for whose disks we assemble
2078
  @type ignore_secondaries: boolean
2079
  @param ignore_secondaries: if true, errors on secondary nodes
2080
      won't result in an error return from the function
2081
  @return: a tuple of (disks_ok, device_info); disks_ok is a boolean
      telling whether all devices could be assembled, and device_info
      is a list of (host, instance_visible_name, node_visible_name)
      tuples with the mapping from node devices to instance devices
2084

2085
  """
2086
  device_info = []
2087
  disks_ok = True
2088
  iname = instance.name
2089
  # With the two passes mechanism we try to reduce the window of
2090
  # opportunity for the race condition of switching DRBD to primary
2091
  # before handshaking occurred, but we do not eliminate it
2092

    
2093
  # The proper fix would be to wait (with some limits) until the
2094
  # connection has been made and drbd transitions from WFConnection
2095
  # into any other network-connected state (Connected, SyncTarget,
2096
  # SyncSource, etc.)
2097

    
2098
  # 1st pass, assemble on all nodes in secondary mode
2099
  for inst_disk in instance.disks:
2100
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2101
      lu.cfg.SetDiskID(node_disk, node)
2102
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2103
      if not result:
2104
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2105
                           " (is_primary=False, pass=1)",
2106
                           inst_disk.iv_name, node)
2107
        if not ignore_secondaries:
2108
          disks_ok = False
2109

    
2110
  # FIXME: race condition on drbd migration to primary
2111

    
2112
  # 2nd pass, do only the primary node
2113
  for inst_disk in instance.disks:
2114
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2115
      if node != instance.primary_node:
2116
        continue
2117
      lu.cfg.SetDiskID(node_disk, node)
2118
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2119
      if not result:
2120
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2121
                           " (is_primary=True, pass=2)",
2122
                           inst_disk.iv_name, node)
2123
        disks_ok = False
2124
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
2125

    
2126
  # leave the disks configured for the primary node
2127
  # this is a workaround that would be fixed better by
2128
  # improving the logical/physical id handling
2129
  for disk in instance.disks:
2130
    lu.cfg.SetDiskID(disk, instance.primary_node)
2131

    
2132
  return disks_ok, device_info
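
# Illustrative sketch (hypothetical values): for an instance with one disk,
# a successful call would return roughly
#
#   disks_ok, device_info = _AssembleInstanceDisks(lu, instance)
#   # disks_ok    -> True
#   # device_info -> [(instance.primary_node, <disk iv_name>,
#   #                  <result of the primary-pass assemble call>)]
#
# i.e. one tuple per disk, appended only during the second (primary) pass.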
2133

    
2134

    
2135
def _StartInstanceDisks(lu, instance, force):
2136
  """Start the disks of an instance.
2137

2138
  """
2139
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2140
                                           ignore_secondaries=force)
2141
  if not disks_ok:
2142
    _ShutdownInstanceDisks(lu, instance)
2143
    if force is not None and not force:
2144
      lu.proc.LogWarning("", hint="If the message above refers to a"
2145
                         " secondary node,"
2146
                         " you can retry the operation using '--force'.")
2147
    raise errors.OpExecError("Disk consistency error")
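
# Note (descriptive only): callers that are not acting on a user-supplied
# "--force" flag pass force=None (e.g. LUReinstallInstance below), which
# suppresses the hint above while still shutting the disks back down and
# raising OpExecError on failure.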
2148

    
2149

    
2150
class LUDeactivateInstanceDisks(NoHooksLU):
2151
  """Shutdown an instance's disks.
2152

2153
  """
2154
  _OP_REQP = ["instance_name"]
2155
  REQ_BGL = False
2156

    
2157
  def ExpandNames(self):
2158
    self._ExpandAndLockInstance()
2159
    self.needed_locks[locking.LEVEL_NODE] = []
2160
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2161

    
2162
  def DeclareLocks(self, level):
2163
    if level == locking.LEVEL_NODE:
2164
      self._LockInstancesNodes()
2165

    
2166
  def CheckPrereq(self):
2167
    """Check prerequisites.
2168

2169
    This checks that the instance is in the cluster.
2170

2171
    """
2172
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2173
    assert self.instance is not None, \
2174
      "Cannot retrieve locked instance %s" % self.op.instance_name
2175

    
2176
  def Exec(self, feedback_fn):
2177
    """Deactivate the disks
2178

2179
    """
2180
    instance = self.instance
2181
    _SafeShutdownInstanceDisks(self, instance)
2182

    
2183

    
2184
def _SafeShutdownInstanceDisks(lu, instance):
2185
  """Shutdown block devices of an instance.
2186

2187
  This function checks if an instance is running, before calling
2188
  _ShutdownInstanceDisks.
2189

2190
  """
2191
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
2192
                                      [instance.hypervisor])
2193
  ins_l = ins_l[instance.primary_node]
2194
  if not isinstance(ins_l, list):
2195
    raise errors.OpExecError("Can't contact node '%s'" %
2196
                             instance.primary_node)
2197

    
2198
  if instance.name in ins_l:
2199
    raise errors.OpExecError("Instance is running, can't shutdown"
2200
                             " block devices.")
2201

    
2202
  _ShutdownInstanceDisks(lu, instance)
2203

    
2204

    
2205
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2206
  """Shutdown block devices of an instance.
2207

2208
  This does the shutdown on all nodes of the instance.
2209

2210
  If ignore_primary is true, errors on the primary node are only
  logged and do not cause a failure result; errors on any other node
  always make the function return False.
2212

2213
  """
2214
  result = True
2215
  for disk in instance.disks:
2216
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2217
      lu.cfg.SetDiskID(top_disk, node)
2218
      if not lu.rpc.call_blockdev_shutdown(node, top_disk):
2219
        logging.error("Could not shutdown block device %s on node %s",
2220
                      disk.iv_name, node)
2221
        if not ignore_primary or node != instance.primary_node:
2222
          result = False
2223
  return result
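
# For example (descriptive only): with ignore_primary=True a failure to
# shut down a disk on the primary node is only logged and the function
# can still return True, while a failure on any secondary node always
# makes it return False.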
2224

    
2225

    
2226
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2227
  """Checks if a node has enough free memory.
2228

2229
  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.
2233

2234
  @type lu: C{LogicalUnit}
2235
  @param lu: a logical unit from which we get configuration data
2236
  @type node: C{str}
2237
  @param node: the node to check
2238
  @type reason: C{str}
2239
  @param reason: string to use in the error message
2240
  @type requested: C{int}
2241
  @param requested: the amount of memory in MiB to check for
2242
  @type hypervisor: C{str}
2243
  @param hypervisor: the hypervisor to ask for memory stats
2244
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2245
      we cannot check the node
2246

2247
  """
2248
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2249
  if not nodeinfo or not isinstance(nodeinfo, dict):
2250
    raise errors.OpPrereqError("Could not contact node %s for resource"
2251
                             " information" % (node,))
2252

    
2253
  free_mem = nodeinfo[node].get('memory_free')
2254
  if not isinstance(free_mem, int):
2255
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2256
                             " was '%s'" % (node, free_mem))
2257
  if requested > free_mem:
2258
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2259
                             " needed %s MiB, available %s MiB" %
2260
                             (node, reason, requested, free_mem))
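
# Typical use (sketch, mirroring the call in LUStartupInstance below):
#
#   _CheckNodeFreeMemory(self, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        bep[constants.BE_MEMORY], instance.hypervisor)
#
# i.e. the caller passes the amount of memory the operation needs in MiB
# and a short description that is only used in the error message.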
2261

    
2262

    
2263
class LUStartupInstance(LogicalUnit):
2264
  """Starts an instance.
2265

2266
  """
2267
  HPATH = "instance-start"
2268
  HTYPE = constants.HTYPE_INSTANCE
2269
  _OP_REQP = ["instance_name", "force"]
2270
  REQ_BGL = False
2271

    
2272
  def ExpandNames(self):
2273
    self._ExpandAndLockInstance()
2274

    
2275
  def BuildHooksEnv(self):
2276
    """Build hooks env.
2277

2278
    This runs on master, primary and secondary nodes of the instance.
2279

2280
    """
2281
    env = {
2282
      "FORCE": self.op.force,
2283
      }
2284
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2285
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2286
          list(self.instance.secondary_nodes))
2287
    return env, nl, nl
2288

    
2289
  def CheckPrereq(self):
2290
    """Check prerequisites.
2291

2292
    This checks that the instance is in the cluster.
2293

2294
    """
2295
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2296
    assert self.instance is not None, \
2297
      "Cannot retrieve locked instance %s" % self.op.instance_name
2298

    
2299
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2300
    # check bridges existence
2301
    _CheckInstanceBridgesExist(self, instance)
2302

    
2303
    _CheckNodeFreeMemory(self, instance.primary_node,
2304
                         "starting instance %s" % instance.name,
2305
                         bep[constants.BE_MEMORY], instance.hypervisor)
2306

    
2307
  def Exec(self, feedback_fn):
2308
    """Start the instance.
2309

2310
    """
2311
    instance = self.instance
2312
    force = self.op.force
2313
    extra_args = getattr(self.op, "extra_args", "")
2314

    
2315
    self.cfg.MarkInstanceUp(instance.name)
2316

    
2317
    node_current = instance.primary_node
2318

    
2319
    _StartInstanceDisks(self, instance, force)
2320

    
2321
    if not self.rpc.call_instance_start(node_current, instance, extra_args):
2322
      _ShutdownInstanceDisks(self, instance)
2323
      raise errors.OpExecError("Could not start instance")
2324

    
2325

    
2326
class LURebootInstance(LogicalUnit):
2327
  """Reboot an instance.
2328

2329
  """
2330
  HPATH = "instance-reboot"
2331
  HTYPE = constants.HTYPE_INSTANCE
2332
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2333
  REQ_BGL = False
2334

    
2335
  def ExpandNames(self):
2336
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2337
                                   constants.INSTANCE_REBOOT_HARD,
2338
                                   constants.INSTANCE_REBOOT_FULL]:
2339
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2340
                                  (constants.INSTANCE_REBOOT_SOFT,
2341
                                   constants.INSTANCE_REBOOT_HARD,
2342
                                   constants.INSTANCE_REBOOT_FULL))
2343
    self._ExpandAndLockInstance()
2344

    
2345
  def BuildHooksEnv(self):
2346
    """Build hooks env.
2347

2348
    This runs on master, primary and secondary nodes of the instance.
2349

2350
    """
2351
    env = {
2352
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2353
      }
2354
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2355
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2356
          list(self.instance.secondary_nodes))
2357
    return env, nl, nl
2358

    
2359
  def CheckPrereq(self):
2360
    """Check prerequisites.
2361

2362
    This checks that the instance is in the cluster.
2363

2364
    """
2365
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2366
    assert self.instance is not None, \
2367
      "Cannot retrieve locked instance %s" % self.op.instance_name
2368

    
2369
    # check bridges existence
2370
    _CheckInstanceBridgesExist(self, instance)
2371

    
2372
  def Exec(self, feedback_fn):
2373
    """Reboot the instance.
2374

2375
    """
2376
    instance = self.instance
2377
    ignore_secondaries = self.op.ignore_secondaries
2378
    reboot_type = self.op.reboot_type
2379
    extra_args = getattr(self.op, "extra_args", "")
2380

    
2381
    node_current = instance.primary_node
2382

    
2383
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2384
                       constants.INSTANCE_REBOOT_HARD]:
2385
      if not self.rpc.call_instance_reboot(node_current, instance,
2386
                                           reboot_type, extra_args):
2387
        raise errors.OpExecError("Could not reboot instance")
2388
    else:
2389
      if not self.rpc.call_instance_shutdown(node_current, instance):
2390
        raise errors.OpExecError("could not shutdown instance for full reboot")
2391
      _ShutdownInstanceDisks(self, instance)
2392
      _StartInstanceDisks(self, instance, ignore_secondaries)
2393
      if not self.rpc.call_instance_start(node_current, instance, extra_args):
2394
        _ShutdownInstanceDisks(self, instance)
2395
        raise errors.OpExecError("Could not start instance for full reboot")
2396

    
2397
    self.cfg.MarkInstanceUp(instance.name)
2398

    
2399

    
2400
class LUShutdownInstance(LogicalUnit):
2401
  """Shutdown an instance.
2402

2403
  """
2404
  HPATH = "instance-stop"
2405
  HTYPE = constants.HTYPE_INSTANCE
2406
  _OP_REQP = ["instance_name"]
2407
  REQ_BGL = False
2408

    
2409
  def ExpandNames(self):
2410
    self._ExpandAndLockInstance()
2411

    
2412
  def BuildHooksEnv(self):
2413
    """Build hooks env.
2414

2415
    This runs on master, primary and secondary nodes of the instance.
2416

2417
    """
2418
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2419
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2420
          list(self.instance.secondary_nodes))
2421
    return env, nl, nl
2422

    
2423
  def CheckPrereq(self):
2424
    """Check prerequisites.
2425

2426
    This checks that the instance is in the cluster.
2427

2428
    """
2429
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2430
    assert self.instance is not None, \
2431
      "Cannot retrieve locked instance %s" % self.op.instance_name
2432

    
2433
  def Exec(self, feedback_fn):
2434
    """Shutdown the instance.
2435

2436
    """
2437
    instance = self.instance
2438
    node_current = instance.primary_node
2439
    self.cfg.MarkInstanceDown(instance.name)
2440
    if not self.rpc.call_instance_shutdown(node_current, instance):
2441
      self.proc.LogWarning("Could not shutdown instance")
2442

    
2443
    _ShutdownInstanceDisks(self, instance)
2444

    
2445

    
2446
class LUReinstallInstance(LogicalUnit):
2447
  """Reinstall an instance.
2448

2449
  """
2450
  HPATH = "instance-reinstall"
2451
  HTYPE = constants.HTYPE_INSTANCE
2452
  _OP_REQP = ["instance_name"]
2453
  REQ_BGL = False
2454

    
2455
  def ExpandNames(self):
2456
    self._ExpandAndLockInstance()
2457

    
2458
  def BuildHooksEnv(self):
2459
    """Build hooks env.
2460

2461
    This runs on master, primary and secondary nodes of the instance.
2462

2463
    """
2464
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2465
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2466
          list(self.instance.secondary_nodes))
2467
    return env, nl, nl
2468

    
2469
  def CheckPrereq(self):
2470
    """Check prerequisites.
2471

2472
    This checks that the instance is in the cluster and is not running.
2473

2474
    """
2475
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2476
    assert instance is not None, \
2477
      "Cannot retrieve locked instance %s" % self.op.instance_name
2478

    
2479
    if instance.disk_template == constants.DT_DISKLESS:
2480
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2481
                                 self.op.instance_name)
2482
    if instance.status != "down":
2483
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2484
                                 self.op.instance_name)
2485
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2486
                                              instance.name,
2487
                                              instance.hypervisor)
2488
    if remote_info:
2489
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2490
                                 (self.op.instance_name,
2491
                                  instance.primary_node))
2492

    
2493
    self.op.os_type = getattr(self.op, "os_type", None)
2494
    if self.op.os_type is not None:
2495
      # OS verification
2496
      pnode = self.cfg.GetNodeInfo(
2497
        self.cfg.ExpandNodeName(instance.primary_node))
2498
      if pnode is None:
2499
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2500
                                   self.op.pnode)
2501
      os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
2502
      if not os_obj:
2503
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2504
                                   " primary node"  % self.op.os_type)
2505

    
2506
    self.instance = instance
2507

    
2508
  def Exec(self, feedback_fn):
2509
    """Reinstall the instance.
2510

2511
    """
2512
    inst = self.instance
2513

    
2514
    if self.op.os_type is not None:
2515
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2516
      inst.os = self.op.os_type
2517
      self.cfg.Update(inst)
2518

    
2519
    _StartInstanceDisks(self, inst, None)
2520
    try:
2521
      feedback_fn("Running the instance OS create scripts...")
2522
      if not self.rpc.call_instance_os_add(inst.primary_node, inst):
2523
        raise errors.OpExecError("Could not install OS for instance %s"
2524
                                 " on node %s" %
2525
                                 (inst.name, inst.primary_node))
2526
    finally:
2527
      _ShutdownInstanceDisks(self, inst)
2528

    
2529

    
2530
class LURenameInstance(LogicalUnit):
2531
  """Rename an instance.
2532

2533
  """
2534
  HPATH = "instance-rename"
2535
  HTYPE = constants.HTYPE_INSTANCE
2536
  _OP_REQP = ["instance_name", "new_name"]
2537

    
2538
  def BuildHooksEnv(self):
2539
    """Build hooks env.
2540

2541
    This runs on master, primary and secondary nodes of the instance.
2542

2543
    """
2544
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2545
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2546
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2547
          list(self.instance.secondary_nodes))
2548
    return env, nl, nl
2549

    
2550
  def CheckPrereq(self):
2551
    """Check prerequisites.
2552

2553
    This checks that the instance is in the cluster and is not running.
2554

2555
    """
2556
    instance = self.cfg.GetInstanceInfo(
2557
      self.cfg.ExpandInstanceName(self.op.instance_name))
2558
    if instance is None:
2559
      raise errors.OpPrereqError("Instance '%s' not known" %
2560
                                 self.op.instance_name)
2561
    if instance.status != "down":
2562
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2563
                                 self.op.instance_name)
2564
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2565
                                              instance.name,
2566
                                              instance.hypervisor)
2567
    if remote_info:
2568
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2569
                                 (self.op.instance_name,
2570
                                  instance.primary_node))
2571
    self.instance = instance
2572

    
2573
    # new name verification
2574
    name_info = utils.HostInfo(self.op.new_name)
2575

    
2576
    self.op.new_name = new_name = name_info.name
2577
    instance_list = self.cfg.GetInstanceList()
2578
    if new_name in instance_list:
2579
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2580
                                 new_name)
2581

    
2582
    if not getattr(self.op, "ignore_ip", False):
2583
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2584
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2585
                                   (name_info.ip, new_name))
2586

    
2587

    
2588
  def Exec(self, feedback_fn):
    """Rename the instance.
2590

2591
    """
2592
    inst = self.instance
2593
    old_name = inst.name
2594

    
2595
    if inst.disk_template == constants.DT_FILE:
2596
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2597

    
2598
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2599
    # Change the instance lock. This is definitely safe while we hold the BGL
2600
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2601
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2602

    
2603
    # re-read the instance from the configuration after rename
2604
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2605

    
2606
    if inst.disk_template == constants.DT_FILE:
2607
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2608
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2609
                                                     old_file_storage_dir,
2610
                                                     new_file_storage_dir)
2611

    
2612
      if not result:
2613
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2614
                                 " directory '%s' to '%s' (but the instance"
2615
                                 " has been renamed in Ganeti)" % (
2616
                                 inst.primary_node, old_file_storage_dir,
2617
                                 new_file_storage_dir))
2618

    
2619
      if not result[0]:
2620
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2621
                                 " (but the instance has been renamed in"
2622
                                 " Ganeti)" % (old_file_storage_dir,
2623
                                               new_file_storage_dir))
2624

    
2625
    _StartInstanceDisks(self, inst, None)
2626
    try:
2627
      if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
2628
                                               old_name):
2629
        msg = ("Could not run OS rename script for instance %s on node %s"
2630
               " (but the instance has been renamed in Ganeti)" %
2631
               (inst.name, inst.primary_node))
2632
        self.proc.LogWarning(msg)
2633
    finally:
2634
      _ShutdownInstanceDisks(self, inst)
2635

    
2636

    
2637
class LURemoveInstance(LogicalUnit):
2638
  """Remove an instance.
2639

2640
  """
2641
  HPATH = "instance-remove"
2642
  HTYPE = constants.HTYPE_INSTANCE
2643
  _OP_REQP = ["instance_name", "ignore_failures"]
2644
  REQ_BGL = False
2645

    
2646
  def ExpandNames(self):
2647
    self._ExpandAndLockInstance()
2648
    self.needed_locks[locking.LEVEL_NODE] = []
2649
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2650

    
2651
  def DeclareLocks(self, level):
2652
    if level == locking.LEVEL_NODE:
2653
      self._LockInstancesNodes()
2654

    
2655
  def BuildHooksEnv(self):
2656
    """Build hooks env.
2657

2658
    This runs on master, primary and secondary nodes of the instance.
2659

2660
    """
2661
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2662
    nl = [self.cfg.GetMasterNode()]
2663
    return env, nl, nl
2664

    
2665
  def CheckPrereq(self):
2666
    """Check prerequisites.
2667

2668
    This checks that the instance is in the cluster.
2669

2670
    """
2671
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2672
    assert self.instance is not None, \
2673
      "Cannot retrieve locked instance %s" % self.op.instance_name
2674

    
2675
  def Exec(self, feedback_fn):
2676
    """Remove the instance.
2677

2678
    """
2679
    instance = self.instance
2680
    logging.info("Shutting down instance %s on node %s",
2681
                 instance.name, instance.primary_node)
2682

    
2683
    if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
2684
      if self.op.ignore_failures:
2685
        feedback_fn("Warning: can't shutdown instance")
2686
      else:
2687
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2688
                                 (instance.name, instance.primary_node))
2689

    
2690
    logging.info("Removing block devices for instance %s", instance.name)
2691

    
2692
    if not _RemoveDisks(self, instance):
2693
      if self.op.ignore_failures:
2694
        feedback_fn("Warning: can't remove instance's disks")
2695
      else:
2696
        raise errors.OpExecError("Can't remove instance's disks")
2697

    
2698
    logging.info("Removing instance %s out of cluster config", instance.name)
2699

    
2700
    self.cfg.RemoveInstance(instance.name)
2701
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2702

    
2703

    
2704
class LUQueryInstances(NoHooksLU):
2705
  """Logical unit for querying instances.
2706

2707
  """
2708
  _OP_REQP = ["output_fields", "names"]
2709
  REQ_BGL = False
2710
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
2711
                                    "admin_state", "admin_ram",
2712
                                    "disk_template", "ip", "mac", "bridge",
2713
                                    "sda_size", "sdb_size", "vcpus", "tags",
2714
                                    "network_port", "beparams",
2715
                                    "(disk).(size)/([0-9]+)",
2716
                                    "(disk).(sizes)",
2717
                                    "(nic).(mac|ip|bridge)/([0-9]+)",
2718
                                    "(nic).(macs|ips|bridges)",
2719
                                    "(disk|nic).(count)",
2720
                                    "serial_no", "hypervisor", "hvparams",] +
2721
                                  ["hv/%s" % name
2722
                                   for name in constants.HVS_PARAMETERS] +
2723
                                  ["be/%s" % name
2724
                                   for name in constants.BES_PARAMETERS])
2725
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
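
  # Illustrative reading of the parameterized static fields above
  # (hypothetical queries): "disk.size/0" matches "(disk).(size)/([0-9]+)"
  # and yields the size of the first disk, "nic.mac/1" the MAC of the
  # second NIC, and "nic.macs" the list of all MACs; Exec() below
  # dispatches on the groups captured by the FieldSet match.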
2726

    
2727

    
2728
  def ExpandNames(self):
2729
    _CheckOutputFields(static=self._FIELDS_STATIC,
2730
                       dynamic=self._FIELDS_DYNAMIC,
2731
                       selected=self.op.output_fields)
2732

    
2733
    self.needed_locks = {}
2734
    self.share_locks[locking.LEVEL_INSTANCE] = 1
2735
    self.share_locks[locking.LEVEL_NODE] = 1
2736

    
2737
    if self.op.names:
2738
      self.wanted = _GetWantedInstances(self, self.op.names)
2739
    else:
2740
      self.wanted = locking.ALL_SET
2741

    
2742
    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2743
    if self.do_locking:
2744
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2745
      self.needed_locks[locking.LEVEL_NODE] = []
2746
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2747

    
2748
  def DeclareLocks(self, level):
2749
    if level == locking.LEVEL_NODE and self.do_locking:
2750
      self._LockInstancesNodes()
2751

    
2752
  def CheckPrereq(self):
2753
    """Check prerequisites.
2754

2755
    """
2756
    pass
2757

    
2758
  def Exec(self, feedback_fn):
2759
    """Computes the list of nodes and their attributes.
2760

2761
    """
2762
    all_info = self.cfg.GetAllInstancesInfo()
2763
    if self.do_locking:
2764
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2765
    elif self.wanted != locking.ALL_SET:
2766
      instance_names = self.wanted
2767
      missing = set(instance_names).difference(all_info.keys())
2768
      if missing:
2769
        raise errors.OpExecError(
2770
          "Some instances were removed before retrieving their data: %s"
2771
          % missing)
2772
    else:
2773
      instance_names = all_info.keys()
2774

    
2775
    instance_names = utils.NiceSort(instance_names)
2776
    instance_list = [all_info[iname] for iname in instance_names]
2777

    
2778
    # begin data gathering
2779

    
2780
    nodes = frozenset([inst.primary_node for inst in instance_list])
2781
    hv_list = list(set([inst.hypervisor for inst in instance_list]))
2782

    
2783
    bad_nodes = []
2784
    if self.do_locking:
2785
      live_data = {}
2786
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
2787
      for name in nodes:
2788
        result = node_data[name]
2789
        if result:
2790
          live_data.update(result)
2791
        elif result == False:
2792
          bad_nodes.append(name)
2793
        # else no instance is alive
2794
    else:
2795
      live_data = dict([(name, {}) for name in instance_names])
2796

    
2797
    # end data gathering
2798

    
2799
    HVPREFIX = "hv/"
2800
    BEPREFIX = "be/"
2801
    output = []
2802
    for instance in instance_list:
2803
      iout = []
2804
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
2805
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
2806
      for field in self.op.output_fields:
2807
        st_match = self._FIELDS_STATIC.Matches(field)
2808
        if field == "name":
2809
          val = instance.name
2810
        elif field == "os":
2811
          val = instance.os
2812
        elif field == "pnode":
2813
          val = instance.primary_node
2814
        elif field == "snodes":
2815
          val = list(instance.secondary_nodes)
2816
        elif field == "admin_state":
2817
          val = (instance.status != "down")
2818
        elif field == "oper_state":
2819
          if instance.primary_node in bad_nodes:
2820
            val = None
2821
          else:
2822
            val = bool(live_data.get(instance.name))
2823
        elif field == "status":
2824
          if instance.primary_node in bad_nodes:
2825
            val = "ERROR_nodedown"
2826
          else:
2827
            running = bool(live_data.get(instance.name))
2828
            if running:
2829
              if instance.status != "down":
2830
                val = "running"
2831
              else:
2832
                val = "ERROR_up"
2833
            else:
2834
              if instance.status != "down":
2835
                val = "ERROR_down"
2836
              else:
2837
                val = "ADMIN_down"
2838
        elif field == "oper_ram":
2839
          if instance.primary_node in bad_nodes:
2840
            val = None
2841
          elif instance.name in live_data:
2842
            val = live_data[instance.name].get("memory", "?")
2843
          else:
2844
            val = "-"
2845
        elif field == "disk_template":
2846
          val = instance.disk_template
2847
        elif field == "ip":
2848
          val = instance.nics[0].ip
2849
        elif field == "bridge":
2850
          val = instance.nics[0].bridge
2851
        elif field == "mac":
2852
          val = instance.nics[0].mac
2853
        elif field == "sda_size" or field == "sdb_size":
2854
          idx = ord(field[2]) - ord('a')
2855
          try:
2856
            val = instance.FindDisk(idx).size
2857
          except errors.OpPrereqError:
2858
            val = None
2859
        elif field == "tags":
2860
          val = list(instance.GetTags())
2861
        elif field == "serial_no":
2862
          val = instance.serial_no
2863
        elif field == "network_port":
2864
          val = instance.network_port
2865
        elif field == "hypervisor":
2866
          val = instance.hypervisor
2867
        elif field == "hvparams":
2868
          val = i_hv
2869
        elif (field.startswith(HVPREFIX) and
2870
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
2871
          val = i_hv.get(field[len(HVPREFIX):], None)
2872
        elif field == "beparams":
2873
          val = i_be
2874
        elif (field.startswith(BEPREFIX) and
2875
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
2876
          val = i_be.get(field[len(BEPREFIX):], None)
2877
        elif st_match and st_match.groups():
2878
          # matches a variable list
2879
          st_groups = st_match.groups()
2880
          if st_groups and st_groups[0] == "disk":
2881
            if st_groups[1] == "count":
2882
              val = len(instance.disks)
2883
            elif st_groups[1] == "sizes":
2884
              val = [disk.size for disk in instance.disks]
2885
            elif st_groups[1] == "size":
2886
              try:
2887
                val = instance.FindDisk(st_groups[2]).size
2888
              except errors.OpPrereqError:
2889
                val = None
2890
            else:
2891
              assert False, "Unhandled disk parameter"
2892
          elif st_groups[0] == "nic":
2893
            if st_groups[1] == "count":
2894
              val = len(instance.nics)
2895
            elif st_groups[1] == "macs":
2896
              val = [nic.mac for nic in instance.nics]
2897
            elif st_groups[1] == "ips":
2898
              val = [nic.ip for nic in instance.nics]
2899
            elif st_groups[1] == "bridges":
2900
              val = [nic.bridge for nic in instance.nics]
2901
            else:
2902
              # index-based item
2903
              nic_idx = int(st_groups[2])
2904
              if nic_idx >= len(instance.nics):
2905
                val = None
2906
              else:
2907
                if st_groups[1] == "mac":
2908
                  val = instance.nics[nic_idx].mac
2909
                elif st_groups[1] == "ip":
2910
                  val = instance.nics[nic_idx].ip
2911
                elif st_groups[1] == "bridge":
2912
                  val = instance.nics[nic_idx].bridge
2913
                else:
2914
                  assert False, "Unhandled NIC parameter"
2915
          else:
2916
            assert False, "Unhandled variable parameter"
2917
        else:
2918
          raise errors.ParameterError(field)
2919
        iout.append(val)
2920
      output.append(iout)
2921

    
2922
    return output
2923

    
2924

    
2925
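# Note (illustrative sketch only, not part of the original code): the
# "variable" fields handled above ("disk.count", "disk.size/N", "nic.mac/N",
# ...) are pre-parsed into st_match earlier in LUQueryInstances; the groups
# are then consumed as (item, attribute, optional index).  A hypothetical
# pattern with the same group layout would be:
#
#   import re
#   _EXAMPLE_FIELD_RE = re.compile(r"^(disk|nic)\."
#                                  r"(count|sizes|macs|ips|bridges"
#                                  r"|size|mac|ip|bridge)(?:/(\d+))?$")
#   _EXAMPLE_FIELD_RE.match("nic.mac/0").groups()   # ('nic', 'mac', '0')
#   _EXAMPLE_FIELD_RE.match("disk.sizes").groups()  # ('disk', 'sizes', None)
#
# The real pattern lives with the field definitions elsewhere in this module.

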
class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
                         instance.name, bep[constants.BE_MEMORY],
                         instance.hypervisor)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not self.rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self, dev, target_node, False):
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logging.info("Shutting down instance %s on node %s",
                 instance.name, source_node)

    if not self.rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
                             " Proceeding"
                             " anyway. Please make sure node %s is down",
                             instance.name, source_node, source_node)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logging.info("Starting instance %s on node %s",
                   instance.name, target_node)

      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not self.rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))


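# Illustrative sketch only: LUFailoverInstance is driven by the matching
# opcode; assuming the standard opcode class and field names used by
# "gnt-instance failover", a minimal request could look like:
#
#   op = opcodes.OpFailoverInstance(instance_name="instance1.example.com",
#                                   ignore_consistency=False)
#
# The Exec() above then shuts the instance down on its primary node and,
# if it was marked up, restarts it on the (single) DRBD secondary, refusing
# to continue over degraded disks unless ignore_consistency is set.

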
def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
        return False

  lu.cfg.SetDiskID(device, node)
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, True, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(lu, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  lu.cfg.SetDiskID(device, node)
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, False, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


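# Illustrative sketch only: both helpers above recurse over Disk.children
# bottom-up, so for a DRBD8 disk the two backing LVs are created before the
# DRBD device itself.  On the primary everything is created unconditionally;
# on a secondary a device is only physically created once it (or one of its
# ancestors) answers True to CreateOnSecondary(), roughly:
#
#   for child in device.children or []:
#     _CreateBlockDevOnSecondary(lu, node, instance, child, force, info)
#   # ... then the device itself, but only if force is True by now
#
# (hypothetical flattened walk; the real helpers also set the disk ID and
# record the physical_id returned by call_blockdev_create)

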
def _GenerateUniqueNames(lu, exts):
  """Generate a suitable LV name.

  This will generate a logical volume name for the given instance.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results


def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
                         p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  port = lu.cfg.AllocatePort()
  vgname = lu.cfg.GetVGName()
  shared_secret = lu.cfg.GenerateDRBDSecret()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev


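# Illustrative sketch only: the disk object returned above is a small tree;
# for a 10240 MB disk it would look roughly like (values hypothetical):
#
#   Disk(dev_type=LD_DRBD8, size=10240, iv_name="disk/0",
#        logical_id=(primary, secondary, port, p_minor, s_minor, secret),
#        children=[Disk(dev_type=LD_LV, size=10240),   # data LV
#                  Disk(dev_type=LD_LV, size=128)])    # DRBD metadata LV
#
# i.e. the DRBD8 device always carries exactly two LV children: the data
# volume and a fixed 128 MB metadata volume.

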
def _GenerateDiskTemplate(lu, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_info,
                          file_storage_dir, file_driver,
                          base_index):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []
  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(lu, [".disk%d" % i
                                      for i in range(disk_count)])
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
                              logical_id=(vgname, names[idx]),
                              iv_name="disk/%d" % disk_index)
      disks.append(disk_dev)
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node, remote_node] * len(disk_info), instance_name)

    names = _GenerateUniqueNames(lu,
                                 [".disk%d_%s" % (i, s)
                                  for i in range(disk_count)
                                  for s in ("data", "meta")
                                  ])
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                      disk["size"], names[idx*2:idx*2+2],
                                      "disk/%d" % disk_index,
                                      minors[idx*2], minors[idx*2+1])
      disks.append(disk_dev)
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
                              iv_name="disk/%d" % disk_index,
                              logical_id=(file_driver,
                                          "%s/disk%d" % (file_storage_dir,
                                                         idx)))
      disks.append(disk_dev)
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks


def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


def _CreateDisks(lu, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @rtype: boolean
  @return: the success of the creation

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
                                                 file_storage_dir)

    if not result:
      logging.error("Could not connect to node '%s'", instance.primary_node)
      return False

    if not result[0]:
      logging.error("Failed to create directory '%s'", file_storage_dir)
      return False

  # Note: this needs to be kept in sync with adding of disks in
  # LUSetInstanceParams
  for device in instance.disks:
    logging.info("Creating volume %s for instance %s",
                 device.iv_name, instance.name)
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
                                        device, False, info):
        logging.error("Failed to create volume %s (%s) on secondary node %s!",
                      device.iv_name, device, secondary_node)
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
                                    instance, device, info):
      logging.error("Failed to create volume %s on primary!", device.iv_name)
      return False

  return True


def _RemoveDisks(lu, instance):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should remove
  @rtype: boolean
  @return: the success of the removal

  """
  logging.info("Removing block devices for instance %s", instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(disk, node)
      if not lu.rpc.call_blockdev_remove(node, disk):
        lu.proc.LogWarning("Could not remove block device %s on node %s,"
                           " continuing anyway", device.iv_name, node)
        result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
                                               file_storage_dir):
      logging.error("Could not remove directory '%s'", file_storage_dir)
      result = False

  return result


def _ComputeDiskSize(disk_template, disks):
  """Compute disk size requirements in the volume group

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: sum(d["size"] for d in disks),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]


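# Worked example (sketch only): for two disks of 1024 and 2048 MB the
# function above yields
#
#   _ComputeDiskSize(constants.DT_PLAIN, [{"size": 1024}, {"size": 2048}])
#   # => 3072
#   _ComputeDiskSize(constants.DT_DRBD8, [{"size": 1024}, {"size": 2048}])
#   # => 3328   (each disk pays an extra 128 MB for DRBD metadata)
#
# while DT_DISKLESS and DT_FILE return None, meaning no volume group space
# is required at all.

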
def _CheckHVParams(lu, nodenames, hvname, hvparams):
  """Hypervisor parameter validation.

  This function abstracts the hypervisor parameter validation to be
  used in both instance create and instance modify.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type nodenames: list
  @param nodenames: the list of nodes on which we should check
  @type hvname: string
  @param hvname: the name of the hypervisor we should use
  @type hvparams: dict
  @param hvparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
                                                  hvname,
                                                  hvparams)
  for node in nodenames:
    info = hvinfo.get(node, None)
    if not info or not isinstance(info, (tuple, list)):
      raise errors.OpPrereqError("Cannot get current information"
                                 " from node '%s' (%s)" % (node, info))
    if not info[0]:
      raise errors.OpPrereqError("Hypervisor parameter validation failed:"
                                 " %s" % info[1])


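# Illustrative sketch only (values hypothetical): the "nics" and "disks"
# opcode parameters consumed by LUCreateInstance.ExpandNames below are lists
# of dicts; a one-NIC, two-disk request could look like:
#
#   nics = [{"mac": constants.VALUE_AUTO, "ip": None, "bridge": "xen-br0"}]
#   disks = [{"size": 10240, "mode": constants.DISK_RDWR}, {"size": 2048}]
#
# Missing keys fall back to the defaults used below: an auto-generated MAC,
# the cluster's default bridge and read-write disk access.

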
class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disks", "disk_template",
              "mode", "start",
              "wait_for_sync", "ip_check", "nics",
              "hvparams", "beparams"]
  REQ_BGL = False

  def _ExpandNode(self, node):
    """Expands and checks one node name.

    """
    node_full = self.cfg.ExpandNodeName(node)
    if node_full is None:
      raise errors.OpPrereqError("Unknown node %s" % node)
    return node_full

  def ExpandNames(self):
    """ExpandNames for CreateInstance.

    Figure out the right locks for instance creation.

    """
    self.needed_locks = {}

    # set optional parameters to none if they don't exist
    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    # cheap checks, mostly valid constants given

    # verify creation mode
    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    if self.op.hypervisor is None:
      self.op.hypervisor = self.cfg.GetHypervisorType()

    cluster = self.cfg.GetClusterInfo()
    enabled_hvs = cluster.enabled_hypervisors
    if self.op.hypervisor not in enabled_hvs:
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
                                 " cluster (%s)" % (self.op.hypervisor,
                                  ",".join(enabled_hvs)))

    # check hypervisor parameter syntax (locally)

    filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
                                  self.op.hvparams)
    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
    hv_type.CheckParameterSyntax(filled_hvp)

    # fill and remember the beparams dict
    self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
                                    self.op.beparams)

    #### instance parameters check

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)
    self.op.instance_name = instance_name = hostname1.name

    # this is just a preventive check, but someone might still add this
    # instance in the meantime, and creation will fail at lock-add time
    if instance_name in self.cfg.GetInstanceList():
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    self.add_locks[locking.LEVEL_INSTANCE] = instance_name

    # NIC buildup
    self.nics = []
    for nic in self.op.nics:
      # ip validity checks
      ip = nic.get("ip", None)
      if ip is None or ip.lower() == "none":
        nic_ip = None
      elif ip.lower() == constants.VALUE_AUTO:
        nic_ip = hostname1.ip
      else:
        if not utils.IsValidIP(ip):
          raise errors.OpPrereqError("Given IP address '%s' doesn't look"
                                     " like a valid IP" % ip)
        nic_ip = ip

      # MAC address verification
      mac = nic.get("mac", constants.VALUE_AUTO)
      if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
        if not utils.IsValidMac(mac.lower()):
          raise errors.OpPrereqError("Invalid MAC address specified: %s" %
                                     mac)
      # bridge verification
      bridge = nic.get("bridge", self.cfg.GetDefBridge())
      self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))

    # disk checks/pre-build
    self.disks = []
    for disk in self.op.disks:
      mode = disk.get("mode", constants.DISK_RDWR)
      if mode not in constants.DISK_ACCESS_SET:
        raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                   mode)
      size = disk.get("size", None)
      if size is None:
        raise errors.OpPrereqError("Missing disk size")
      try:
        size = int(size)
      except ValueError:
        raise errors.OpPrereqError("Invalid disk size '%s'" % size)
      self.disks.append({"size": size, "mode": mode})

    # used in CheckPrereq for ip ping check
    self.check_ip = hostname1.ip

    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory path not absolute")

    ### Node/iallocator related checks
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.op.pnode = self._ExpandNode(self.op.pnode)
      nodelist = [self.op.pnode]
      if self.op.snode is not None:
        self.op.snode = self._ExpandNode(self.op.snode)
        nodelist.append(self.op.snode)
      self.needed_locks[locking.LEVEL_NODE] = nodelist

    # in case of import lock the source node too
    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)

      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      self.op.src_node = src_node = self._ExpandNode(src_node)
      if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
        self.needed_locks[locking.LEVEL_NODE].append(src_node)

    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    nics = [n.ToDict() for n in self.nics]
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.be_full[constants.BE_VCPUS],
                     mem_size=self.be_full[constants.BE_MEMORY],
                     disks=self.disks,
                     nics=nics,
                     hypervisor=self.op.hypervisor,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 ", ".join(ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGES"] = self.src_images

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.be_full[constants.BE_MEMORY],
      vcpus=self.be_full[constants.BE_VCPUS],
      nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
    ))

    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl


  def CheckPrereq(self):
    """Check prerequisites.

    """
    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")


    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = self.op.src_node
      src_path = self.op.src_path

      export_info = self.rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      # Check that the new instance doesn't have less disks than the export
      instance_disks = len(self.disks)
      export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
      if instance_disks < export_disks:
        raise errors.OpPrereqError("Not enough disks to import."
                                   " (instance: %d, export: %d)" %
                                   (instance_disks, export_disks))

      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      disk_images = []
      for idx in range(export_disks):
        option = 'disk%d_dump' % idx
        if export_info.has_option(constants.INISECT_INS, option):
          # FIXME: are the old os-es, disk sizes, etc. useful?
          export_name = export_info.get(constants.INISECT_INS, option)
          image = os.path.join(src_path, export_name)
          disk_images.append(image)
        else:
          disk_images.append(False)

      self.src_images = disk_images

      old_name = export_info.get(constants.INISECT_INS, 'name')
      # FIXME: int() here could throw a ValueError on broken exports
      exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
      if self.op.instance_name == old_name:
        for idx, nic in enumerate(self.nics):
          if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
            nic_mac_ini = 'nic%d_mac' % idx
            nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)

    # ip ping checks (we use the same ip that was resolved in ExpandNames)
    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (self.check_ip, self.op.instance_name))

    #### allocator run

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
    assert self.pnode is not None, \
      "Cannot retrieve locked node %s" % self.op.pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if self.op.snode is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")
      if self.op.snode == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(self.op.snode)

    nodenames = [pnode.name] + self.secondaries

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.disks)

    # Check lv size requirements
    if req_size is not None:
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                         self.op.hypervisor)
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)

    # os verification
    os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node" % self.op.os_type)

    # bridge check on primary node
    bridges = [n.bridge for n in self.nics]
    if not self.rpc.call_bridges_exist(self.pnode.name, bridges):
      raise errors.OpPrereqError("one of the target bridges '%s' does not"
                                 " exist on"
                                 " destination node '%s'" %
                                 (",".join(bridges), pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.be_full[constants.BE_MEMORY],
                           self.op.hypervisor)

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

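  # Illustrative sketch only: in import mode CheckPrereq above reads the
  # export's config.ini-style file via call_export_info; the keys it
  # consumes look roughly like (contents hypothetical):
  #
  #   [export]
  #   version = 0
  #   os = debootstrap
  #
  #   [instance]
  #   name = instance1.example.com
  #   disk_count = 1
  #   disk0_dump = disk0.dump
  #   nic_count = 1
  #   nic0_mac = aa:00:00:12:34:56
  #
  # constants.INISECT_EXP / INISECT_INS name the two sections; only the
  # fields actually queried above are shown.
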
  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    for nic in self.nics:
      if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
        nic.mac = self.cfg.GenerateMAC()

    ht_kind = self.op.hypervisor
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    ##if self.op.vnc_bind_address is None:
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.cfg.GetFileStorageDir(),
                                        string_file_storage_dir, instance))


    disks = _GenerateDiskTemplate(self,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries,
                                  self.disks,
                                  file_storage_dir,
                                  self.op.file_driver,
                                  0)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            nics=self.nics, disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            beparams=self.op.beparams,
                            hvparams=self.op.hvparams,
                            hypervisor=self.op.hypervisor,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self, iobj):
      _RemoveDisks(self, iobj)
      self.cfg.ReleaseDRBDMinors(instance)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Declare that we don't want to remove the instance lock anymore, as we've
    # added the instance to the config
    del self.remove_locks[locking.LEVEL_INSTANCE]
    # Remove the temp. assignments for the instance's drbds
    self.cfg.ReleaseDRBDMinors(instance)
    # Unlock all the nodes
    self.context.glm.release(locking.LEVEL_NODE)
    del self.acquired_locks[locking.LEVEL_NODE]

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, iobj)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(self, iobj)
      self.cfg.RemoveInstance(iobj.name)
      # Make sure the instance lock gets removed
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not self.rpc.call_instance_os_add(pnode_name, iobj):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_images = self.src_images
        cluster_name = self.cfg.GetClusterName()
        import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
                                                         src_node, src_images,
                                                         cluster_name)
        for idx, result in enumerate(import_result):
          if not result:
            self.LogWarning("Could not import image %s for instance %s,"
                            " disk %d, on node %s" % (src_images[idx],
                                                      instance, idx,
                                                      pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logging.info("Starting instance %s on node %s", instance, pnode_name)
      feedback_fn("* starting instance...")
      if not self.rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")


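# Illustrative sketch only: the hook environment built by
# LUCreateInstance.BuildHooksEnv above contains, on top of the common
# variables from _BuildInstanceHookEnv, at least (values hypothetical):
#
#   {"INSTANCE_DISK_TEMPLATE": "drbd",
#    "INSTANCE_DISK_SIZE": "10240,2048",
#    "INSTANCE_ADD_MODE": "create"}
#
# plus INSTANCE_SRC_NODE / INSTANCE_SRC_PATH / INSTANCE_SRC_IMAGES when an
# instance is imported rather than created from scratch.

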
class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = self.rpc.call_instance_list([node],
                                             [instance.hypervisor])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logging.debug("Connecting to console of %s on %s", instance.name, node)

    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    console_cmd = hyper.GetShellCommandForConsole(instance)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)


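# Illustrative sketch only: LUConnectConsole.Exec above does not attach to
# the console itself; it returns the SSH command line built by
# self.ssh.BuildCmd and leaves execution to the caller.  Assuming BuildCmd
# keeps its usual argv-list form, a client could simply do:
#
#   argv = <result of running the opcode>
#   os.execvp(argv[0], argv)
#
# which drops the user into the hypervisor's console command on the
# instance's primary node.

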
class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    elif self.op.remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.op.remote_node = remote_node
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    # If we're not already locking all nodes in the set we have to declare the
    # instance's primary/secondary nodes.
    if (level == locking.LEVEL_NODE and
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
      self._LockInstancesNodes()

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    self.LogInfo("Selected new secondary for the instance: %s",
                 self.op.remote_node)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = instance

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    if not self.op.disks:
      self.op.disks = range(len(instance.disks))

    for disk_idx in self.op.disks:
      instance.FindDisk(disk_idx)

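  # Summary of the node roles set up by CheckPrereq above (descriptive
  # comment only):
  #   REPLACE_DISK_PRI: tgt_node = primary (its LVs are replaced),
  #                     oth_node = secondary (consistency is checked there)
  #   REPLACE_DISK_SEC: tgt_node = current secondary, oth_node = primary,
  #                     new_node = optional replacement secondary
  # Judging by their docstrings, _ExecD8DiskOnly below handles the in-place
  # replacements while _ExecD8Secondary handles changing the secondary node.
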
  def _ExecD8DiskOnly(self, feedback_fn):
4046
    """Replace a disk on the primary or secondary for dbrd8.
4047

4048
    The algorithm for replace is quite complicated:
4049

4050
      1. for each disk to be replaced:
4051

4052
        1. create new LVs on the target node with unique names
4053
        1. detach old LVs from the drbd device
4054
        1. rename old LVs to name_replaced.<time_t>
4055
        1. rename new LVs to old LVs
4056
        1. attach the new LVs (with the old names now) to the drbd device
4057

4058
      1. wait for sync across all devices
4059

4060
      1. for each modified disk:
4061

4062
        1. remove old LVs (which have the name name_replaces.<time_t>)
4063

4064
    Failures are not very well handled.
4065

4066
    """
4067
    steps_total = 6
4068
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4069
    instance = self.instance
4070
    iv_names = {}
4071
    vgname = self.cfg.GetVGName()
4072
    # start of work
4073
    cfg = self.cfg
4074
    tgt_node = self.tgt_node
4075
    oth_node = self.oth_node
4076

    
4077
    # Step: check device activation
4078
    self.proc.LogStep(1, steps_total, "check device existence")
4079
    info("checking volume groups")
4080
    my_vg = cfg.GetVGName()
4081
    results = self.rpc.call_vg_list([oth_node, tgt_node])
4082
    if not results:
4083
      raise errors.OpExecError("Can't list volume groups on the nodes")
4084
    for node in oth_node, tgt_node:
4085
      res = results.get(node, False)
4086
      if not res or my_vg not in res:
4087
        raise errors.OpExecError("Volume group '%s' not found on %s" %
4088
                                 (my_vg, node))
4089
    for idx, dev in enumerate(instance.disks):
4090
      if idx not in self.op.disks:
4091
        continue
4092
      for node in tgt_node, oth_node:
4093
        info("checking disk/%d on %s" % (idx, node))
4094
        cfg.SetDiskID(dev, node)
4095
        if not self.rpc.call_blockdev_find(node, dev):
4096
          raise errors.OpExecError("Can't find disk/%d on node %s" %
4097
                                   (idx, node))
4098

    
4099
    # Step: check other node consistency
4100
    self.proc.LogStep(2, steps_total, "check peer consistency")
4101
    for idx, dev in enumerate(instance.disks):
4102
      if idx not in self.op.disks:
4103
        continue
4104
      info("checking disk/%d consistency on %s" % (idx, oth_node))
4105
      if not _CheckDiskConsistency(self, dev, oth_node,
4106
                                   oth_node==instance.primary_node):
4107
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4108
                                 " to replace disks on this node (%s)" %
4109
                                 (oth_node, tgt_node))
4110

    
4111
    # Step: create new storage
4112
    self.proc.LogStep(3, steps_total, "allocate new storage")
4113
    for idx, dev in enumerate(instance.disks):
4114
      if idx not in self.op.disks:
4115
        continue
4116
      size = dev.size
4117
      cfg.SetDiskID(dev, tgt_node)
4118
      lv_names = [".disk%d_%s" % (idx, suf)
4119
                  for suf in ["data", "meta"]]
4120
      names = _GenerateUniqueNames(self, lv_names)
4121
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4122
                             logical_id=(vgname, names[0]))
4123
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4124
                             logical_id=(vgname, names[1]))
4125
      new_lvs = [lv_data, lv_meta]
4126
      old_lvs = dev.children
4127
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4128
      info("creating new local storage on %s for %s" %
4129
           (tgt_node, dev.iv_name))
4130
      # since we *always* want to create this LV, we use the
4131
      # _Create...OnPrimary (which forces the creation), even if we
4132
      # are talking about the secondary node
4133
      for new_lv in new_lvs:
4134
        if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
4135
                                        _GetInstanceInfoText(instance)):
4136
          raise errors.OpExecError("Failed to create new LV named '%s' on"
4137
                                   " node '%s'" %
4138
                                   (new_lv.logical_id[1], tgt_node))
4139

    
4140
    # Step: for each lv, detach+rename*2+attach
4141
    self.proc.LogStep(4, steps_total, "change drbd configuration")
4142
    for dev, old_lvs, new_lvs in iv_names.itervalues():
4143
      info("detaching %s drbd from local storage" % dev.iv_name)
4144
      if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
4145
        raise errors.OpExecError("Can't detach drbd from local storage on node"
4146
                                 " %s for device %s" % (tgt_node, dev.iv_name))
4147
      #dev.children = []
4148
      #cfg.Update(instance)
4149

    
4150
      # ok, we created the new LVs, so now we know we have the needed
4151
      # storage; as such, we proceed on the target node to rename
4152
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4153
      # using the assumption that logical_id == physical_id (which in
4154
      # turn is the unique_id on that node)
4155

    
4156
      # FIXME(iustin): use a better name for the replaced LVs
4157
      temp_suffix = int(time.time())
4158
      ren_fn = lambda d, suff: (d.physical_id[0],
4159
                                d.physical_id[1] + "_replaced-%s" % suff)
4160
      # build the rename list based on what LVs exist on the node
4161
      rlist = []
4162
      for to_ren in old_lvs:
4163
        find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4164
        if find_res is not None: # device exists
4165
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4166

    
4167
      info("renaming the old LVs on the target node")
4168
      if not self.rpc.call_blockdev_rename(tgt_node, rlist):
4169
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4170
      # now we rename the new LVs to the old LVs
4171
      info("renaming the new LVs on the target node")
4172
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4173
      if not self.rpc.call_blockdev_rename(tgt_node, rlist):
4174
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4175

    
4176
      for old, new in zip(old_lvs, new_lvs):
4177
        new.logical_id = old.logical_id
4178
        cfg.SetDiskID(new, tgt_node)
4179

    
4180
      for disk in old_lvs:
4181
        disk.logical_id = ren_fn(disk, temp_suffix)
4182
        cfg.SetDiskID(disk, tgt_node)
4183

    
4184
      # now that the new lvs have the old name, we can add them to the device
4185
      info("adding new mirror component on %s" % tgt_node)
4186
      if not self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
4187
        for new_lv in new_lvs:
4188
          if not self.rpc.call_blockdev_remove(tgt_node, new_lv):
4189
            warning("Can't rollback device %s", hint="manually cleanup unused"
4190
                    " logical volumes")
4191
        raise errors.OpExecError("Can't add local storage to drbd")
4192

    
4193
      dev.children = new_lvs
4194
      cfg.Update(instance)
4195

    
4196
    # Step: wait for sync
4197

    
4198
    # this can fail as the old devices are degraded and _WaitForSync
4199
    # does a combined result over all disks, so we don't check its
4200
    # return value
4201
    self.proc.LogStep(5, steps_total, "sync devices")
4202
    _WaitForSync(self, instance, unlock=True)
4203

    
4204
    # so check manually all the devices
4205
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4206
      cfg.SetDiskID(dev, instance.primary_node)
4207
      is_degr = self.rpc.call_blockdev_find(instance.primary_node, dev)[5]
4208
      if is_degr:
4209
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
4210

    
4211
    # Step: remove old storage
4212
    self.proc.LogStep(6, steps_total, "removing old storage")
4213
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4214
      info("remove logical volumes for %s" % name)
4215
      for lv in old_lvs:
4216
        cfg.SetDiskID(lv, tgt_node)
4217
        if not self.rpc.call_blockdev_remove(tgt_node, lv):
4218
          warning("Can't remove old LV", hint="manually remove unused LVs")
4219
          continue
4220

    
4221
  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for the replacement is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with the same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the correct local disks
          but not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
4240
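    # Note: iv_names, filled in step 4 below, maps each disk index to a
    # (disk, old_children_lvs, new_logical_id) tuple; the later steps use it
    # to switch the instance configuration over and to clean up the old LVs.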
    steps_total = 6
4241
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4242
    instance = self.instance
4243
    iv_names = {}
4244
    vgname = self.cfg.GetVGName()
4245
    # start of work
4246
    cfg = self.cfg
4247
    old_node = self.tgt_node
4248
    new_node = self.new_node
4249
    pri_node = instance.primary_node
4250

    
4251
    # Step: check device activation
4252
    self.proc.LogStep(1, steps_total, "check device existence")
4253
    info("checking volume groups")
4254
    my_vg = cfg.GetVGName()
4255
    results = self.rpc.call_vg_list([pri_node, new_node])
4256
    if not results:
4257
      raise errors.OpExecError("Can't list volume groups on the nodes")
4258
    for node in pri_node, new_node:
4259
      res = results.get(node, False)
4260
      if not res or my_vg not in res:
4261
        raise errors.OpExecError("Volume group '%s' not found on %s" %
4262
                                 (my_vg, node))
4263
    for idx, dev in enumerate(instance.disks):
4264
      if idx not in self.op.disks:
4265
        continue
4266
      info("checking disk/%d on %s" % (idx, pri_node))
4267
      cfg.SetDiskID(dev, pri_node)
4268
      if not self.rpc.call_blockdev_find(pri_node, dev):
4269
        raise errors.OpExecError("Can't find disk/%d on node %s" %
4270
                                 (idx, pri_node))
4271

    
4272
    # Step: check other node consistency
4273
    self.proc.LogStep(2, steps_total, "check peer consistency")
4274
    for idx, dev in enumerate(instance.disks):
4275
      if idx not in self.op.disks:
4276
        continue
4277
      info("checking disk/%d consistency on %s" % (idx, pri_node))
4278
      if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4279
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
4280
                                 " unsafe to replace the secondary" %
4281
                                 pri_node)
4282

    
4283
    # Step: create new storage
4284
    self.proc.LogStep(3, steps_total, "allocate new storage")
4285
    for idx, dev in enumerate(instance.disks):
4286
      size = dev.size
4287
      info("adding new local storage on %s for disk/%d" %
4288
           (new_node, idx))
4289
      # since we *always* want to create this LV, we use the
4290
      # _Create...OnPrimary (which forces the creation), even if we
4291
      # are talking about the secondary node
4292
      for new_lv in dev.children:
4293
        if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4294
                                        _GetInstanceInfoText(instance)):
4295
          raise errors.OpExecError("Failed to create new LV named '%s' on"
4296
                                   " node '%s'" %
4297
                                   (new_lv.logical_id[1], new_node))
4298

    
4299
    # Step 4: DRBD minors and drbd setup changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
                                   instance.name)
    logging.debug("Allocated minors %s", minors)
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
4306
    for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
4307
      size = dev.size
4308
      info("activating a new drbd on %s for disk/%d" % (new_node, idx))
4309
      # create new devices on new_node
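      # The drbd8 logical_id rebuilt below appears to be a 6-tuple of
      # (node_a, node_b, port, minor_a, minor_b, secret); only the node
      # being replaced and its minor change, while the other node's slots,
      # the port and the secret are carried over from the old logical_id.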
4310
      if pri_node == dev.logical_id[0]:
4311
        new_logical_id = (pri_node, new_node,
4312
                          dev.logical_id[2], dev.logical_id[3], new_minor,
4313
                          dev.logical_id[5])
4314
      else:
4315
        new_logical_id = (new_node, pri_node,
4316
                          dev.logical_id[2], new_minor, dev.logical_id[4],
4317
                          dev.logical_id[5])
4318
      iv_names[idx] = (dev, dev.children, new_logical_id)
4319
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4320
                    new_logical_id)
4321
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4322
                              logical_id=new_logical_id,
4323
                              children=dev.children)
4324
      if not _CreateBlockDevOnSecondary(self, new_node, instance,
4325
                                        new_drbd, False,
4326
                                        _GetInstanceInfoText(instance)):
4327
        self.cfg.ReleaseDRBDMinors(instance.name)
4328
        raise errors.OpExecError("Failed to create new DRBD on"
4329
                                 " node '%s'" % new_node)
4330

    
4331
    for idx, dev in enumerate(instance.disks):
4332
      # we have new devices, shutdown the drbd on the old secondary
4333
      info("shutting down drbd for disk/%d on old node" % idx)
4334
      cfg.SetDiskID(dev, old_node)
4335
      if not self.rpc.call_blockdev_shutdown(old_node, dev):
4336
        warning("Failed to shutdown drbd for disk/%d on old node" % idx,
4337
                hint="Please cleanup this device manually as soon as possible")
4338

    
4339
    info("detaching primary drbds from the network (=> standalone)")
4340
    done = 0
4341
    for idx, dev in enumerate(instance.disks):
4342
      cfg.SetDiskID(dev, pri_node)
4343
      # set the network part of the physical (unique in bdev terms) id
4344
      # to None, meaning detach from network
4345
      dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4346
      # and 'find' the device, which will 'fix' it to match the
4347
      # standalone state
4348
      if self.rpc.call_blockdev_find(pri_node, dev):
4349
        done += 1
4350
      else:
4351
        warning("Failed to detach drbd disk/%d from network, unusual case" %
4352
                idx)
4353

    
4354
    if not done:
4355
      # no detaches succeeded (very unlikely)
4356
      self.cfg.ReleaseDRBDMinors(instance.name)
4357
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
4358

    
4359
    # if we managed to detach at least one, we update all the disks of
4360
    # the instance to point to the new secondary
4361
    info("updating instance configuration")
4362
    for dev, _, new_logical_id in iv_names.itervalues():
4363
      dev.logical_id = new_logical_id
4364
      cfg.SetDiskID(dev, pri_node)
4365
    cfg.Update(instance)
4366
    # we can remove now the temp minors as now the new values are
4367
    # written to the config file (and therefore stable)
4368
    self.cfg.ReleaseDRBDMinors(instance.name)
4369

    
4370
    # and now perform the drbd attach
4371
    info("attaching primary drbds to new secondary (standalone => connected)")
4372
    failures = []
4373
    for idx, dev in enumerate(instance.disks):
4374
      info("attaching primary drbd for disk/%d to new secondary node" % idx)
4375
      # since the attach is smart, it's enough to 'find' the device,
4376
      # it will automatically activate the network, if the physical_id
4377
      # is correct
4378
      cfg.SetDiskID(dev, pri_node)
4379
      logging.debug("Disk to attach: %s", dev)
4380
      if not self.rpc.call_blockdev_find(pri_node, dev):
4381
        warning("can't attach drbd disk/%d to new secondary!" % idx,
4382
                "please do a gnt-instance info to see the status of disks")
4383

    
4384
    # this can fail as the old devices are degraded and _WaitForSync
4385
    # does a combined result over all disks, so we don't check its
4386
    # return value
4387
    self.proc.LogStep(5, steps_total, "sync devices")
4388
    _WaitForSync(self, instance, unlock=True)
4389

    
4390
    # so check manually all the devices
4391
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
4392
      cfg.SetDiskID(dev, pri_node)
4393
      is_degr = self.rpc.call_blockdev_find(pri_node, dev)[5]
4394
      if is_degr:
4395
        raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
4396

    
4397
    self.proc.LogStep(6, steps_total, "removing old storage")
4398
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
4399
      info("remove logical volumes for disk/%d" % idx)
4400
      for lv in old_lvs:
4401
        cfg.SetDiskID(lv, old_node)
4402
        if not self.rpc.call_blockdev_remove(old_node, lv):
4403
          warning("Can't remove LV on old secondary",
4404
                  hint="Cleanup stale volumes by hand")
4405

    
4406
  def Exec(self, feedback_fn):
4407
    """Execute disk replacement.
4408

4409
    This dispatches the disk replacement to the appropriate handler.
4410

4411
    """
4412
    instance = self.instance
4413

    
4414
    # Activate the instance disks if we're replacing them on a down instance
4415
    if instance.status == "down":
4416
      _StartInstanceDisks(self, instance, True)
4417

    
4418
    if instance.disk_template == constants.DT_DRBD8:
4419
      if self.op.remote_node is None:
4420
        fn = self._ExecD8DiskOnly
4421
      else:
4422
        fn = self._ExecD8Secondary
4423
    else:
4424
      raise errors.ProgrammerError("Unhandled disk replacement case")
4425

    
4426
    ret = fn(feedback_fn)
4427

    
4428
    # Deactivate the instance disks if we're replacing them on a down instance
4429
    if instance.status == "down":
4430
      _SafeShutdownInstanceDisks(self, instance)
4431

    
4432
    return ret
4433

    
4434

    
4435
class LUGrowDisk(LogicalUnit):
4436
  """Grow a disk of an instance.
4437

4438
  """
4439
  HPATH = "disk-grow"
4440
  HTYPE = constants.HTYPE_INSTANCE
4441
  _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4442
  REQ_BGL = False
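  # The 'amount' parameter is interpreted as MiB (see the free-space check
  # in CheckPrereq, which compares it against each node's vg_free).  A
  # hypothetical submission, assuming the matching opcode in opcodes.py is
  # named OpGrowDisk and that 'disk' identifies the disk as FindDisk
  # expects:
  #   op = opcodes.OpGrowDisk(instance_name="inst1.example.com", disk=0,
  #                           amount=1024, wait_for_sync=True)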
4443

    
4444
  def ExpandNames(self):
4445
    self._ExpandAndLockInstance()
4446
    self.needed_locks[locking.LEVEL_NODE] = []
4447
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4448

    
4449
  def DeclareLocks(self, level):
4450
    if level == locking.LEVEL_NODE:
4451
      self._LockInstancesNodes()
4452

    
4453
  def BuildHooksEnv(self):
4454
    """Build hooks env.
4455

4456
    This runs on the master, the primary and all the secondaries.
4457

4458
    """
4459
    env = {
4460
      "DISK": self.op.disk,
4461
      "AMOUNT": self.op.amount,
4462
      }
4463
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4464
    nl = [
4465
      self.cfg.GetMasterNode(),
4466
      self.instance.primary_node,
4467
      ]
4468
    return env, nl, nl
4469

    
4470
  def CheckPrereq(self):
4471
    """Check prerequisites.
4472

4473
    This checks that the instance is in the cluster.
4474

4475
    """
4476
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4477
    assert instance is not None, \
4478
      "Cannot retrieve locked instance %s" % self.op.instance_name
4479

    
4480
    self.instance = instance
4481

    
4482
    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4483
      raise errors.OpPrereqError("Instance's disk layout does not support"
4484
                                 " growing.")
4485

    
4486
    self.disk = instance.FindDisk(self.op.disk)
4487

    
4488
    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4489
    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4490
                                       instance.hypervisor)
4491
    for node in nodenames:
4492
      info = nodeinfo.get(node, None)
4493
      if not info:
4494
        raise errors.OpPrereqError("Cannot get current information"
4495
                                   " from node '%s'" % node)
4496
      vg_free = info.get('vg_free', None)
4497
      if not isinstance(vg_free, int):
4498
        raise errors.OpPrereqError("Can't compute free disk space on"
4499
                                   " node %s" % node)
4500
      if self.op.amount > vg_free:
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
                                   " %d MiB available, %d MiB required" %
                                   (node, vg_free, self.op.amount))
4504

    
4505
  def Exec(self, feedback_fn):
4506
    """Execute disk grow.
4507

4508
    """
4509
    instance = self.instance
4510
    disk = self.disk
4511
    for node in (instance.secondary_nodes + (instance.primary_node,)):
4512
      self.cfg.SetDiskID(disk, node)
4513
      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
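      # The node is expected to answer with a (success, message) pair; any
      # other shape means the RPC itself failed and is treated as an error.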
4514
      if (not result or not isinstance(result, (list, tuple)) or
4515
          len(result) != 2):
4516
        raise errors.OpExecError("grow request failed to node %s" % node)
4517
      elif not result[0]:
4518
        raise errors.OpExecError("grow request failed to node %s: %s" %
4519
                                 (node, result[1]))
4520
    disk.RecordGrow(self.op.amount)
4521
    self.cfg.Update(instance)
4522
    if self.op.wait_for_sync:
4523
      disk_abort = not _WaitForSync(self, instance)
4524
      if disk_abort:
4525
        self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
4526
                             " status.\nPlease check the instance.")
4527

    
4528

    
4529
class LUQueryInstanceData(NoHooksLU):
4530
  """Query runtime instance data.
4531

4532
  """
4533
  _OP_REQP = ["instances", "static"]
4534
  REQ_BGL = False
4535

    
4536
  def ExpandNames(self):
4537
    self.needed_locks = {}
4538
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4539

    
4540
    if not isinstance(self.op.instances, list):
4541
      raise errors.OpPrereqError("Invalid argument type 'instances'")
4542

    
4543
    if self.op.instances:
4544
      self.wanted_names = []
4545
      for name in self.op.instances:
4546
        full_name = self.cfg.ExpandInstanceName(name)
4547
        if full_name is None:
4548
          raise errors.OpPrereqError("Instance '%s' not known" %
4549
                                     self.op.instance_name)
4550
        self.wanted_names.append(full_name)
4551
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4552
    else:
4553
      self.wanted_names = None
4554
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4555

    
4556
    self.needed_locks[locking.LEVEL_NODE] = []
4557
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4558

    
4559
  def DeclareLocks(self, level):
4560
    if level == locking.LEVEL_NODE:
4561
      self._LockInstancesNodes()
4562

    
4563
  def CheckPrereq(self):
4564
    """Check prerequisites.
4565

4566
    This only checks the optional instance list against the existing names.
4567

4568
    """
4569
    if self.wanted_names is None:
4570
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4571

    
4572
    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4573
                             in self.wanted_names]
4574
    return
4575

    
4576
  def _ComputeDiskStatus(self, instance, snode, dev):
4577
    """Compute block device status.
4578

4579
    """
4580
    static = self.op.static
4581
    if not static:
4582
      self.cfg.SetDiskID(dev, instance.primary_node)
4583
      dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
4584
    else:
4585
      dev_pstatus = None
4586

    
4587
    if dev.dev_type in constants.LDS_DRBD:
4588
      # we change the snode then (otherwise we use the one passed in)
4589
      if dev.logical_id[0] == instance.primary_node:
4590
        snode = dev.logical_id[1]
4591
      else:
4592
        snode = dev.logical_id[0]
4593

    
4594
    if snode and not static:
4595
      self.cfg.SetDiskID(dev, snode)
4596
      dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
4597
    else:
4598
      dev_sstatus = None
4599

    
4600
    if dev.children:
4601
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
4602
                      for child in dev.children]
4603
    else:
4604
      dev_children = []
4605

    
4606
    data = {
4607
      "iv_name": dev.iv_name,
4608
      "dev_type": dev.dev_type,
4609
      "logical_id": dev.logical_id,
4610
      "physical_id": dev.physical_id,
4611
      "pstatus": dev_pstatus,
4612
      "sstatus": dev_sstatus,
4613
      "children": dev_children,
4614
      "mode": dev.mode,
4615
      }
4616

    
4617
    return data
4618

    
4619
  def Exec(self, feedback_fn):
4620
    """Gather and return data"""
4621
    result = {}
4622

    
4623
    cluster = self.cfg.GetClusterInfo()
4624

    
4625
    for instance in self.wanted_instances:
4626
      if not self.op.static:
4627
        remote_info = self.rpc.call_instance_info(instance.primary_node,
4628
                                                  instance.name,
4629
                                                  instance.hypervisor)
4630
        if remote_info and "state" in remote_info:
4631
          remote_state = "up"
4632
        else:
4633
          remote_state = "down"
4634
      else:
4635
        remote_state = None
4636
      if instance.status == "down":
4637
        config_state = "down"
4638
      else:
4639
        config_state = "up"
4640

    
4641
      disks = [self._ComputeDiskStatus(instance, None, device)
4642
               for device in instance.disks]
4643

    
4644
      idict = {
4645
        "name": instance.name,
4646
        "config_state": config_state,
4647
        "run_state": remote_state,
4648
        "pnode": instance.primary_node,
4649
        "snodes": instance.secondary_nodes,
4650
        "os": instance.os,
4651
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4652
        "disks": disks,
4653
        "hypervisor": instance.hypervisor,
4654
        "network_port": instance.network_port,
4655
        "hv_instance": instance.hvparams,
4656
        "hv_actual": cluster.FillHV(instance),
4657
        "be_instance": instance.beparams,
4658
        "be_actual": cluster.FillBE(instance),
4659
        }
4660

    
4661
      result[instance.name] = idict
4662

    
4663
    return result
4664

    
4665

    
4666
class LUSetInstanceParams(LogicalUnit):
4667
  """Modifies an instances's parameters.
4668

4669
  """
4670
  HPATH = "instance-modify"
4671
  HTYPE = constants.HTYPE_INSTANCE
4672
  _OP_REQP = ["instance_name"]
4673
  REQ_BGL = False
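  # Both self.op.disks and self.op.nics are lists of (op, params) pairs,
  # where op is constants.DDM_ADD, constants.DDM_REMOVE or the index of an
  # existing device; e.g. a hypothetical request adding a disk could pass
  # disks=[(constants.DDM_ADD, {'size': 1024, 'mode': constants.DISK_RDWR})]
  # (size presumably in MiB, as elsewhere in this module).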
4674

    
4675
  def CheckArguments(self):
4676
    if not hasattr(self.op, 'nics'):
4677
      self.op.nics = []
4678
    if not hasattr(self.op, 'disks'):
4679
      self.op.disks = []
4680
    if not hasattr(self.op, 'beparams'):
4681
      self.op.beparams = {}
4682
    if not hasattr(self.op, 'hvparams'):
4683
      self.op.hvparams = {}
4684
    self.op.force = getattr(self.op, "force", False)
4685
    if not (self.op.nics or self.op.disks or
4686
            self.op.hvparams or self.op.beparams):
4687
      raise errors.OpPrereqError("No changes submitted")
4688

    
4689
    for item in (constants.BE_MEMORY, constants.BE_VCPUS):
4690
      val = self.op.beparams.get(item, None)
4691
      if val is not None:
4692
        try:
4693
          val = int(val)
4694
        except ValueError, err:
4695
          raise errors.OpPrereqError("Invalid %s size: %s" % (item, str(err)))
4696
        self.op.beparams[item] = val
4697
    # Disk validation
4698
    disk_addremove = 0
4699
    for disk_op, disk_dict in self.op.disks:
4700
      if disk_op == constants.DDM_REMOVE:
4701
        disk_addremove += 1
4702
        continue
4703
      elif disk_op == constants.DDM_ADD:
4704
        disk_addremove += 1
4705
      else:
4706
        if not isinstance(disk_op, int):
4707
          raise errors.OpPrereqError("Invalid disk index")
4708
      if disk_op == constants.DDM_ADD:
4709
        mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
4710
        if mode not in (constants.DISK_RDONLY, constants.DISK_RDWR):
4711
          raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
4712
        size = disk_dict.get('size', None)
4713
        if size is None:
4714
          raise errors.OpPrereqError("Required disk parameter size missing")
4715
        try:
4716
          size = int(size)
4717
        except ValueError, err:
4718
          raise errors.OpPrereqError("Invalid disk size parameter: %s" %
4719
                                     str(err))
4720
        disk_dict['size'] = size
4721
      else:
4722
        # modification of disk
4723
        if 'size' in disk_dict:
4724
          raise errors.OpPrereqError("Disk size change not possible, use"
4725
                                     " grow-disk")
4726

    
4727
    if disk_addremove > 1:
4728
      raise errors.OpPrereqError("Only one disk add or remove operation"
4729
                                 " supported at a time")
4730

    
4731
    # NIC validation
4732
    nic_addremove = 0
4733
    for nic_op, nic_dict in self.op.nics:
4734
      if nic_op == constants.DDM_REMOVE:
4735
        nic_addremove += 1
4736
        continue
4737
      elif nic_op == constants.DDM_ADD:
4738
        nic_addremove += 1
4739
      else:
4740
        if not isinstance(nic_op, int):
4741
          raise errors.OpPrereqError("Invalid nic index")
4742

    
4743
      # nic_dict should be a dict
4744
      nic_ip = nic_dict.get('ip', None)
4745
      if nic_ip is not None:
4746
        if nic_ip.lower() == "none":
4747
          nic_dict['ip'] = None
4748
        else:
4749
          if not utils.IsValidIP(nic_ip):
4750
            raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
4751
      # we can only check None bridges and assign the default one
4752
      nic_bridge = nic_dict.get('bridge', None)
4753
      if nic_bridge is None:
4754
        nic_dict['bridge'] = self.cfg.GetDefBridge()
4755
      # but we can validate MACs
4756
      nic_mac = nic_dict.get('mac', None)
4757
      if nic_mac is not None:
4758
        if self.cfg.IsMacInUse(nic_mac):
4759
          raise errors.OpPrereqError("MAC address %s already in use"
4760
                                     " in cluster" % nic_mac)
4761
        if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4762
          if not utils.IsValidMac(nic_mac):
4763
            raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
4764
    if nic_addremove > 1:
4765
      raise errors.OpPrereqError("Only one NIC add or remove operation"
4766
                                 " supported at a time")
4767

    
4768
  def ExpandNames(self):
4769
    self._ExpandAndLockInstance()
4770
    self.needed_locks[locking.LEVEL_NODE] = []
4771
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4772

    
4773
  def DeclareLocks(self, level):
4774
    if level == locking.LEVEL_NODE:
4775
      self._LockInstancesNodes()
4776

    
4777
  def BuildHooksEnv(self):
4778
    """Build hooks env.
4779

4780
    This runs on the master, primary and secondaries.
4781

4782
    """
4783
    args = dict()
4784
    if constants.BE_MEMORY in self.be_new:
4785
      args['memory'] = self.be_new[constants.BE_MEMORY]
4786
    if constants.BE_VCPUS in self.be_new:
4787
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
4788
    # FIXME: readd disk/nic changes
4789
    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
4790
    nl = [self.cfg.GetMasterNode(),
4791
          self.instance.primary_node] + list(self.instance.secondary_nodes)
4792
    return env, nl, nl
4793

    
4794
  def CheckPrereq(self):
4795
    """Check prerequisites.
4796

4797
    This only checks the instance list against the existing names.
4798

4799
    """
4800
    force = self.force = self.op.force
4801

    
4802
    # checking the new params on the primary/secondary nodes
4803

    
4804
    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4805
    assert self.instance is not None, \
4806
      "Cannot retrieve locked instance %s" % self.op.instance_name
4807
    pnode = self.instance.primary_node
4808
    nodelist = [pnode]
4809
    nodelist.extend(instance.secondary_nodes)
4810

    
4811
    # hvparams processing
4812
    if self.op.hvparams:
4813
      i_hvdict = copy.deepcopy(instance.hvparams)
4814
      for key, val in self.op.hvparams.iteritems():
4815
        if val is None:
4816
          try:
4817
            del i_hvdict[key]
4818
          except KeyError:
4819
            pass
4820
        else:
4821
          i_hvdict[key] = val
4822
      cluster = self.cfg.GetClusterInfo()
4823
      hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
4824
                                i_hvdict)
4825
      # local check
4826
      hypervisor.GetHypervisor(
4827
        instance.hypervisor).CheckParameterSyntax(hv_new)
4828
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
4829
      self.hv_new = hv_new # the new actual values
4830
      self.hv_inst = i_hvdict # the new dict (without defaults)
4831
    else:
4832
      self.hv_new = self.hv_inst = {}
4833

    
4834
    # beparams processing
4835
    if self.op.beparams:
4836
      i_bedict = copy.deepcopy(instance.beparams)
4837
      for key, val in self.op.beparams.iteritems():
4838
        if val is None:
4839
          try:
4840
            del i_bedict[key]
4841
          except KeyError:
4842
            pass
4843
        else:
4844
          i_bedict[key] = val
4845
      cluster = self.cfg.GetClusterInfo()
4846
      be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4847
                                i_bedict)
4848
      self.be_new = be_new # the new actual values
4849
      self.be_inst = i_bedict # the new dict (without defaults)
4850
    else:
4851
      self.be_new = self.be_inst = {}
4852

    
4853
    self.warn = []
4854

    
4855
    if constants.BE_MEMORY in self.op.beparams and not self.force:
4856
      mem_check_list = [pnode]
4857
      if be_new[constants.BE_AUTO_BALANCE]:
4858
        # either we changed auto_balance to yes or it was from before
4859
        mem_check_list.extend(instance.secondary_nodes)
4860
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
4861
                                                  instance.hypervisor)
4862
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
4863
                                         instance.hypervisor)
4864

    
4865
      if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
4866
        # Assume the primary node is unreachable and go ahead
4867
        self.warn.append("Can't get info from primary node %s" % pnode)
4868
      else:
4869
        if instance_info:
4870
          current_mem = instance_info['memory']
4871
        else:
4872
          # Assume instance not running
4873
          # (there is a slight race condition here, but it's not very probable,
4874
          # and we have no other way to check)
4875
          current_mem = 0
4876
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
4877
                    nodeinfo[pnode]['memory_free'])
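        # e.g. raising the instance's memory to 2048 MB while it currently
        # uses 512 MB and the primary node reports 1024 MB free gives
        # miss_mem = 2048 - 512 - 1024 = 512 > 0, so the change is refused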
4878
        if miss_mem > 0:
4879
          raise errors.OpPrereqError("This change will prevent the instance"
4880
                                     " from starting, due to %d MB of memory"
4881
                                     " missing on its primary node" % miss_mem)
4882

    
4883
      if be_new[constants.BE_AUTO_BALANCE]:
4884
        for node in instance.secondary_nodes:
4885
          if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
4886
            self.warn.append("Can't get info from secondary node %s" % node)
4887
          elif be_new[constants.BE_MEMORY] > nodeinfo[node]['memory_free']:
4888
            self.warn.append("Not enough memory to failover instance to"
4889
                             " secondary node %s" % node)
4890

    
4891
    # NIC processing
4892
    for nic_op, nic_dict in self.op.nics:
4893
      if nic_op == constants.DDM_REMOVE:
4894
        if not instance.nics:
4895
          raise errors.OpPrereqError("Instance has no NICs, cannot remove")
4896
        continue
4897
      if nic_op != constants.DDM_ADD:
4898
        # an existing nic
4899
        if nic_op < 0 or nic_op >= len(instance.nics):
4900
          raise errors.OpPrereqError("Invalid NIC index %s, valid values"
4901
                                     " are 0 to %d" %
4902
                                     (nic_op, len(instance.nics)))
4903
      nic_bridge = nic_dict.get('bridge', None)
4904
      if nic_bridge is not None:
4905
        if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
4906
          msg = ("Bridge '%s' doesn't exist on one of"
4907
                 " the instance nodes" % nic_bridge)
4908
          if self.force:
4909
            self.warn.append(msg)
4910
          else:
4911
            raise errors.OpPrereqError(msg)
4912

    
4913
    # DISK processing
4914
    if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
4915
      raise errors.OpPrereqError("Disk operations not supported for"
4916
                                 " diskless instances")
4917
    for disk_op, disk_dict in self.op.disks:
4918
      if disk_op == constants.DDM_REMOVE:
4919
        if len(instance.disks) == 1:
4920
          raise errors.OpPrereqError("Cannot remove the last disk of"
4921
                                     " an instance")
4922
        ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
4923
        ins_l = ins_l[pnode]
4924
        if not isinstance(ins_l, list):
          raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
4926
        if instance.name in ins_l:
4927
          raise errors.OpPrereqError("Instance is running, can't remove"
4928
                                     " disks.")
4929

    
4930
      if (disk_op == constants.DDM_ADD and
          len(instance.disks) >= constants.MAX_DISKS):
4932
        raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
4933
                                   " add more" % constants.MAX_DISKS)
4934
      if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
4935
        # an existing disk
4936
        if disk_op < 0 or disk_op >= len(instance.disks):
4937
          raise errors.OpPrereqError("Invalid disk index %s, valid values"
4938
                                     " are 0 to %d" %
4939
                                     (disk_op, len(instance.disks)))
4940

    
4941
    return
4942

    
4943
  def Exec(self, feedback_fn):
4944
    """Modifies an instance.
4945

4946
    All parameters take effect only at the next restart of the instance.
4947

4948
    """
4949
    # Process here the warnings from CheckPrereq, as we don't have a
4950
    # feedback_fn there.
4951
    for warn in self.warn:
4952
      feedback_fn("WARNING: %s" % warn)
4953

    
4954
    result = []
4955
    instance = self.instance
4956
    # disk changes
4957
    for disk_op, disk_dict in self.op.disks:
4958
      if disk_op == constants.DDM_REMOVE:
4959
        # remove the last disk
4960
        device = instance.disks.pop()
4961
        device_idx = len(instance.disks)
4962
        for node, disk in device.ComputeNodeTree(instance.primary_node):
4963
          self.cfg.SetDiskID(disk, node)
4964
          if not self.rpc.call_blockdev_remove(node, disk):
4965
            self.proc.LogWarning("Could not remove disk/%d on node %s,"
4966
                                 " continuing anyway", device_idx, node)
4967
        result.append(("disk/%d" % device_idx, "remove"))
4968
      elif disk_op == constants.DDM_ADD:
4969
        # add a new disk
4970
        if instance.disk_template == constants.DT_FILE:
4971
          file_driver, file_path = instance.disks[0].logical_id
4972
          file_path = os.path.dirname(file_path)
4973
        else:
4974
          file_driver = file_path = None
4975
        disk_idx_base = len(instance.disks)
4976
        new_disk = _GenerateDiskTemplate(self,
4977
                                         instance.disk_template,
4978
                                         instance, instance.primary_node,
4979
                                         instance.secondary_nodes,
4980
                                         [disk_dict],
4981
                                         file_path,
4982
                                         file_driver,
4983
                                         disk_idx_base)[0]
4984
        new_disk.mode = disk_dict['mode']
4985
        instance.disks.append(new_disk)
4986
        info = _GetInstanceInfoText(instance)
4987

    
4988
        logging.info("Creating volume %s for instance %s",
4989
                     new_disk.iv_name, instance.name)
4990
        # Note: this needs to be kept in sync with _CreateDisks
4991
        #HARDCODE
4992
        for secondary_node in instance.secondary_nodes:
4993
          if not _CreateBlockDevOnSecondary(self, secondary_node, instance,
4994
                                            new_disk, False, info):
4995
            self.LogWarning("Failed to create volume %s (%s) on"
4996
                            " secondary node %s!",
4997
                            new_disk.iv_name, new_disk, secondary_node)
4998
        #HARDCODE
4999
        if not _CreateBlockDevOnPrimary(self, instance.primary_node,
5000
                                        instance, new_disk, info):
5001
          self.LogWarning("Failed to create volume %s on primary!",
5002
                          new_disk.iv_name)
5003
        result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
5004
                       (new_disk.size, new_disk.mode)))
5005
      else:
5006
        # change a given disk
5007
        instance.disks[disk_op].mode = disk_dict['mode']
5008
        result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
5009
    # NIC changes
5010
    for nic_op, nic_dict in self.op.nics:
5011
      if nic_op == constants.DDM_REMOVE:
5012
        # remove the last nic
5013
        del instance.nics[-1]
5014
        result.append(("nic.%d" % len(instance.nics), "remove"))
5015
      elif nic_op == constants.DDM_ADD:
5016
        # add a new nic
5017
        if 'mac' not in nic_dict:
5018
          mac = constants.VALUE_GENERATE
5019
        else:
5020
          mac = nic_dict['mac']
5021
        if mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5022
          mac = self.cfg.GenerateMAC()
5023
        new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
5024
                              bridge=nic_dict.get('bridge', None))
5025
        instance.nics.append(new_nic)
5026
        result.append(("nic.%d" % (len(instance.nics) - 1),
5027
                       "add:mac=%s,ip=%s,bridge=%s" %
5028
                       (new_nic.mac, new_nic.ip, new_nic.bridge)))
5029
      else:
5030
        # change a given nic
5031
        for key in 'mac', 'ip', 'bridge':
5032
          if key in nic_dict:
5033
            setattr(instance.nics[nic_op], key, nic_dict[key])
5034
            result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
5035

    
5036
    # hvparams changes
5037
    if self.op.hvparams:
5038
      instance.hvparams = self.hv_new
5039
      for key, val in self.op.hvparams.iteritems():
5040
        result.append(("hv/%s" % key, val))
5041

    
5042
    # beparams changes
5043
    if self.op.beparams:
5044
      instance.beparams = self.be_inst
5045
      for key, val in self.op.beparams.iteritems():
5046
        result.append(("be/%s" % key, val))
5047

    
5048
    self.cfg.Update(instance)
5049

    
5050
    return result
5051

    
5052

    
5053
class LUQueryExports(NoHooksLU):
5054
  """Query the exports list
5055

5056
  """
5057
  _OP_REQP = ['nodes']
5058
  REQ_BGL = False
5059

    
5060
  def ExpandNames(self):
5061
    self.needed_locks = {}
5062
    self.share_locks[locking.LEVEL_NODE] = 1
5063
    if not self.op.nodes:
5064
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5065
    else:
5066
      self.needed_locks[locking.LEVEL_NODE] = \
5067
        _GetWantedNodes(self, self.op.nodes)
5068

    
5069
  def CheckPrereq(self):
5070
    """Check prerequisites.
5071

5072
    """
5073
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
5074

    
5075
  def Exec(self, feedback_fn):
5076
    """Compute the list of all the exported system images.
5077

5078
    @rtype: dict
5079
    @return: a dictionary with the structure node->(export-list)
5080
        where export-list is a list of the instances exported on
5081
        that node.
5082

5083
    """
5084
    return self.rpc.call_export_list(self.nodes)
5085

    
5086

    
5087
class LUExportInstance(LogicalUnit):
5088
  """Export an instance to an image in the cluster.
5089

5090
  """
5091
  HPATH = "instance-export"
5092
  HTYPE = constants.HTYPE_INSTANCE
5093
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
5094
  REQ_BGL = False
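  # Rough flow of Exec below: optionally shut the instance down, snapshot
  # every disk on the primary node, restart the instance if needed, copy the
  # snapshots to the target node, remove the snapshots, finalize the export
  # and finally drop any older export of the same instance from other nodes.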
5095

    
5096
  def ExpandNames(self):
5097
    self._ExpandAndLockInstance()
5098
    # FIXME: lock only instance primary and destination node
5099
    #
5100
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
5106
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5107

    
5108
  def DeclareLocks(self, level):
5109
    """Last minute lock declaration."""
5110
    # All nodes are locked anyway, so nothing to do here.
5111

    
5112
  def BuildHooksEnv(self):
5113
    """Build hooks env.
5114

5115
    This will run on the master, primary node and target node.
5116

5117
    """
5118
    env = {
5119
      "EXPORT_NODE": self.op.target_node,
5120
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
5121
      }
5122
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5123
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
5124
          self.op.target_node]
5125
    return env, nl, nl
5126

    
5127
  def CheckPrereq(self):
5128
    """Check prerequisites.
5129

5130
    This checks that the instance and node names are valid.
5131

5132
    """
5133
    instance_name = self.op.instance_name
5134
    self.instance = self.cfg.GetInstanceInfo(instance_name)
5135
    assert self.instance is not None, \
5136
          "Cannot retrieve locked instance %s" % self.op.instance_name
5137

    
5138
    self.dst_node = self.cfg.GetNodeInfo(
5139
      self.cfg.ExpandNodeName(self.op.target_node))
5140

    
5141
    assert self.dst_node is not None, \
5142
          "Cannot retrieve locked node %s" % self.op.target_node
5143

    
5144
    # instance disk type verification
5145
    for disk in self.instance.disks:
5146
      if disk.dev_type == constants.LD_FILE:
5147
        raise errors.OpPrereqError("Export not supported for instances with"
5148
                                   " file-based disks")
5149

    
5150
  def Exec(self, feedback_fn):
5151
    """Export an instance to an image in the cluster.
5152

5153
    """
5154
    instance = self.instance
5155
    dst_node = self.dst_node
5156
    src_node = instance.primary_node
5157
    if self.op.shutdown:
5158
      # shutdown the instance, but not the disks
5159
      if not self.rpc.call_instance_shutdown(src_node, instance):
5160
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
5161
                                 (instance.name, src_node))
5162

    
5163
    vgname = self.cfg.GetVGName()
5164

    
5165
    snap_disks = []
5166

    
5167
    try:
5168
      for disk in instance.disks:
5169
        # new_dev_name will be a snapshot of an lvm leaf of the one we passed
5170
        new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
5171

    
5172
        if not new_dev_name:
5173
          self.LogWarning("Could not snapshot block device %s on node %s",
5174
                          disk.logical_id[1], src_node)
5175
          snap_disks.append(False)
5176
        else:
5177
          new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
5178
                                 logical_id=(vgname, new_dev_name),
5179
                                 physical_id=(vgname, new_dev_name),
5180
                                 iv_name=disk.iv_name)
5181
          snap_disks.append(new_dev)
5182

    
5183
    finally:
5184
      if self.op.shutdown and instance.status == "up":
5185
        if not self.rpc.call_instance_start(src_node, instance, None):
5186
          _ShutdownInstanceDisks(self, instance)
5187
          raise errors.OpExecError("Could not start instance")
5188

    
5189
    # TODO: check for size
5190

    
5191
    cluster_name = self.cfg.GetClusterName()
5192
    for idx, dev in enumerate(snap_disks):
5193
      if dev:
5194
        if not self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
5195
                                             instance, cluster_name, idx):
5196
          self.LogWarning("Could not export block device %s from node %s to"
5197
                          " node %s", dev.logical_id[1], src_node,
5198
                          dst_node.name)
5199
        if not self.rpc.call_blockdev_remove(src_node, dev):
5200
          self.LogWarning("Could not remove snapshot block device %s from node"
5201
                          " %s", dev.logical_id[1], src_node)
5202

    
5203
    if not self.rpc.call_finalize_export(dst_node.name, instance, snap_disks):
5204
      self.LogWarning("Could not finalize export for instance %s on node %s",
5205
                      instance.name, dst_node.name)
5206

    
5207
    nodelist = self.cfg.GetNodeList()
5208
    nodelist.remove(dst_node.name)
5209

    
5210
    # on one-node clusters nodelist will be empty after the removal
5211
    # if we proceed the backup would be removed because OpQueryExports
5212
    # substitutes an empty list with the full cluster node list.
5213
    if nodelist:
5214
      exportlist = self.rpc.call_export_list(nodelist)
5215
      for node in exportlist:
5216
        if instance.name in exportlist[node]:
5217
          if not self.rpc.call_export_remove(node, instance.name):
5218
            self.LogWarning("Could not remove older export for instance %s"
5219
                            " on node %s", instance.name, node)
5220

    
5221

    
5222
class LURemoveExport(NoHooksLU):
5223
  """Remove exports related to the named instance.
5224

5225
  """
5226
  _OP_REQP = ["instance_name"]
5227
  REQ_BGL = False
5228

    
5229
  def ExpandNames(self):
5230
    self.needed_locks = {}
5231
    # We need all nodes to be locked in order for RemoveExport to work, but we
5232
    # don't need to lock the instance itself, as nothing will happen to it (and
5233
    # we can remove exports also for a removed instance)
5234
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5235

    
5236
  def CheckPrereq(self):
5237
    """Check prerequisites.
5238
    """
5239
    pass
5240

    
5241
  def Exec(self, feedback_fn):
5242
    """Remove any export.
5243

5244
    """
5245
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5246
    # If the instance was not found we'll try with the name that was passed in.
5247
    # This will only work if it was an FQDN, though.
5248
    fqdn_warn = False
5249
    if not instance_name:
5250
      fqdn_warn = True
5251
      instance_name = self.op.instance_name
5252

    
5253
    exportlist = self.rpc.call_export_list(self.acquired_locks[
5254
      locking.LEVEL_NODE])
5255
    found = False
5256
    for node in exportlist:
5257
      if instance_name in exportlist[node]:
5258
        found = True
5259
        if not self.rpc.call_export_remove(node, instance_name):
5260
          logging.error("Could not remove export for instance %s"
5261
                        " on node %s", instance_name, node)
5262

    
5263
    if fqdn_warn and not found:
5264
      feedback_fn("Export not found. If trying to remove an export belonging"
5265
                  " to a deleted instance please use its Fully Qualified"
5266
                  " Domain Name.")
5267

    
5268

    
5269
class TagsLU(NoHooksLU):
5270
  """Generic tags LU.
5271

5272
  This is an abstract class which is the parent of all the other tags LUs.
5273

5274
  """
5275

    
5276
  def ExpandNames(self):
5277
    self.needed_locks = {}
5278
    if self.op.kind == constants.TAG_NODE:
5279
      name = self.cfg.ExpandNodeName(self.op.name)
5280
      if name is None:
5281
        raise errors.OpPrereqError("Invalid node name (%s)" %
5282
                                   (self.op.name,))
5283
      self.op.name = name
5284
      self.needed_locks[locking.LEVEL_NODE] = name
5285
    elif self.op.kind == constants.TAG_INSTANCE:
5286
      name = self.cfg.ExpandInstanceName(self.op.name)
5287
      if name is None:
5288
        raise errors.OpPrereqError("Invalid instance name (%s)" %
5289
                                   (self.op.name,))
5290
      self.op.name = name
5291
      self.needed_locks[locking.LEVEL_INSTANCE] = name
5292

    
5293
  def CheckPrereq(self):
5294
    """Check prerequisites.
5295

5296
    """
5297
    if self.op.kind == constants.TAG_CLUSTER:
5298
      self.target = self.cfg.GetClusterInfo()
5299
    elif self.op.kind == constants.TAG_NODE:
5300
      self.target = self.cfg.GetNodeInfo(self.op.name)
5301
    elif self.op.kind == constants.TAG_INSTANCE:
5302
      self.target = self.cfg.GetInstanceInfo(self.op.name)
5303
    else:
5304
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
5305
                                 str(self.op.kind))
5306

    
5307

    
5308
class LUGetTags(TagsLU):
5309
  """Returns the tags of a given object.
5310

5311
  """
5312
  _OP_REQP = ["kind", "name"]
5313
  REQ_BGL = False
5314

    
5315
  def Exec(self, feedback_fn):
5316
    """Returns the tag list.
5317

5318
    """
5319
    return list(self.target.GetTags())
5320

    
5321

    
5322
class LUSearchTags(NoHooksLU):
5323
  """Searches the tags for a given pattern.
5324

5325
  """
5326
  _OP_REQP = ["pattern"]
5327
  REQ_BGL = False
5328

    
5329
  def ExpandNames(self):
5330
    self.needed_locks = {}
5331

    
5332
  def CheckPrereq(self):
5333
    """Check prerequisites.
5334

5335
    This checks the pattern passed for validity by compiling it.
5336

5337
    """
5338
    try:
5339
      self.re = re.compile(self.op.pattern)
5340
    except re.error, err:
5341
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5342
                                 (self.op.pattern, err))
5343

    
5344
  def Exec(self, feedback_fn):
5345
    """Returns the tag list.
5346

5347
    """
5348
    cfg = self.cfg
5349
    tgts = [("/cluster", cfg.GetClusterInfo())]
5350
    ilist = cfg.GetAllInstancesInfo().values()
5351
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5352
    nlist = cfg.GetAllNodesInfo().values()
5353
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5354
    results = []
5355
    for path, target in tgts:
5356
      for tag in target.GetTags():
5357
        if self.re.search(tag):
5358
          results.append((path, tag))
5359
    return results
5360

    
5361

    
5362
class LUAddTags(TagsLU):
5363
  """Sets a tag on a given object.
5364

5365
  """
5366
  _OP_REQP = ["kind", "name", "tags"]
5367
  REQ_BGL = False
5368

    
5369
  def CheckPrereq(self):
5370
    """Check prerequisites.
5371

5372
    This checks the type and length of the tag name and value.
5373

5374
    """
5375
    TagsLU.CheckPrereq(self)
5376
    for tag in self.op.tags:
5377
      objects.TaggableObject.ValidateTag(tag)
5378

    
5379
  def Exec(self, feedback_fn):
5380
    """Sets the tag.
5381

5382
    """
5383
    try:
5384
      for tag in self.op.tags:
5385
        self.target.AddTag(tag)
5386
    except errors.TagError, err:
5387
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
5388
    try:
5389
      self.cfg.Update(self.target)
5390
    except errors.ConfigurationError:
5391
      raise errors.OpRetryError("There has been a modification to the"
5392
                                " config file and the operation has been"
5393
                                " aborted. Please retry.")
5394

    
5395

    
5396
class LUDelTags(TagsLU):
5397
  """Delete a list of tags from a given object.
5398

5399
  """
5400
  _OP_REQP = ["kind", "name", "tags"]
5401
  REQ_BGL = False
5402

    
5403
  def CheckPrereq(self):
5404
    """Check prerequisites.
5405

5406
    This checks that we have the given tag.
5407

5408
    """
5409
    TagsLU.CheckPrereq(self)
5410
    for tag in self.op.tags:
5411
      objects.TaggableObject.ValidateTag(tag)
5412
    del_tags = frozenset(self.op.tags)
5413
    cur_tags = self.target.GetTags()
5414
    if not del_tags <= cur_tags:
5415
      diff_tags = del_tags - cur_tags
5416
      diff_names = ["'%s'" % tag for tag in diff_tags]
5417
      diff_names.sort()
5418
      raise errors.OpPrereqError("Tag(s) %s not found" %
5419
                                 (",".join(diff_names)))
5420

    
5421
  def Exec(self, feedback_fn):
5422
    """Remove the tag from the object.
5423

5424
    """
5425
    for tag in self.op.tags:
5426
      self.target.RemoveTag(tag)
5427
    try:
5428
      self.cfg.Update(self.target)
5429
    except errors.ConfigurationError:
5430
      raise errors.OpRetryError("There has been a modification to the"
5431
                                " config file and the operation has been"
5432
                                " aborted. Please retry.")
5433

    
5434

    
5435
class LUTestDelay(NoHooksLU):
5436
  """Sleep for a specified amount of time.
5437

5438
  This LU sleeps on the master and/or nodes for a specified amount of
5439
  time.
5440

5441
  """
5442
  _OP_REQP = ["duration", "on_master", "on_nodes"]
5443
  REQ_BGL = False
5444

    
5445
  def ExpandNames(self):
5446
    """Expand names and set required locks.
5447

5448
    This expands the node list, if any.
5449

5450
    """
5451
    self.needed_locks = {}
5452
    if self.op.on_nodes:
5453
      # _GetWantedNodes can be used here, but is not always appropriate to use
5454
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
5455
      # more information.
5456
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5457
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
5458

    
5459
  def CheckPrereq(self):
5460
    """Check prerequisites.
5461

5462
    """
5463

    
5464
  def Exec(self, feedback_fn):
5465
    """Do the actual sleep.
5466

5467
    """
5468
    if self.op.on_master:
5469
      if not utils.TestDelay(self.op.duration):
5470
        raise errors.OpExecError("Error during master delay test")
5471
    if self.op.on_nodes:
5472
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5473
      if not result:
5474
        raise errors.OpExecError("Complete failure from rpc call")
5475
      for node, node_result in result.items():
5476
        if not node_result:
5477
          raise errors.OpExecError("Failure during rpc call to node %s,"
5478
                                   " result: %s" % (node, node_result))
5479

    
5480

    
5481
class IAllocator(object):
5482
  """IAllocator framework.
5483

5484
  An IAllocator instance has four sets of attributes:
5485
    - cfg that is needed to query the cluster
5486
    - input data (all members of the _KEYS class attribute are required)
5487
    - four buffer attributes (in|out_data|text), that represent the
5488
      input (to the external script) in text and data structure format,
5489
      and the output from it, again in two formats
5490
    - the result variables from the script (success, info, nodes) for
5491
      easy usage
5492

5493
  """
5494
  _ALLO_KEYS = [
5495
    "mem_size", "disks", "disk_template",
5496
    "os", "tags", "nics", "vcpus", "hypervisor",
5497
    ]
5498
  _RELO_KEYS = [
5499
    "relocate_from",
5500
    ]
5501

    
5502
  def __init__(self, lu, mode, name, **kwargs):
5503
    self.lu = lu
5504
    # init buffer variables
5505
    self.in_text = self.out_text = self.in_data = self.out_data = None
5506
    # init all input fields so that pylint is happy
5507
    self.mode = mode
5508
    self.name = name
5509
    self.mem_size = self.disks = self.disk_template = None
5510
    self.os = self.tags = self.nics = self.vcpus = None
5511
    self.relocate_from = None
5512
    # computed fields
5513
    self.required_nodes = None
5514
    # init result fields
5515
    self.success = self.info = self.nodes = None
5516
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5517
      keyset = self._ALLO_KEYS
5518
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5519
      keyset = self._RELO_KEYS
5520
    else:
5521
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5522
                                   " IAllocator" % self.mode)
5523
    for key in kwargs:
5524
      if key not in keyset:
5525
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
5526
                                     " IAllocator" % key)
5527
      setattr(self, key, kwargs[key])
5528
    for key in keyset:
5529
      if key not in kwargs:
5530
        raise errors.ProgrammerError("Missing input parameter '%s' to"
5531
                                     " IAllocator" % key)
5532
    self._BuildInputData()
5533

    
5534
  def _ComputeClusterData(self):
5535
    """Compute the generic allocator input data.
5536

5537
    This is the data that is independent of the actual operation.
5538

5539
    """
5540
    cfg = self.lu.cfg
5541
    cluster_info = cfg.GetClusterInfo()
5542
    # cluster data
5543
    data = {
5544
      "version": 1,
5545
      "cluster_name": cfg.GetClusterName(),
5546
      "cluster_tags": list(cluster_info.GetTags()),
5547
      "enable_hypervisors": list(cluster_info.enabled_hypervisors),
5548
      # we don't have job IDs
5549
      }
5550
    iinfo = cfg.GetAllInstancesInfo().values()
5551
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
5552

    
5553
    # node data
5554
    node_results = {}
5555
    node_list = cfg.GetNodeList()
5556

    
5557
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5558
      hypervisor = self.hypervisor
5559
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5560
      hypervisor = cfg.GetInstanceInfo(self.name).hypervisor
5561

    
5562
    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
5563
                                           hypervisor)
5564
    node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
5565
                       cluster_info.enabled_hypervisors)
5566
    for nname in node_list:
5567
      ninfo = cfg.GetNodeInfo(nname)
5568
      if nname not in node_data or not isinstance(node_data[nname], dict):
5569
        raise errors.OpExecError("Can't get data for node %s" % nname)
5570
      remote_info = node_data[nname]
5571
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
5572
                   'vg_size', 'vg_free', 'cpu_total']:
5573
        if attr not in remote_info:
5574
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5575
                                   (nname, attr))
5576
        try:
5577
          remote_info[attr] = int(remote_info[attr])
5578
        except ValueError, err:
5579
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5580
                                   " %s" % (nname, attr, str(err)))
5581
      # compute memory used by primary instances
5582
      i_p_mem = i_p_up_mem = 0
5583
      for iinfo, beinfo in i_list:
5584
        if iinfo.primary_node == nname:
5585
          i_p_mem += beinfo[constants.BE_MEMORY]
5586
          if iinfo.name not in node_iinfo[nname]:
5587
            i_used_mem = 0
5588
          else:
5589
            i_used_mem = int(node_iinfo[nname][iinfo.name]['memory'])
5590
          i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
5591
          remote_info['memory_free'] -= max(0, i_mem_diff)
5592

    
5593
          if iinfo.status == "up":
5594
            i_p_up_mem += beinfo[constants.BE_MEMORY]
5595

    
5596
      # compute memory used by instances
5597
      pnr = {
5598
        "tags": list(ninfo.GetTags()),
5599
        "total_memory": remote_info['memory_total'],
5600
        "reserved_memory": remote_info['memory_dom0'],
5601
        "free_memory": remote_info['memory_free'],
5602
        "i_pri_memory": i_p_mem,
5603
        "i_pri_up_memory": i_p_up_mem,
5604
        "total_disk": remote_info['vg_size'],
5605
        "free_disk": remote_info['vg_free'],
5606
        "primary_ip": ninfo.primary_ip,
5607
        "secondary_ip": ninfo.secondary_ip,
5608
        "total_cpus": remote_info['cpu_total'],
5609
        }
5610
      node_results[nname] = pnr
5611
    data["nodes"] = node_results
5612

    
5613
    # instance data
5614
    instance_data = {}
5615
    for iinfo, beinfo in i_list:
5616
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5617
                  for n in iinfo.nics]
5618
      pir = {
5619
        "tags": list(iinfo.GetTags()),
5620
        "should_run": iinfo.status == "up",
5621
        "vcpus": beinfo[constants.BE_VCPUS],
5622
        "memory": beinfo[constants.BE_MEMORY],
5623
        "os": iinfo.os,
5624
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5625
        "nics": nic_data,
5626
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5627
        "disk_template": iinfo.disk_template,
5628
        "hypervisor": iinfo.hypervisor,
5629
        }
5630
      instance_data[iinfo.name] = pir
5631

    
5632
    data["instances"] = instance_data
5633

    
5634
    self.in_data = data
5635

    
5636
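  # For reference, a single entry of the "nodes" dict built above might look
  # like the following; the key names are the ones set in _ComputeClusterData,
  # while the numbers and addresses are invented for illustration:
  #
  #   "node1.example.com": {
  #     "tags": [], "total_memory": 4096, "reserved_memory": 512,
  #     "free_memory": 2048, "i_pri_memory": 1024, "i_pri_up_memory": 1024,
  #     "total_disk": 102400, "free_disk": 51200,
  #     "primary_ip": "192.0.2.1", "secondary_ip": "198.51.100.1",
  #     "total_cpus": 4,
  #     }
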
  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template, self.disks)

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.lu.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

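  # The resulting "request" member for a relocation looks like this (all
  # values below are invented; "relocate_from" holds the secondary node(s)
  # the instance is being moved away from):
  #
  #   {"type": "relocate", "name": "instance1.example.com",
  #    "disk_space_total": 2176, "required_nodes": 1,
  #    "relocate_from": ["node2.example.com"]}
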
  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

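  # The serialized text handed to the external script therefore has the
  # following top-level layout (a sketch with placeholder values; the
  # "nodes", "instances" and "request" members are built by the methods
  # above):
  #
  #   {
  #     "version": 1,
  #     "cluster_name": "cluster.example.com",
  #     "cluster_tags": [],
  #     "enable_hypervisors": ["xen-pvm"],
  #     "nodes": {...},
  #     "instances": {...},
  #     "request": {...},
  #   }
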
  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.lu.rpc.call_iallocator_runner

    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)

    if not isinstance(result, (list, tuple)) or len(result) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" % (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

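  # Note that Run() can be exercised without going through the RPC layer by
  # injecting a fake call_fn: any callable returning a 4-element
  # (rcode, stdout, stderr, fail) tuple whose rcode is neither
  # constants.IARUN_NOTFOUND nor constants.IARUN_FAILURE is treated as a
  # successful run.  Illustrative sketch only (reply_text would be a
  # well-formed allocator answer as sketched after _ValidateResult below;
  # the success return code constant is assumed):
  #
  #   ial.Run("dummy", call_fn=lambda node, alloc_name, text:
  #           (constants.IARUN_SUCCESS, reply_text, "", None))
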
  def _ValidateResult(self):
    """Process the allocator results.

    This will process and, if successful, save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict
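
  # A reply that passes the checks above is a dict with at least the keys
  # "success", "info" and "nodes" (the latter a list).  For example, the
  # following (invented) answer would leave self.success == True,
  # self.info == "ok" and self.nodes == ["node1.example.com"]:
  #
  #   {"success": true, "info": "ok", "nodes": ["node1.example.com"]}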


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    of the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
      if getattr(self.op, "hypervisor", None) is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

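  # To pass the allocation-mode checks above, "nics" and "disks" must be
  # shaped like the following (values are purely illustrative): each NIC
  # needs "mac", "ip" and "bridge" keys, and exactly two disks are required,
  # each with an integer "size" and a "mode" of 'r' or 'w':
  #
  #   nics  = [{"mac": "auto", "ip": None, "bridge": "xen-br0"}]
  #   disks = [{"size": 1024, "mode": "w"}, {"size": 4096, "mode": "r"}]
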
  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       hypervisor=self.op.hypervisor,
                       )
    else:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result