1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33
import logging
34
import copy
35

    
36
from ganeti import ssh
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import locking
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import serializer
45

    
46

    
47
class LogicalUnit(object):
48
  """Logical Unit base class.
49

50
  Subclasses must follow these rules:
51
    - implement ExpandNames
52
    - implement CheckPrereq
53
    - implement Exec
54
    - implement BuildHooksEnv
55
    - redefine HPATH and HTYPE
56
    - optionally redefine their run requirements:
57
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
58

59
  Note that all commands require root permissions.
60

61
  """
62
  HPATH = None
63
  HTYPE = None
64
  _OP_REQP = []
65
  REQ_BGL = True
66

    
67
  def __init__(self, processor, op, context, rpc):
68
    """Constructor for LogicalUnit.
69

70
    This needs to be overridden in derived classes in order to check op
71
    validity.
72

73
    """
74
    self.proc = processor
75
    self.op = op
76
    self.cfg = context.cfg
77
    self.context = context
78
    self.rpc = rpc
79
    # Dicts used to declare locking needs to mcpu
80
    self.needed_locks = None
81
    self.acquired_locks = {}
82
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
83
    self.add_locks = {}
84
    self.remove_locks = {}
85
    # Used to force good behavior when calling helper functions
86
    self.recalculate_locks = {}
87
    self.__ssh = None
88
    # logging
89
    self.LogWarning = processor.LogWarning
90
    self.LogInfo = processor.LogInfo
91

    
92
    for attr_name in self._OP_REQP:
93
      attr_val = getattr(op, attr_name, None)
94
      if attr_val is None:
95
        raise errors.OpPrereqError("Required parameter '%s' missing" %
96
                                   attr_name)
97
    self.CheckArguments()
98

    
99
  def __GetSSH(self):
100
    """Returns the SshRunner object
101

102
    """
103
    if not self.__ssh:
104
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
105
    return self.__ssh
106

    
107
  ssh = property(fget=__GetSSH)
108

    
109
  def CheckArguments(self):
110
    """Check syntactic validity for the opcode arguments.
111

112
    This method is for doing a simple syntactic check and ensuring the
113
    validity of opcode parameters, without any cluster-related
114
    checks. While the same can be accomplished in ExpandNames and/or
115
    CheckPrereq, doing them separately is better because:
116

117
      - ExpandNames is left as a purely lock-related function
118
      - CheckPrereq is run after we have acquired locks (and possibly
119
        waited for them)
120

121
    The function is allowed to change the self.op attribute so that
122
    later methods need not worry about missing parameters.
123

124
    """
125
    pass
126

    
127
  def ExpandNames(self):
128
    """Expand names for this LU.
129

130
    This method is called before starting to execute the opcode, and it should
131
    update all the parameters of the opcode to their canonical form (e.g. a
132
    short node name must be fully expanded after this method has successfully
133
    completed). This way locking, hooks, logging, etc. can work correctly.
134

135
    LUs which implement this method must also populate the self.needed_locks
136
    member, as a dict with lock levels as keys, and a list of needed lock names
137
    as values. Rules:
138

139
      - use an empty dict if you don't need any lock
140
      - if you don't need any lock at a particular level omit that level
141
      - don't put anything for the BGL level
142
      - if you want all locks at a level use locking.ALL_SET as a value
143

144
    If you need to share locks (rather than acquire them exclusively) at one
145
    level you can modify self.share_locks, setting a true value (usually 1) for
146
    that level. By default locks are not shared.
147

148
    Examples::
149

150
      # Acquire all nodes and one instance
151
      self.needed_locks = {
152
        locking.LEVEL_NODE: locking.ALL_SET,
153
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
154
      }
155
      # Acquire just two nodes
156
      self.needed_locks = {
157
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
158
      }
159
      # Acquire no locks
160
      self.needed_locks = {} # No, you can't leave it to the default value None
161

162
    """
163
    # The implementation of this method is mandatory only if the new LU is
164
    # concurrent, so that old LUs don't need to be changed all at the same
165
    # time.
166
    if self.REQ_BGL:
167
      self.needed_locks = {} # Exclusive LUs don't need locks.
168
    else:
169
      raise NotImplementedError
170

    
171
  def DeclareLocks(self, level):
172
    """Declare LU locking needs for a level
173

174
    While most LUs can just declare their locking needs at ExpandNames time,
175
    sometimes there's the need to calculate some locks after having acquired
176
    the ones before. This function is called just before acquiring locks at a
177
    particular level, but after acquiring the ones at lower levels, and permits
178
    such calculations. It can be used to modify self.needed_locks, and by
179
    default it does nothing.
180

181
    This function is only called if you have something already set in
182
    self.needed_locks for the level.
183

184
    @param level: Locking level which is going to be locked
185
    @type level: member of ganeti.locking.LEVELS
186

187
    """
188

    
189
  def CheckPrereq(self):
190
    """Check prerequisites for this LU.
191

192
    This method should check that the prerequisites for the execution
193
    of this LU are fulfilled. It can do internode communication, but
194
    it should be idempotent - no cluster or system changes are
195
    allowed.
196

197
    The method should raise errors.OpPrereqError in case something is
198
    not fulfilled. Its return value is ignored.
199

200
    This method should also update all the parameters of the opcode to
201
    their canonical form if it hasn't been done by ExpandNames before.
202

203
    """
204
    raise NotImplementedError
205

    
206
  def Exec(self, feedback_fn):
207
    """Execute the LU.
208

209
    This method should implement the actual work. It should raise
210
    errors.OpExecError for failures that are somewhat dealt with in
211
    code, or expected.
212

213
    """
214
    raise NotImplementedError
215

    
216
  def BuildHooksEnv(self):
217
    """Build hooks environment for this LU.
218

219
    This method should return a three-element tuple consisting of: a dict
220
    containing the environment that will be used for running the
221
    specific hook for this LU, a list of node names on which the hook
222
    should run before the execution, and a list of node names on which
223
    the hook should run after the execution.
224

225
    The keys of the dict must not have the 'GANETI_' prefix, as this will
226
    be handled in the hooks runner. Also note additional keys will be
227
    added by the hooks runner. If the LU doesn't define any
228
    environment, an empty dict (and not None) should be returned.
229

230
    If there are no nodes, an empty list (and not None) should be returned.
231

232
    Note that if the HPATH for a LU class is None, this function will
233
    not be called.
234

235
    """
236
    raise NotImplementedError
237

    
238
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
239
    """Notify the LU about the results of its hooks.
240

241
    This method is called every time a hooks phase is executed, and notifies
242
    the Logical Unit about the hooks' result. The LU can then use it to alter
243
    its result based on the hooks.  By default the method does nothing and the
244
    previous result is passed back unchanged but any LU can define it if it
245
    wants to use the local cluster hook-scripts somehow.
246

247
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
248
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
249
    @param hook_results: the results of the multi-node hooks rpc call
250
    @param feedback_fn: function used to send feedback back to the caller
251
    @param lu_result: the previous Exec result this LU had, or None
252
        in the PRE phase
253
    @return: the new Exec result, based on the previous result
254
        and hook results
255

256
    """
257
    return lu_result
258

    
259
  def _ExpandAndLockInstance(self):
260
    """Helper function to expand and lock an instance.
261

262
    Many LUs that work on an instance take its name in self.op.instance_name
263
    and need to expand it and then declare the expanded name for locking. This
264
    function does it, and then updates self.op.instance_name to the expanded
265
    name. It also initializes needed_locks as a dict, if this hasn't been done
266
    before.
267

268
    """
269
    if self.needed_locks is None:
270
      self.needed_locks = {}
271
    else:
272
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
273
        "_ExpandAndLockInstance called with instance-level locks set"
274
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
275
    if expanded_name is None:
276
      raise errors.OpPrereqError("Instance '%s' not known" %
277
                                  self.op.instance_name)
278
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
279
    self.op.instance_name = expanded_name
280

    
281
  def _LockInstancesNodes(self, primary_only=False):
282
    """Helper function to declare instances' nodes for locking.
283

284
    This function should be called after locking one or more instances to lock
285
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
286
    with all primary or secondary nodes for instances already locked and
287
    present in self.needed_locks[locking.LEVEL_INSTANCE].
288

289
    It should be called from DeclareLocks, and for safety only works if
290
    self.recalculate_locks[locking.LEVEL_NODE] is set.
291

292
    In the future it may grow parameters to just lock some instances' nodes, or
293
    to just lock primaries or secondary nodes, if needed.
294

295
    It should be called from DeclareLocks in a way similar to::
296

297
      if level == locking.LEVEL_NODE:
298
        self._LockInstancesNodes()
299

300
    @type primary_only: boolean
301
    @param primary_only: only lock primary nodes of locked instances
302

303
    """
304
    assert locking.LEVEL_NODE in self.recalculate_locks, \
305
      "_LockInstancesNodes helper function called with no nodes to recalculate"
306

    
307
    # TODO: check if we really have been called with the instance locks held
308

    
309
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
310
    # future we might want to have different behaviors depending on the value
311
    # of self.recalculate_locks[locking.LEVEL_NODE]
312
    wanted_nodes = []
313
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
314
      instance = self.context.cfg.GetInstanceInfo(instance_name)
315
      wanted_nodes.append(instance.primary_node)
316
      if not primary_only:
317
        wanted_nodes.extend(instance.secondary_nodes)
318

    
319
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
320
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
321
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
322
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
323

    
324
    del self.recalculate_locks[locking.LEVEL_NODE]
325

    
326

    
327
class NoHooksLU(LogicalUnit):
328
  """Simple LU which runs no hooks.
329

330
  This LU is intended as a parent for other LogicalUnits which will
331
  run no hooks, in order to reduce duplicate code.
332

333
  """
334
  HPATH = None
335
  HTYPE = None
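
# A minimal illustrative sketch of the contract documented in LogicalUnit
# above: the class name and opcode field are hypothetical, not an actual
# opcode handled by this module. It shows a concurrent LU (REQ_BGL disabled)
# that declares its instance lock in ExpandNames and computes node locks in
# DeclareLocks via _LockInstancesNodes.
#
#   class LUExampleInstanceOp(NoHooksLU):
#     """Example LU operating on a single instance."""
#     _OP_REQP = ["instance_name"]
#     REQ_BGL = False
#
#     def ExpandNames(self):
#       self._ExpandAndLockInstance()
#       self.needed_locks[locking.LEVEL_NODE] = []
#       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
#
#     def DeclareLocks(self, level):
#       if level == locking.LEVEL_NODE:
#         self._LockInstancesNodes()
#
#     def CheckPrereq(self):
#       self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
#
#     def Exec(self, feedback_fn):
#       return self.instance.primary_node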
336

    
337

    
338
def _GetWantedNodes(lu, nodes):
339
  """Returns list of checked and expanded node names.
340

341
  @type lu: L{LogicalUnit}
342
  @param lu: the logical unit on whose behalf we execute
343
  @type nodes: list
344
  @param nodes: list of node names or None for all nodes
345
  @rtype: list
346
  @return: the list of nodes, sorted
347
  @raise errors.OpPrereqError: if the nodes parameter is wrong type
348

349
  """
350
  if not isinstance(nodes, list):
351
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
352

    
353
  if not nodes:
354
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
355
      " non-empty list of nodes whose name is to be expanded.")
356

    
357
  wanted = []
358
  for name in nodes:
359
    node = lu.cfg.ExpandNodeName(name)
360
    if node is None:
361
      raise errors.OpPrereqError("No such node name '%s'" % name)
362
    wanted.append(node)
363

    
364
  return utils.NiceSort(wanted)
365

    
366

    
367
def _GetWantedInstances(lu, instances):
368
  """Returns list of checked and expanded instance names.
369

370
  @type lu: L{LogicalUnit}
371
  @param lu: the logical unit on whose behalf we execute
372
  @type instances: list
373
  @param instances: list of instance names or None for all instances
374
  @rtype: list
375
  @return: the list of instances, sorted
376
  @raise errors.OpPrereqError: if the instances parameter is wrong type
377
  @raise errors.OpPrereqError: if any of the passed instances is not found
378

379
  """
380
  if not isinstance(instances, list):
381
    raise errors.OpPrereqError("Invalid argument type 'instances'")
382

    
383
  if instances:
384
    wanted = []
385

    
386
    for name in instances:
387
      instance = lu.cfg.ExpandInstanceName(name)
388
      if instance is None:
389
        raise errors.OpPrereqError("No such instance name '%s'" % name)
390
      wanted.append(instance)
391

    
392
  else:
393
    wanted = lu.cfg.GetInstanceList()
394
  return utils.NiceSort(wanted)
395

    
396

    
397
def _CheckOutputFields(static, dynamic, selected):
398
  """Checks whether all selected fields are valid.
399

400
  @type static: L{utils.FieldSet}
401
  @param static: static fields set
402
  @type dynamic: L{utils.FieldSet}
403
  @param dynamic: dynamic fields set
404

405
  """
406
  f = utils.FieldSet()
407
  f.Extend(static)
408
  f.Extend(dynamic)
409

    
410
  delta = f.NonMatching(selected)
411
  if delta:
412
    raise errors.OpPrereqError("Unknown output fields selected: %s"
413
                               % ",".join(delta))
414

    
415

    
416
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
417
                          memory, vcpus, nics):
418
  """Builds instance related env variables for hooks
419

420
  This builds the hook environment from individual variables.
421

422
  @type name: string
423
  @param name: the name of the instance
424
  @type primary_node: string
425
  @param primary_node: the name of the instance's primary node
426
  @type secondary_nodes: list
427
  @param secondary_nodes: list of secondary nodes as strings
428
  @type os_type: string
429
  @param os_type: the name of the instance's OS
430
  @type status: string
431
  @param status: the desired status of the instance
432
  @type memory: string
433
  @param memory: the memory size of the instance
434
  @type vcpus: string
435
  @param vcpus: the count of VCPUs the instance has
436
  @type nics: list
437
  @param nics: list of tuples (ip, bridge, mac) representing
438
      the NICs the instance has
439
  @rtype: dict
440
  @return: the hook environment for this instance
441

442
  """
443
  env = {
444
    "OP_TARGET": name,
445
    "INSTANCE_NAME": name,
446
    "INSTANCE_PRIMARY": primary_node,
447
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
448
    "INSTANCE_OS_TYPE": os_type,
449
    "INSTANCE_STATUS": status,
450
    "INSTANCE_MEMORY": memory,
451
    "INSTANCE_VCPUS": vcpus,
452
  }
453

    
454
  if nics:
455
    nic_count = len(nics)
456
    for idx, (ip, bridge, mac) in enumerate(nics):
457
      if ip is None:
458
        ip = ""
459
      env["INSTANCE_NIC%d_IP" % idx] = ip
460
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
461
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
462
  else:
463
    nic_count = 0
464

    
465
  env["INSTANCE_NIC_COUNT"] = nic_count
466

    
467
  return env
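
# Example with hypothetical values: a call such as
#
#   _BuildInstanceHookEnv("inst1.example.com", "node1.example.com",
#                         ["node2.example.com"], "debian-etch", "up",
#                         128, 1, [("198.51.100.10", "xen-br0",
#                                   "aa:00:00:11:22:33")])
#
# yields, among others, INSTANCE_NAME=inst1.example.com,
# INSTANCE_SECONDARIES=node2.example.com, INSTANCE_NIC_COUNT=1,
# INSTANCE_NIC0_IP=198.51.100.10 and INSTANCE_NIC0_HWADDR=aa:00:00:11:22:33.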
468

    
469

    
470
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
471
  """Builds instance related env variables for hooks from an object.
472

473
  @type lu: L{LogicalUnit}
474
  @param lu: the logical unit on whose behalf we execute
475
  @type instance: L{objects.Instance}
476
  @param instance: the instance for which we should build the
477
      environment
478
  @type override: dict
479
  @param override: dictionary with key/values that will override
480
      our values
481
  @rtype: dict
482
  @return: the hook environment dictionary
483

484
  """
485
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
486
  args = {
487
    'name': instance.name,
488
    'primary_node': instance.primary_node,
489
    'secondary_nodes': instance.secondary_nodes,
490
    'os_type': instance.os,
491
    'status': instance.status,
492
    'memory': bep[constants.BE_MEMORY],
493
    'vcpus': bep[constants.BE_VCPUS],
494
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
495
  }
496
  if override:
497
    args.update(override)
498
  return _BuildInstanceHookEnv(**args)
499

    
500

    
501
def _CheckInstanceBridgesExist(lu, instance):
502
  """Check that the brigdes needed by an instance exist.
503

504
  """
505
  # check bridges existence
506
  brlist = [nic.bridge for nic in instance.nics]
507
  if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
508
    raise errors.OpPrereqError("one or more target bridges %s does not"
509
                               " exist on destination node '%s'" %
510
                               (brlist, instance.primary_node))
511

    
512

    
513
class LUDestroyCluster(NoHooksLU):
514
  """Logical unit for destroying the cluster.
515

516
  """
517
  _OP_REQP = []
518

    
519
  def CheckPrereq(self):
520
    """Check prerequisites.
521

522
    This checks whether the cluster is empty.
523

524
    Any errors are signalled by raising errors.OpPrereqError.
525

526
    """
527
    master = self.cfg.GetMasterNode()
528

    
529
    nodelist = self.cfg.GetNodeList()
530
    if len(nodelist) != 1 or nodelist[0] != master:
531
      raise errors.OpPrereqError("There are still %d node(s) in"
532
                                 " this cluster." % (len(nodelist) - 1))
533
    instancelist = self.cfg.GetInstanceList()
534
    if instancelist:
535
      raise errors.OpPrereqError("There are still %d instance(s) in"
536
                                 " this cluster." % len(instancelist))
537

    
538
  def Exec(self, feedback_fn):
539
    """Destroys the cluster.
540

541
    """
542
    master = self.cfg.GetMasterNode()
543
    if not self.rpc.call_node_stop_master(master, False):
544
      raise errors.OpExecError("Could not disable the master role")
545
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
546
    utils.CreateBackup(priv_key)
547
    utils.CreateBackup(pub_key)
548
    return master
549

    
550

    
551
class LUVerifyCluster(LogicalUnit):
552
  """Verifies the cluster status.
553

554
  """
555
  HPATH = "cluster-verify"
556
  HTYPE = constants.HTYPE_CLUSTER
557
  _OP_REQP = ["skip_checks"]
558
  REQ_BGL = False
559

    
560
  def ExpandNames(self):
561
    self.needed_locks = {
562
      locking.LEVEL_NODE: locking.ALL_SET,
563
      locking.LEVEL_INSTANCE: locking.ALL_SET,
564
    }
565
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
566

    
567
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
568
                  remote_version, feedback_fn):
569
    """Run multiple tests against a node.
570

571
    Test list::
572

573
      - compares ganeti version
574
      - checks vg existence and size > 20G
575
      - checks config file checksum
576
      - checks ssh to other nodes
577

578
    @type node: string
579
    @param node: the name of the node to check
580
    @param file_list: required list of files
581
    @param local_cksum: dictionary of local files and their checksums
582
    @type vglist: dict
583
    @param vglist: dictionary of volume group names and their size
584
    @param node_result: the results from the node
585
    @param remote_version: the RPC version from the remote node
586
    @param feedback_fn: function used to accumulate results
587

588
    """
589
    # compares ganeti version
590
    local_version = constants.PROTOCOL_VERSION
591
    if not remote_version:
592
      feedback_fn("  - ERROR: connection to %s failed" % (node))
593
      return True
594

    
595
    if local_version != remote_version:
596
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
597
                      (local_version, node, remote_version))
598
      return True
599

    
600
    # checks vg existence and size > 20G
601

    
602
    bad = False
603
    if not vglist:
604
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
605
                      (node,))
606
      bad = True
607
    else:
608
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
609
                                            constants.MIN_VG_SIZE)
610
      if vgstatus:
611
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
612
        bad = True
613

    
614
    if not node_result:
615
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
616
      return True
617

    
618
    # checks config file checksum
619
    # checks ssh to any
620

    
621
    if 'filelist' not in node_result:
622
      bad = True
623
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
624
    else:
625
      remote_cksum = node_result['filelist']
626
      for file_name in file_list:
627
        if file_name not in remote_cksum:
628
          bad = True
629
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
630
        elif remote_cksum[file_name] != local_cksum[file_name]:
631
          bad = True
632
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
633

    
634
    if 'nodelist' not in node_result:
635
      bad = True
636
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
637
    else:
638
      if node_result['nodelist']:
639
        bad = True
640
        for node in node_result['nodelist']:
641
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
642
                          (node, node_result['nodelist'][node]))
643
    if 'node-net-test' not in node_result:
644
      bad = True
645
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
646
    else:
647
      if node_result['node-net-test']:
648
        bad = True
649
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
650
        for node in nlist:
651
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
652
                          (node, node_result['node-net-test'][node]))
653

    
654
    hyp_result = node_result.get('hypervisor', None)
655
    if isinstance(hyp_result, dict):
656
      for hv_name, hv_result in hyp_result.iteritems():
657
        if hv_result is not None:
658
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
659
                      (hv_name, hv_result))
660
    return bad
661

    
662
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
663
                      node_instance, feedback_fn):
664
    """Verify an instance.
665

666
    This function checks to see if the required block devices are
667
    available on the instance's node.
668

669
    """
670
    bad = False
671

    
672
    node_current = instanceconfig.primary_node
673

    
674
    node_vol_should = {}
675
    instanceconfig.MapLVsByNode(node_vol_should)
676

    
677
    for node in node_vol_should:
678
      for volume in node_vol_should[node]:
679
        if node not in node_vol_is or volume not in node_vol_is[node]:
680
          feedback_fn("  - ERROR: volume %s missing on node %s" %
681
                          (volume, node))
682
          bad = True
683

    
684
    if not instanceconfig.status == 'down':
685
      if (node_current not in node_instance or
686
          instance not in node_instance[node_current]):
687
        feedback_fn("  - ERROR: instance %s not running on node %s" %
688
                        (instance, node_current))
689
        bad = True
690

    
691
    for node in node_instance:
692
      if node != node_current:
693
        if instance in node_instance[node]:
694
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
695
                          (instance, node))
696
          bad = True
697

    
698
    return bad
699

    
700
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
701
    """Verify if there are any unknown volumes in the cluster.
702

703
    The .os, .swap and backup volumes are ignored. All other volumes are
704
    reported as unknown.
705

706
    """
707
    bad = False
708

    
709
    for node in node_vol_is:
710
      for volume in node_vol_is[node]:
711
        if node not in node_vol_should or volume not in node_vol_should[node]:
712
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
713
                      (volume, node))
714
          bad = True
715
    return bad
716

    
717
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
718
    """Verify the list of running instances.
719

720
    This checks what instances are running but unknown to the cluster.
721

722
    """
723
    bad = False
724
    for node in node_instance:
725
      for runninginstance in node_instance[node]:
726
        if runninginstance not in instancelist:
727
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
728
                          (runninginstance, node))
729
          bad = True
730
    return bad
731

    
732
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
733
    """Verify N+1 Memory Resilience.
734

735
    Check that if one single node dies we can still start all the instances it
736
    was primary for.
737

738
    """
739
    bad = False
740

    
741
    for node, nodeinfo in node_info.iteritems():
742
      # This code checks that every node which is now listed as secondary has
743
      # enough memory to host all the instances it is supposed to, should a
744
      # single other node in the cluster fail.
745
      # FIXME: not ready for failover to an arbitrary node
746
      # FIXME: does not support file-backed instances
747
      # WARNING: we currently take into account down instances as well as up
748
      # ones, considering that even if they're down someone might want to start
749
      # them even in the event of a node failure.
750
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
751
        needed_mem = 0
752
        for instance in instances:
753
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
754
          if bep[constants.BE_AUTO_BALANCE]:
755
            needed_mem += bep[constants.BE_MEMORY]
756
        if nodeinfo['mfree'] < needed_mem:
757
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
758
                      " failovers should node %s fail" % (node, prinode))
759
          bad = True
760
    return bad
761

    
762
  def CheckPrereq(self):
763
    """Check prerequisites.
764

765
    Transform the list of checks we're going to skip into a set and check that
766
    all its members are valid.
767

768
    """
769
    self.skip_set = frozenset(self.op.skip_checks)
770
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
771
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
772

    
773
  def BuildHooksEnv(self):
774
    """Build hooks env.
775

776
    Cluster-Verify hooks are only run in the post phase; if they fail, their
777
    output is logged in the verify output and the verification fails.
778

779
    """
780
    all_nodes = self.cfg.GetNodeList()
781
    # TODO: populate the environment with useful information for verify hooks
782
    env = {}
783
    return env, [], all_nodes
784

    
785
  def Exec(self, feedback_fn):
786
    """Verify integrity of cluster, performing various test on nodes.
787

788
    """
789
    bad = False
790
    feedback_fn("* Verifying global settings")
791
    for msg in self.cfg.VerifyConfig():
792
      feedback_fn("  - ERROR: %s" % msg)
793

    
794
    vg_name = self.cfg.GetVGName()
795
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
796
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
797
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
798
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
799
    i_non_redundant = [] # Non redundant instances
800
    i_non_a_balanced = [] # Non auto-balanced instances
801
    node_volume = {}
802
    node_instance = {}
803
    node_info = {}
804
    instance_cfg = {}
805

    
806
    # FIXME: verify OS list
807
    # do local checksums
808
    file_names = []
809
    file_names.append(constants.SSL_CERT_FILE)
810
    file_names.append(constants.CLUSTER_CONF_FILE)
811
    local_checksums = utils.FingerprintFiles(file_names)
812

    
813
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
814
    all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
815
    all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
816
    all_vglist = self.rpc.call_vg_list(nodelist)
817
    node_verify_param = {
818
      'filelist': file_names,
819
      'nodelist': nodelist,
820
      'hypervisor': hypervisors,
821
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
822
                        for node in nodeinfo]
823
      }
824
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
825
                                           self.cfg.GetClusterName())
826
    all_rversion = self.rpc.call_version(nodelist)
827
    all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
828
                                        self.cfg.GetHypervisorType())
829

    
830
    cluster = self.cfg.GetClusterInfo()
831
    for node in nodelist:
832
      feedback_fn("* Verifying node %s" % node)
833
      result = self._VerifyNode(node, file_names, local_checksums,
834
                                all_vglist[node], all_nvinfo[node],
835
                                all_rversion[node], feedback_fn)
836
      bad = bad or result
837

    
838
      # node_volume
839
      volumeinfo = all_volumeinfo[node]
840

    
841
      if isinstance(volumeinfo, basestring):
842
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
843
                    (node, volumeinfo[-400:].encode('string_escape')))
844
        bad = True
845
        node_volume[node] = {}
846
      elif not isinstance(volumeinfo, dict):
847
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
848
        bad = True
849
        continue
850
      else:
851
        node_volume[node] = volumeinfo
852

    
853
      # node_instance
854
      nodeinstance = all_instanceinfo[node]
855
      if type(nodeinstance) != list:
856
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
857
        bad = True
858
        continue
859

    
860
      node_instance[node] = nodeinstance
861

    
862
      # node_info
863
      nodeinfo = all_ninfo[node]
864
      if not isinstance(nodeinfo, dict):
865
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
866
        bad = True
867
        continue
868

    
869
      try:
870
        node_info[node] = {
871
          "mfree": int(nodeinfo['memory_free']),
872
          "dfree": int(nodeinfo['vg_free']),
873
          "pinst": [],
874
          "sinst": [],
875
          # dictionary holding all instances this node is secondary for,
876
          # grouped by their primary node. Each key is a cluster node, and each
877
          # value is a list of instances which have the key as primary and the
878
          # current node as secondary.  this is handy to calculate N+1 memory
879
          # availability if you can only failover from a primary to its
880
          # secondary.
881
          "sinst-by-pnode": {},
882
        }
883
      except ValueError:
884
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
885
        bad = True
886
        continue
887

    
888
    node_vol_should = {}
889

    
890
    for instance in instancelist:
891
      feedback_fn("* Verifying instance %s" % instance)
892
      inst_config = self.cfg.GetInstanceInfo(instance)
893
      result =  self._VerifyInstance(instance, inst_config, node_volume,
894
                                     node_instance, feedback_fn)
895
      bad = bad or result
896

    
897
      inst_config.MapLVsByNode(node_vol_should)
898

    
899
      instance_cfg[instance] = inst_config
900

    
901
      pnode = inst_config.primary_node
902
      if pnode in node_info:
903
        node_info[pnode]['pinst'].append(instance)
904
      else:
905
        feedback_fn("  - ERROR: instance %s, connection to primary node"
906
                    " %s failed" % (instance, pnode))
907
        bad = True
908

    
909
      # If the instance is non-redundant we cannot survive losing its primary
910
      # node, so we are not N+1 compliant. On the other hand we have no disk
911
      # templates with more than one secondary so that situation is not well
912
      # supported either.
913
      # FIXME: does not support file-backed instances
914
      if len(inst_config.secondary_nodes) == 0:
915
        i_non_redundant.append(instance)
916
      elif len(inst_config.secondary_nodes) > 1:
917
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
918
                    % instance)
919

    
920
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
921
        i_non_a_balanced.append(instance)
922

    
923
      for snode in inst_config.secondary_nodes:
924
        if snode in node_info:
925
          node_info[snode]['sinst'].append(instance)
926
          if pnode not in node_info[snode]['sinst-by-pnode']:
927
            node_info[snode]['sinst-by-pnode'][pnode] = []
928
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
929
        else:
930
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
931
                      " %s failed" % (instance, snode))
932

    
933
    feedback_fn("* Verifying orphan volumes")
934
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
935
                                       feedback_fn)
936
    bad = bad or result
937

    
938
    feedback_fn("* Verifying remaining instances")
939
    result = self._VerifyOrphanInstances(instancelist, node_instance,
940
                                         feedback_fn)
941
    bad = bad or result
942

    
943
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
944
      feedback_fn("* Verifying N+1 Memory redundancy")
945
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
946
      bad = bad or result
947

    
948
    feedback_fn("* Other Notes")
949
    if i_non_redundant:
950
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
951
                  % len(i_non_redundant))
952

    
953
    if i_non_a_balanced:
954
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
955
                  % len(i_non_a_balanced))
956

    
957
    return not bad
958

    
959
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
960
    """Analize the post-hooks' result
961

962
    This method analyzes the hook result, handles it, and sends some
963
    nicely-formatted feedback back to the user.
964

965
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
966
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
967
    @param hooks_results: the results of the multi-node hooks rpc call
968
    @param feedback_fn: function used to send feedback back to the caller
969
    @param lu_result: previous Exec result
970
    @return: the new Exec result, based on the previous result
971
        and hook results
972

973
    """
974
    # We only really run POST phase hooks, and are only interested in
975
    # their results
976
    if phase == constants.HOOKS_PHASE_POST:
977
      # Used to change hooks' output to proper indentation
978
      indent_re = re.compile('^', re.M)
979
      feedback_fn("* Hooks Results")
980
      if not hooks_results:
981
        feedback_fn("  - ERROR: general communication failure")
982
        lu_result = 1
983
      else:
984
        for node_name in hooks_results:
985
          show_node_header = True
986
          res = hooks_results[node_name]
987
          if res is False or not isinstance(res, list):
988
            feedback_fn("    Communication failure")
989
            lu_result = 1
990
            continue
991
          for script, hkr, output in res:
992
            if hkr == constants.HKR_FAIL:
993
              # The node header is only shown once, if there are
994
              # failing hooks on that node
995
              if show_node_header:
996
                feedback_fn("  Node %s:" % node_name)
997
                show_node_header = False
998
              feedback_fn("    ERROR: Script %s failed, output:" % script)
999
              output = indent_re.sub('      ', output)
1000
              feedback_fn("%s" % output)
1001
              lu_result = 1
1002

    
1003
      return lu_result
1004

    
1005

    
1006
class LUVerifyDisks(NoHooksLU):
1007
  """Verifies the cluster disks status.
1008

1009
  """
1010
  _OP_REQP = []
1011
  REQ_BGL = False
1012

    
1013
  def ExpandNames(self):
1014
    self.needed_locks = {
1015
      locking.LEVEL_NODE: locking.ALL_SET,
1016
      locking.LEVEL_INSTANCE: locking.ALL_SET,
1017
    }
1018
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1019

    
1020
  def CheckPrereq(self):
1021
    """Check prerequisites.
1022

1023
    This has no prerequisites.
1024

1025
    """
1026
    pass
1027

    
1028
  def Exec(self, feedback_fn):
1029
    """Verify integrity of cluster disks.
1030

1031
    """
1032
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
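    # The four names above alias the members of 'result': nodes that could not
    # be queried, per-node LV listing errors, instances with at least one
    # non-active LV, and LVs that are missing entirely, grouped by instance.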
1033

    
1034
    vg_name = self.cfg.GetVGName()
1035
    nodes = utils.NiceSort(self.cfg.GetNodeList())
1036
    instances = [self.cfg.GetInstanceInfo(name)
1037
                 for name in self.cfg.GetInstanceList()]
1038

    
1039
    nv_dict = {}
1040
    for inst in instances:
1041
      inst_lvs = {}
1042
      if (inst.status != "up" or
1043
          inst.disk_template not in constants.DTS_NET_MIRROR):
1044
        continue
1045
      inst.MapLVsByNode(inst_lvs)
1046
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1047
      for node, vol_list in inst_lvs.iteritems():
1048
        for vol in vol_list:
1049
          nv_dict[(node, vol)] = inst
1050

    
1051
    if not nv_dict:
1052
      return result
1053

    
1054
    node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1055

    
1056
    to_act = set()
1057
    for node in nodes:
1058
      # node_volume
1059
      lvs = node_lvs[node]
1060

    
1061
      if isinstance(lvs, basestring):
1062
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1063
        res_nlvm[node] = lvs
1064
      elif not isinstance(lvs, dict):
1065
        logging.warning("Connection to node %s failed or invalid data"
1066
                        " returned", node)
1067
        res_nodes.append(node)
1068
        continue
1069

    
1070
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1071
        inst = nv_dict.pop((node, lv_name), None)
1072
        if (not lv_online and inst is not None
1073
            and inst.name not in res_instances):
1074
          res_instances.append(inst.name)
1075

    
1076
    # any leftover items in nv_dict are missing LVs, let's arrange the
1077
    # data better
1078
    for key, inst in nv_dict.iteritems():
1079
      if inst.name not in res_missing:
1080
        res_missing[inst.name] = []
1081
      res_missing[inst.name].append(key)
1082

    
1083
    return result
1084

    
1085

    
1086
class LURenameCluster(LogicalUnit):
1087
  """Rename the cluster.
1088

1089
  """
1090
  HPATH = "cluster-rename"
1091
  HTYPE = constants.HTYPE_CLUSTER
1092
  _OP_REQP = ["name"]
1093

    
1094
  def BuildHooksEnv(self):
1095
    """Build hooks env.
1096

1097
    """
1098
    env = {
1099
      "OP_TARGET": self.cfg.GetClusterName(),
1100
      "NEW_NAME": self.op.name,
1101
      }
1102
    mn = self.cfg.GetMasterNode()
1103
    return env, [mn], [mn]
1104

    
1105
  def CheckPrereq(self):
1106
    """Verify that the passed name is a valid one.
1107

1108
    """
1109
    hostname = utils.HostInfo(self.op.name)
1110

    
1111
    new_name = hostname.name
1112
    self.ip = new_ip = hostname.ip
1113
    old_name = self.cfg.GetClusterName()
1114
    old_ip = self.cfg.GetMasterIP()
1115
    if new_name == old_name and new_ip == old_ip:
1116
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1117
                                 " cluster has changed")
1118
    if new_ip != old_ip:
1119
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1120
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1121
                                   " reachable on the network. Aborting." %
1122
                                   new_ip)
1123

    
1124
    self.op.name = new_name
1125

    
1126
  def Exec(self, feedback_fn):
1127
    """Rename the cluster.
1128

1129
    """
1130
    clustername = self.op.name
1131
    ip = self.ip
1132

    
1133
    # shutdown the master IP
1134
    master = self.cfg.GetMasterNode()
1135
    if not self.rpc.call_node_stop_master(master, False):
1136
      raise errors.OpExecError("Could not disable the master role")
1137

    
1138
    try:
1139
      # modify the sstore
1140
      # TODO: sstore
1141
      ss.SetKey(ss.SS_MASTER_IP, ip)
1142
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1143

    
1144
      # Distribute updated ss config to all nodes
1145
      myself = self.cfg.GetNodeInfo(master)
1146
      dist_nodes = self.cfg.GetNodeList()
1147
      if myself.name in dist_nodes:
1148
        dist_nodes.remove(myself.name)
1149

    
1150
      logging.debug("Copying updated ssconf data to all nodes")
1151
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1152
        fname = ss.KeyToFilename(keyname)
1153
        result = self.rpc.call_upload_file(dist_nodes, fname)
1154
        for to_node in dist_nodes:
1155
          if not result[to_node]:
1156
            self.LogWarning("Copy of file %s to node %s failed",
1157
                            fname, to_node)
1158
    finally:
1159
      if not self.rpc.call_node_start_master(master, False):
1160
        self.LogWarning("Could not re-enable the master role on"
1161
                        " the master, please restart manually.")
1162

    
1163

    
1164
def _RecursiveCheckIfLVMBased(disk):
1165
  """Check if the given disk or its children are lvm-based.
1166

1167
  @type disk: L{objects.Disk}
1168
  @param disk: the disk to check
1169
  @rtype: boolean
1170
  @return: boolean indicating whether a LD_LV dev_type was found or not
1171

1172
  """
1173
  if disk.children:
1174
    for chdisk in disk.children:
1175
      if _RecursiveCheckIfLVMBased(chdisk):
1176
        return True
1177
  return disk.dev_type == constants.LD_LV
1178

    
1179

    
1180
class LUSetClusterParams(LogicalUnit):
1181
  """Change the parameters of the cluster.
1182

1183
  """
1184
  HPATH = "cluster-modify"
1185
  HTYPE = constants.HTYPE_CLUSTER
1186
  _OP_REQP = []
1187
  REQ_BGL = False
1188

    
1189
  def ExpandNames(self):
1190
    # FIXME: in the future maybe other cluster params won't require checking on
1191
    # all nodes to be modified.
1192
    self.needed_locks = {
1193
      locking.LEVEL_NODE: locking.ALL_SET,
1194
    }
1195
    self.share_locks[locking.LEVEL_NODE] = 1
1196

    
1197
  def BuildHooksEnv(self):
1198
    """Build hooks env.
1199

1200
    """
1201
    env = {
1202
      "OP_TARGET": self.cfg.GetClusterName(),
1203
      "NEW_VG_NAME": self.op.vg_name,
1204
      }
1205
    mn = self.cfg.GetMasterNode()
1206
    return env, [mn], [mn]
1207

    
1208
  def CheckPrereq(self):
1209
    """Check prerequisites.
1210

1211
    This checks whether the given params don't conflict and
1212
    if the given volume group is valid.
1213

1214
    """
1215
    # FIXME: This only works because there is only one parameter that can be
1216
    # changed or removed.
1217
    if self.op.vg_name is not None and not self.op.vg_name:
1218
      instances = self.cfg.GetAllInstancesInfo().values()
1219
      for inst in instances:
1220
        for disk in inst.disks:
1221
          if _RecursiveCheckIfLVMBased(disk):
1222
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1223
                                       " lvm-based instances exist")
1224

    
1225
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1226

    
1227
    # if vg_name not None, checks given volume group on all nodes
1228
    if self.op.vg_name:
1229
      vglist = self.rpc.call_vg_list(node_list)
1230
      for node in node_list:
1231
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1232
                                              constants.MIN_VG_SIZE)
1233
        if vgstatus:
1234
          raise errors.OpPrereqError("Error on node '%s': %s" %
1235
                                     (node, vgstatus))
1236

    
1237
    self.cluster = cluster = self.cfg.GetClusterInfo()
1238
    # beparams changes do not need validation (we can't validate?),
1239
    # but we still process here
1240
    if self.op.beparams:
1241
      self.new_beparams = cluster.FillDict(
1242
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1243

    
1244
    # hypervisor list/parameters
1245
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1246
    if self.op.hvparams:
1247
      if not isinstance(self.op.hvparams, dict):
1248
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1249
      for hv_name, hv_dict in self.op.hvparams.items():
1250
        if hv_name not in self.new_hvparams:
1251
          self.new_hvparams[hv_name] = hv_dict
1252
        else:
1253
          self.new_hvparams[hv_name].update(hv_dict)
1254

    
1255
    if self.op.enabled_hypervisors is not None:
1256
      self.hv_list = self.op.enabled_hypervisors
1257
    else:
1258
      self.hv_list = cluster.enabled_hypervisors
1259

    
1260
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1261
      # either the enabled list has changed, or the parameters have, validate
1262
      for hv_name, hv_params in self.new_hvparams.items():
1263
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1264
            (self.op.enabled_hypervisors and
1265
             hv_name in self.op.enabled_hypervisors)):
1266
          # either this is a new hypervisor, or its parameters have changed
1267
          hv_class = hypervisor.GetHypervisor(hv_name)
1268
          hv_class.CheckParameterSyntax(hv_params)
1269
          _CheckHVParams(self, node_list, hv_name, hv_params)
1270

    
1271
  def Exec(self, feedback_fn):
1272
    """Change the parameters of the cluster.
1273

1274
    """
1275
    if self.op.vg_name is not None:
1276
      if self.op.vg_name != self.cfg.GetVGName():
1277
        self.cfg.SetVGName(self.op.vg_name)
1278
      else:
1279
        feedback_fn("Cluster LVM configuration already in desired"
1280
                    " state, not changing")
1281
    if self.op.hvparams:
1282
      self.cluster.hvparams = self.new_hvparams
1283
    if self.op.enabled_hypervisors is not None:
1284
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1285
    if self.op.beparams:
1286
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1287
    self.cfg.Update(self.cluster)
1288

    
1289

    
1290
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1291
  """Sleep and poll for an instance's disk to sync.
1292

1293
  """
1294
  if not instance.disks:
1295
    return True
1296

    
1297
  if not oneshot:
1298
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1299

    
1300
  node = instance.primary_node
1301

    
1302
  for dev in instance.disks:
1303
    lu.cfg.SetDiskID(dev, node)
1304

    
1305
  retries = 0
1306
  while True:
1307
    max_time = 0
1308
    done = True
1309
    cumul_degraded = False
1310
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1311
    if not rstats:
1312
      lu.LogWarning("Can't get any data from node %s", node)
1313
      retries += 1
1314
      if retries >= 10:
1315
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1316
                                 " aborting." % node)
1317
      time.sleep(6)
1318
      continue
1319
    retries = 0
1320
    for i in range(len(rstats)):
1321
      mstat = rstats[i]
1322
      if mstat is None:
1323
        lu.LogWarning("Can't compute data for node %s/%s",
1324
                           node, instance.disks[i].iv_name)
1325
        continue
1326
      # we ignore the ldisk parameter
1327
      perc_done, est_time, is_degraded, _ = mstat
1328
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1329
      if perc_done is not None:
1330
        done = False
1331
        if est_time is not None:
1332
          rem_time = "%d estimated seconds remaining" % est_time
1333
          max_time = est_time
1334
        else:
1335
          rem_time = "no time estimate"
1336
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1337
                        (instance.disks[i].iv_name, perc_done, rem_time))
1338
    if done or oneshot:
1339
      break
1340

    
1341
    time.sleep(min(60, max_time))
1342

    
1343
  if done:
1344
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1345
  return not cumul_degraded
1346

    
1347

    
1348
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1349
  """Check that mirrors are not degraded.
1350

1351
  The ldisk parameter, if True, will change the test from the
1352
  is_degraded attribute (which represents overall non-ok status for
1353
  the device(s)) to the ldisk (representing the local storage status).
1354

1355
  """
1356
  lu.cfg.SetDiskID(dev, node)
1357
  if ldisk:
1358
    idx = 6
1359
  else:
1360
    idx = 5
1361

    
1362
  result = True
1363
  if on_primary or dev.AssembleOnSecondary():
1364
    rstats = lu.rpc.call_blockdev_find(node, dev)
1365
    if not rstats:
1366
      logging.warning("Node %s: disk degraded, not found or node down", node)
1367
      result = False
1368
    else:
1369
      result = result and (not rstats[idx])
1370
  if dev.children:
1371
    for child in dev.children:
1372
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1373

    
1374
  return result
1375

    
1376

    
1377
class LUDiagnoseOS(NoHooksLU):
1378
  """Logical unit for OS diagnose/query.
1379

1380
  """
1381
  _OP_REQP = ["output_fields", "names"]
1382
  REQ_BGL = False
1383
  _FIELDS_STATIC = utils.FieldSet()
1384
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1385

    
1386
  def ExpandNames(self):
1387
    if self.op.names:
1388
      raise errors.OpPrereqError("Selective OS query not supported")
1389

    
1390
    _CheckOutputFields(static=self._FIELDS_STATIC,
1391
                       dynamic=self._FIELDS_DYNAMIC,
1392
                       selected=self.op.output_fields)
1393

    
1394
    # Lock all nodes, in shared mode
1395
    self.needed_locks = {}
1396
    self.share_locks[locking.LEVEL_NODE] = 1
1397
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1398

    
1399
  def CheckPrereq(self):
1400
    """Check prerequisites.
1401

1402
    """
1403

    
1404
  @staticmethod
1405
  def _DiagnoseByOS(node_list, rlist):
1406
    """Remaps a per-node return list into an a per-os per-node dictionary
1407

1408
    @param node_list: a list with the names of all nodes
1409
    @param rlist: a map with node names as keys and OS objects as values
1410

1411
    @rtype: dict
1412
    @return: a dictionary with os names as keys and as values another map, with
1413
        nodes as keys and lists of OS objects as values, e.g.::
1414

1415
          {"debian-etch": {"node1": [<object>,...],
1416
                           "node2": [<object>,]}
1417
          }
1418

1419
    """
1420
    all_os = {}
1421
    for node_name, nr in rlist.iteritems():
1422
      if not nr:
1423
        continue
1424
      for os_obj in nr:
1425
        if os_obj.name not in all_os:
1426
          # build a list of nodes for this os containing empty lists
1427
          # for each node in node_list
1428
          all_os[os_obj.name] = {}
1429
          for nname in node_list:
1430
            all_os[os_obj.name][nname] = []
1431
        all_os[os_obj.name][node_name].append(os_obj)
1432
    return all_os
1433

    
1434
  def Exec(self, feedback_fn):
1435
    """Compute the list of OSes.
1436

1437
    """
1438
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1439
    node_data = self.rpc.call_os_diagnose(node_list)
1440
    if node_data == False:
1441
      raise errors.OpExecError("Can't gather the list of OSes")
1442
    pol = self._DiagnoseByOS(node_list, node_data)
1443
    output = []
1444
    for os_name, os_data in pol.iteritems():
1445
      row = []
1446
      for field in self.op.output_fields:
1447
        if field == "name":
1448
          val = os_name
1449
        elif field == "valid":
1450
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1451
        elif field == "node_status":
1452
          val = {}
1453
          for node_name, nos_list in os_data.iteritems():
1454
            val[node_name] = [(v.status, v.path) for v in nos_list]
1455
        else:
1456
          raise errors.ParameterError(field)
1457
        row.append(val)
1458
      output.append(row)
1459

    
1460
    return output
1461

    
1462

    
1463
class LURemoveNode(LogicalUnit):
1464
  """Logical unit for removing a node.
1465

1466
  """
1467
  HPATH = "node-remove"
1468
  HTYPE = constants.HTYPE_NODE
1469
  _OP_REQP = ["node_name"]
1470

    
1471
  def BuildHooksEnv(self):
1472
    """Build hooks env.
1473

1474
    This doesn't run on the target node in the pre phase as a failed
1475
    node would then be impossible to remove.
1476

1477
    """
1478
    env = {
1479
      "OP_TARGET": self.op.node_name,
1480
      "NODE_NAME": self.op.node_name,
1481
      }
1482
    all_nodes = self.cfg.GetNodeList()
1483
    all_nodes.remove(self.op.node_name)
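    # the node being removed is deliberately excluded from both the pre and
    # post hook node lists (see the docstring above)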
1484
    return env, all_nodes, all_nodes
1485

    
1486
  def CheckPrereq(self):
1487
    """Check prerequisites.
1488

1489
    This checks:
1490
     - the node exists in the configuration
1491
     - it does not have primary or secondary instances
1492
     - it's not the master
1493

1494
    Any errors are signalled by raising errors.OpPrereqError.
1495

1496
    """
1497
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1498
    if node is None:
1499
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1500

    
1501
    instance_list = self.cfg.GetInstanceList()
1502

    
1503
    masternode = self.cfg.GetMasterNode()
1504
    if node.name == masternode:
1505
      raise errors.OpPrereqError("Node is the master node,"
1506
                                 " you need to failover first.")
1507

    
1508
    for instance_name in instance_list:
1509
      instance = self.cfg.GetInstanceInfo(instance_name)
1510
      if node.name == instance.primary_node:
1511
        raise errors.OpPrereqError("Instance %s still running on the node,"
1512
                                   " please remove first." % instance_name)
1513
      if node.name in instance.secondary_nodes:
1514
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1515
                                   " please remove first." % instance_name)
1516
    self.op.node_name = node.name
1517
    self.node = node
1518

    
1519
  def Exec(self, feedback_fn):
1520
    """Removes the node from the cluster.
1521

1522
    """
1523
    node = self.node
1524
    logging.info("Stopping the node daemon and removing configs from node %s",
1525
                 node.name)
1526

    
1527
    self.context.RemoveNode(node.name)
1528

    
1529
    self.rpc.call_node_leave_cluster(node.name)
1530

    
1531

    
1532
class LUQueryNodes(NoHooksLU):
1533
  """Logical unit for querying nodes.
1534

1535
  """
1536
  _OP_REQP = ["output_fields", "names"]
1537
  REQ_BGL = False
1538
  _FIELDS_DYNAMIC = utils.FieldSet(
1539
    "dtotal", "dfree",
1540
    "mtotal", "mnode", "mfree",
1541
    "bootid",
1542
    "ctotal",
1543
    )
1544

    
1545
  _FIELDS_STATIC = utils.FieldSet(
1546
    "name", "pinst_cnt", "sinst_cnt",
1547
    "pinst_list", "sinst_list",
1548
    "pip", "sip", "tags",
1549
    "serial_no",
1550
    )
1551

    
1552
  def ExpandNames(self):
1553
    _CheckOutputFields(static=self._FIELDS_STATIC,
1554
                       dynamic=self._FIELDS_DYNAMIC,
1555
                       selected=self.op.output_fields)
1556

    
1557
    self.needed_locks = {}
1558
    self.share_locks[locking.LEVEL_NODE] = 1
1559

    
1560
    if self.op.names:
1561
      self.wanted = _GetWantedNodes(self, self.op.names)
1562
    else:
1563
      self.wanted = locking.ALL_SET
1564

    
1565
    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
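    # do_locking is True when at least one requested field is not static;
    # static fields can be answered from the configuration alone, while the
    # dynamic ones need a live RPC to the (locked) nodes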
1566
    if self.do_locking:
1567
      # if we don't request only static fields, we need to lock the nodes
1568
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
1569

    
1570

    
1571
  def CheckPrereq(self):
1572
    """Check prerequisites.
1573

1574
    """
1575
    # The validation of the node list is done in _GetWantedNodes,
1576
    # if non-empty; if empty, there is no validation to do
1577
    pass
1578

    
1579
  def Exec(self, feedback_fn):
1580
    """Computes the list of nodes and their attributes.
1581

1582
    """
1583
    all_info = self.cfg.GetAllNodesInfo()
1584
    if self.do_locking:
1585
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1586
    elif self.wanted != locking.ALL_SET:
1587
      nodenames = self.wanted
1588
      missing = set(nodenames).difference(all_info.keys())
1589
      if missing:
1590
        raise errors.OpExecError(
1591
          "Some nodes were removed before retrieving their data: %s" % missing)
1592
    else:
1593
      nodenames = all_info.keys()
1594

    
1595
    nodenames = utils.NiceSort(nodenames)
1596
    nodelist = [all_info[name] for name in nodenames]
1597

    
1598
    # begin data gathering
1599

    
1600
    if self.do_locking:
1601
      live_data = {}
1602
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1603
                                          self.cfg.GetHypervisorType())
1604
      for name in nodenames:
1605
        nodeinfo = node_data.get(name, None)
1606
        if nodeinfo:
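          # values coming from the node RPC may be strings; TryConvert keeps
          # the original value if the int() conversion fails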
1607
          live_data[name] = {
1608
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1609
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1610
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1611
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1612
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1613
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1614
            "bootid": nodeinfo['bootid'],
1615
            }
1616
        else:
1617
          live_data[name] = {}
1618
    else:
1619
      live_data = dict.fromkeys(nodenames, {})
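      # all keys share the same empty dict here, which is fine since
      # live_data is only read below, never updated per-node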
1620

    
1621
    node_to_primary = dict([(name, set()) for name in nodenames])
1622
    node_to_secondary = dict([(name, set()) for name in nodenames])
1623

    
1624
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1625
                             "sinst_cnt", "sinst_list"))
1626
    if inst_fields & frozenset(self.op.output_fields):
1627
      instancelist = self.cfg.GetInstanceList()
1628

    
1629
      for instance_name in instancelist:
1630
        inst = self.cfg.GetInstanceInfo(instance_name)
1631
        if inst.primary_node in node_to_primary:
1632
          node_to_primary[inst.primary_node].add(inst.name)
1633
        for secnode in inst.secondary_nodes:
1634
          if secnode in node_to_secondary:
1635
            node_to_secondary[secnode].add(inst.name)
1636

    
1637
    # end data gathering
1638

    
1639
    output = []
1640
    for node in nodelist:
1641
      node_output = []
1642
      for field in self.op.output_fields:
1643
        if field == "name":
1644
          val = node.name
1645
        elif field == "pinst_list":
1646
          val = list(node_to_primary[node.name])
1647
        elif field == "sinst_list":
1648
          val = list(node_to_secondary[node.name])
1649
        elif field == "pinst_cnt":
1650
          val = len(node_to_primary[node.name])
1651
        elif field == "sinst_cnt":
1652
          val = len(node_to_secondary[node.name])
1653
        elif field == "pip":
1654
          val = node.primary_ip
1655
        elif field == "sip":
1656
          val = node.secondary_ip
1657
        elif field == "tags":
1658
          val = list(node.GetTags())
1659
        elif field == "serial_no":
1660
          val = node.serial_no
1661
        elif self._FIELDS_DYNAMIC.Matches(field):
1662
          val = live_data[node.name].get(field, None)
1663
        else:
1664
          raise errors.ParameterError(field)
1665
        node_output.append(val)
1666
      output.append(node_output)
1667

    
1668
    return output
1669

    
1670

    
1671
class LUQueryNodeVolumes(NoHooksLU):
1672
  """Logical unit for getting volumes on node(s).
1673

1674
  """
1675
  _OP_REQP = ["nodes", "output_fields"]
1676
  REQ_BGL = False
1677
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1678
  _FIELDS_STATIC = utils.FieldSet("node")
1679

    
1680
  def ExpandNames(self):
1681
    _CheckOutputFields(static=self._FIELDS_STATIC,
1682
                       dynamic=self._FIELDS_DYNAMIC,
1683
                       selected=self.op.output_fields)
1684

    
1685
    self.needed_locks = {}
1686
    self.share_locks[locking.LEVEL_NODE] = 1
1687
    if not self.op.nodes:
1688
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1689
    else:
1690
      self.needed_locks[locking.LEVEL_NODE] = \
1691
        _GetWantedNodes(self, self.op.nodes)
1692

    
1693
  def CheckPrereq(self):
1694
    """Check prerequisites.
1695

1696
    This checks that the fields required are valid output fields.
1697

1698
    """
1699
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1700

    
1701
  def Exec(self, feedback_fn):
1702
    """Computes the list of nodes and their attributes.
1703

1704
    """
1705
    nodenames = self.nodes
1706
    volumes = self.rpc.call_node_volumes(nodenames)
1707

    
1708
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1709
             in self.cfg.GetInstanceList()]
1710

    
1711
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1712

    
1713
    output = []
1714
    for node in nodenames:
1715
      if node not in volumes or not volumes[node]:
1716
        continue
1717

    
1718
      node_vols = volumes[node][:]
1719
      node_vols.sort(key=lambda vol: vol['dev'])
1720

    
1721
      for vol in node_vols:
1722
        node_output = []
1723
        for field in self.op.output_fields:
1724
          if field == "node":
1725
            val = node
1726
          elif field == "phys":
1727
            val = vol['dev']
1728
          elif field == "vg":
1729
            val = vol['vg']
1730
          elif field == "name":
1731
            val = vol['name']
1732
          elif field == "size":
1733
            val = int(float(vol['size']))
1734
          elif field == "instance":
1735
            for inst in ilist:
1736
              if node not in lv_by_node[inst]:
1737
                continue
1738
              if vol['name'] in lv_by_node[inst][node]:
1739
                val = inst.name
1740
                break
1741
            else:
1742
              val = '-'
1743
          else:
1744
            raise errors.ParameterError(field)
1745
          node_output.append(str(val))
1746

    
1747
        output.append(node_output)
1748

    
1749
    return output
1750

    
1751

    
1752
class LUAddNode(LogicalUnit):
1753
  """Logical unit for adding node to the cluster.
1754

1755
  """
1756
  HPATH = "node-add"
1757
  HTYPE = constants.HTYPE_NODE
1758
  _OP_REQP = ["node_name"]
1759

    
1760
  def BuildHooksEnv(self):
1761
    """Build hooks env.
1762

1763
    This will run on all nodes before, and on all nodes + the new node after.
1764

1765
    """
1766
    env = {
1767
      "OP_TARGET": self.op.node_name,
1768
      "NODE_NAME": self.op.node_name,
1769
      "NODE_PIP": self.op.primary_ip,
1770
      "NODE_SIP": self.op.secondary_ip,
1771
      }
1772
    nodes_0 = self.cfg.GetNodeList()
1773
    nodes_1 = nodes_0 + [self.op.node_name, ]
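    # pre hooks run on the current cluster members only, post hooks also on
    # the node being added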
1774
    return env, nodes_0, nodes_1
1775

    
1776
  def CheckPrereq(self):
1777
    """Check prerequisites.
1778

1779
    This checks:
1780
     - the new node is not already in the config
1781
     - it is resolvable
1782
     - its parameters (single/dual homed) match the cluster
1783

1784
    Any errors are signalled by raising errors.OpPrereqError.
1785

1786
    """
1787
    node_name = self.op.node_name
1788
    cfg = self.cfg
1789

    
1790
    dns_data = utils.HostInfo(node_name)
1791

    
1792
    node = dns_data.name
1793
    primary_ip = self.op.primary_ip = dns_data.ip
1794
    secondary_ip = getattr(self.op, "secondary_ip", None)
1795
    if secondary_ip is None:
1796
      secondary_ip = primary_ip
1797
    if not utils.IsValidIP(secondary_ip):
1798
      raise errors.OpPrereqError("Invalid secondary IP given")
1799
    self.op.secondary_ip = secondary_ip
1800

    
1801
    node_list = cfg.GetNodeList()
1802
    if not self.op.readd and node in node_list:
1803
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1804
                                 node)
1805
    elif self.op.readd and node not in node_list:
1806
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1807

    
1808
    for existing_node_name in node_list:
1809
      existing_node = cfg.GetNodeInfo(existing_node_name)
1810

    
1811
      if self.op.readd and node == existing_node_name:
1812
        if (existing_node.primary_ip != primary_ip or
1813
            existing_node.secondary_ip != secondary_ip):
1814
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1815
                                     " address configuration as before")
1816
        continue
1817

    
1818
      if (existing_node.primary_ip == primary_ip or
1819
          existing_node.secondary_ip == primary_ip or
1820
          existing_node.primary_ip == secondary_ip or
1821
          existing_node.secondary_ip == secondary_ip):
1822
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1823
                                   " existing node %s" % existing_node.name)
1824

    
1825
    # check that the type of the node (single versus dual homed) is the
1826
    # same as for the master
1827
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1828
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1829
    newbie_singlehomed = secondary_ip == primary_ip
1830
    if master_singlehomed != newbie_singlehomed:
1831
      if master_singlehomed:
1832
        raise errors.OpPrereqError("The master has no private ip but the"
1833
                                   " new node has one")
1834
      else:
1835
        raise errors.OpPrereqError("The master has a private ip but the"
1836
                                   " new node doesn't have one")
1837

    
1838
    # check reachability
1839
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1840
      raise errors.OpPrereqError("Node not reachable by ping")
1841

    
1842
    if not newbie_singlehomed:
1843
      # check reachability from my secondary ip to newbie's secondary ip
1844
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1845
                           source=myself.secondary_ip):
1846
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1847
                                   " based ping to noded port")
1848

    
1849
    self.new_node = objects.Node(name=node,
1850
                                 primary_ip=primary_ip,
1851
                                 secondary_ip=secondary_ip)
1852

    
1853
  def Exec(self, feedback_fn):
1854
    """Adds the new node to the cluster.
1855

1856
    """
1857
    new_node = self.new_node
1858
    node = new_node.name
1859

    
1860
    # check connectivity
1861
    result = self.rpc.call_version([node])[node]
1862
    if result:
1863
      if constants.PROTOCOL_VERSION == result:
1864
        logging.info("Communication to node %s fine, sw version %s match",
1865
                     node, result)
1866
      else:
1867
        raise errors.OpExecError("Version mismatch master version %s,"
1868
                                 " node version %s" %
1869
                                 (constants.PROTOCOL_VERSION, result))
1870
    else:
1871
      raise errors.OpExecError("Cannot get version from the new node")
1872

    
1873
    # setup ssh on node
1874
    logging.info("Copy ssh key to node %s", node)
1875
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1876
    keyarray = []
1877
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1878
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1879
                priv_key, pub_key]
1880

    
1881
    for i in keyfiles:
1882
      f = open(i, 'r')
1883
      try:
1884
        keyarray.append(f.read())
1885
      finally:
1886
        f.close()
1887

    
1888
    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
1889
                                    keyarray[2],
1890
                                    keyarray[3], keyarray[4], keyarray[5])
1891

    
1892
    if not result:
1893
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1894

    
1895
    # Add node to our /etc/hosts, and add key to known_hosts
1896
    utils.AddHostToEtcHosts(new_node.name)
1897

    
1898
    if new_node.secondary_ip != new_node.primary_ip:
1899
      if not self.rpc.call_node_has_ip_address(new_node.name,
1900
                                               new_node.secondary_ip):
1901
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1902
                                 " you gave (%s). Please fix and re-run this"
1903
                                 " command." % new_node.secondary_ip)
1904

    
1905
    node_verify_list = [self.cfg.GetMasterNode()]
1906
    node_verify_param = {
1907
      'nodelist': [node],
1908
      # TODO: do a node-net-test as well?
1909
    }
1910

    
1911
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
1912
                                       self.cfg.GetClusterName())
1913
    for verifier in node_verify_list:
1914
      if not result[verifier]:
1915
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
1916
                                 " for remote verification" % verifier)
1917
      if result[verifier]['nodelist']:
1918
        for failed in result[verifier]['nodelist']:
1919
          feedback_fn("ssh/hostname verification failed %s -> %s" %
1920
                      (verifier, result[verifier]['nodelist'][failed]))
1921
        raise errors.OpExecError("ssh/hostname verification failed.")
1922

    
1923
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1924
    # including the node just added
1925
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
1926
    dist_nodes = self.cfg.GetNodeList()
1927
    if not self.op.readd:
1928
      dist_nodes.append(node)
1929
    if myself.name in dist_nodes:
1930
      dist_nodes.remove(myself.name)
1931

    
1932
    logging.debug("Copying hosts and known_hosts to all nodes")
1933
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1934
      result = self.rpc.call_upload_file(dist_nodes, fname)
1935
      for to_node in dist_nodes:
1936
        if not result[to_node]:
1937
          logging.error("Copy of file %s to node %s failed", fname, to_node)
1938

    
1939
    to_copy = []
1940
    if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
1941
      to_copy.append(constants.VNC_PASSWORD_FILE)
1942
    for fname in to_copy:
1943
      result = self.rpc.call_upload_file([node], fname)
1944
      if not result[node]:
1945
        logging.error("Could not copy file %s to node %s", fname, node)
1946

    
1947
    if self.op.readd:
1948
      self.context.ReaddNode(new_node)
1949
    else:
1950
      self.context.AddNode(new_node)
1951

    
1952

    
1953
class LUQueryClusterInfo(NoHooksLU):
1954
  """Query cluster configuration.
1955

1956
  """
1957
  _OP_REQP = []
1958
  REQ_BGL = False
1959

    
1960
  def ExpandNames(self):
1961
    self.needed_locks = {}
1962

    
1963
  def CheckPrereq(self):
1964
    """No prerequsites needed for this LU.
1965

1966
    """
1967
    pass
1968

    
1969
  def Exec(self, feedback_fn):
1970
    """Return cluster config.
1971

1972
    """
1973
    cluster = self.cfg.GetClusterInfo()
1974
    result = {
1975
      "software_version": constants.RELEASE_VERSION,
1976
      "protocol_version": constants.PROTOCOL_VERSION,
1977
      "config_version": constants.CONFIG_VERSION,
1978
      "os_api_version": constants.OS_API_VERSION,
1979
      "export_version": constants.EXPORT_VERSION,
1980
      "architecture": (platform.architecture()[0], platform.machine()),
1981
      "name": cluster.cluster_name,
1982
      "master": cluster.master_node,
1983
      "default_hypervisor": cluster.default_hypervisor,
1984
      "enabled_hypervisors": cluster.enabled_hypervisors,
1985
      "hvparams": cluster.hvparams,
1986
      "beparams": cluster.beparams,
1987
      }
1988

    
1989
    return result
1990

    
1991

    
1992
class LUQueryConfigValues(NoHooksLU):
1993
  """Return configuration values.
1994

1995
  """
1996
  _OP_REQP = []
1997
  REQ_BGL = False
1998
  _FIELDS_DYNAMIC = utils.FieldSet()
1999
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2000

    
2001
  def ExpandNames(self):
2002
    self.needed_locks = {}
2003

    
2004
    _CheckOutputFields(static=self._FIELDS_STATIC,
2005
                       dynamic=self._FIELDS_DYNAMIC,
2006
                       selected=self.op.output_fields)
2007

    
2008
  def CheckPrereq(self):
2009
    """No prerequisites.
2010

2011
    """
2012
    pass
2013

    
2014
  def Exec(self, feedback_fn):
2015
    """Dump a representation of the cluster config to the standard output.
2016

2017
    """
2018
    values = []
2019
    for field in self.op.output_fields:
2020
      if field == "cluster_name":
2021
        entry = self.cfg.GetClusterName()
2022
      elif field == "master_node":
2023
        entry = self.cfg.GetMasterNode()
2024
      elif field == "drain_flag":
2025
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2026
      else:
2027
        raise errors.ParameterError(field)
2028
      values.append(entry)
2029
    return values
2030

    
2031

    
2032
class LUActivateInstanceDisks(NoHooksLU):
2033
  """Bring up an instance's disks.
2034

2035
  """
2036
  _OP_REQP = ["instance_name"]
2037
  REQ_BGL = False
2038

    
2039
  def ExpandNames(self):
2040
    self._ExpandAndLockInstance()
2041
    self.needed_locks[locking.LEVEL_NODE] = []
2042
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2043

    
2044
  def DeclareLocks(self, level):
2045
    if level == locking.LEVEL_NODE:
2046
      self._LockInstancesNodes()
2047

    
2048
  def CheckPrereq(self):
2049
    """Check prerequisites.
2050

2051
    This checks that the instance is in the cluster.
2052

2053
    """
2054
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2055
    assert self.instance is not None, \
2056
      "Cannot retrieve locked instance %s" % self.op.instance_name
2057

    
2058
  def Exec(self, feedback_fn):
2059
    """Activate the disks.
2060

2061
    """
2062
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2063
    if not disks_ok:
2064
      raise errors.OpExecError("Cannot activate block devices")
2065

    
2066
    return disks_info
2067

    
2068

    
2069
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2070
  """Prepare the block devices for an instance.
2071

2072
  This sets up the block devices on all nodes.
2073

2074
  @type lu: L{LogicalUnit}
2075
  @param lu: the logical unit on whose behalf we execute
2076
  @type instance: L{objects.Instance}
2077
  @param instance: the instance for whose disks we assemble
2078
  @type ignore_secondaries: boolean
2079
  @param ignore_secondaries: if true, errors on secondary nodes
2080
      won't result in an error return from the function
2081
  @return: a tuple of (disks_ok, device_info); device_info is a list of
2082
      (host, instance_visible_name, node_visible_name)
2083
      with the mapping from node devices to instance devices
2084

2085
  """
2086
  device_info = []
2087
  disks_ok = True
2088
  iname = instance.name
2089
  # With the two-pass mechanism we try to reduce the window of
2090
  # opportunity for the race condition of switching DRBD to primary
2091
  # before handshaking occurred, but we do not eliminate it
2092

    
2093
  # The proper fix would be to wait (with some limits) until the
2094
  # connection has been made and drbd transitions from WFConnection
2095
  # into any other network-connected state (Connected, SyncTarget,
2096
  # SyncSource, etc.)
2097

    
2098
  # 1st pass, assemble on all nodes in secondary mode
2099
  for inst_disk in instance.disks:
2100
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2101
      lu.cfg.SetDiskID(node_disk, node)
2102
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2103
      if not result:
2104
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2105
                           " (is_primary=False, pass=1)",
2106
                           inst_disk.iv_name, node)
2107
        if not ignore_secondaries:
2108
          disks_ok = False
2109

    
2110
  # FIXME: race condition on drbd migration to primary
2111

    
2112
  # 2nd pass, do only the primary node
2113
  for inst_disk in instance.disks:
2114
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2115
      if node != instance.primary_node:
2116
        continue
2117
      lu.cfg.SetDiskID(node_disk, node)
2118
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2119
      if not result:
2120
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2121
                           " (is_primary=True, pass=2)",
2122
                           inst_disk.iv_name, node)
2123
        disks_ok = False
2124
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
2125

    
2126
  # leave the disks configured for the primary node
2127
  # this is a workaround that would be fixed better by
2128
  # improving the logical/physical id handling
2129
  for disk in instance.disks:
2130
    lu.cfg.SetDiskID(disk, instance.primary_node)
2131

    
2132
  return disks_ok, device_info
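  # Illustration of the return value (hypothetical names, for clarity only):
  #   (True, [("node1.example.com", "sda", <assemble result>),
  #           ("node1.example.com", "sdb", <assemble result>)])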
2133

    
2134

    
2135
def _StartInstanceDisks(lu, instance, force):
2136
  """Start the disks of an instance.
2137

2138
  """
2139
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2140
                                           ignore_secondaries=force)
2141
  if not disks_ok:
2142
    _ShutdownInstanceDisks(lu, instance)
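    # "force" may be None when the caller has no user-supplied flag (e.g. the
    # reinstall/rename paths); only show the --force hint when the user
    # explicitly passed force=False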
2143
    if force is not None and not force:
2144
      lu.proc.LogWarning("", hint="If the message above refers to a"
2145
                         " secondary node,"
2146
                         " you can retry the operation using '--force'.")
2147
    raise errors.OpExecError("Disk consistency error")
2148

    
2149

    
2150
class LUDeactivateInstanceDisks(NoHooksLU):
2151
  """Shutdown an instance's disks.
2152

2153
  """
2154
  _OP_REQP = ["instance_name"]
2155
  REQ_BGL = False
2156

    
2157
  def ExpandNames(self):
2158
    self._ExpandAndLockInstance()
2159
    self.needed_locks[locking.LEVEL_NODE] = []
2160
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2161

    
2162
  def DeclareLocks(self, level):
2163
    if level == locking.LEVEL_NODE:
2164
      self._LockInstancesNodes()
2165

    
2166
  def CheckPrereq(self):
2167
    """Check prerequisites.
2168

2169
    This checks that the instance is in the cluster.
2170

2171
    """
2172
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2173
    assert self.instance is not None, \
2174
      "Cannot retrieve locked instance %s" % self.op.instance_name
2175

    
2176
  def Exec(self, feedback_fn):
2177
    """Deactivate the disks
2178

2179
    """
2180
    instance = self.instance
2181
    _SafeShutdownInstanceDisks(self, instance)
2182

    
2183

    
2184
def _SafeShutdownInstanceDisks(lu, instance):
2185
  """Shutdown block devices of an instance.
2186

2187
  This function checks if an instance is running, before calling
2188
  _ShutdownInstanceDisks.
2189

2190
  """
2191
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
2192
                                      [instance.hypervisor])
2193
  ins_l = ins_l[instance.primary_node]
2194
  if not isinstance(ins_l, list):
2195
    raise errors.OpExecError("Can't contact node '%s'" %
2196
                             instance.primary_node)
2197

    
2198
  if instance.name in ins_l:
2199
    raise errors.OpExecError("Instance is running, can't shutdown"
2200
                             " block devices.")
2201

    
2202
  _ShutdownInstanceDisks(lu, instance)
2203

    
2204

    
2205
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2206
  """Shutdown block devices of an instance.
2207

2208
  This does the shutdown on all nodes of the instance.
2209

2210
  If ignore_primary is true, errors on the primary node are
2211
  ignored.
2212

2213
  """
2214
  result = True
2215
  for disk in instance.disks:
2216
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2217
      lu.cfg.SetDiskID(top_disk, node)
2218
      if not lu.rpc.call_blockdev_shutdown(node, top_disk):
2219
        logging.error("Could not shutdown block device %s on node %s",
2220
                      disk.iv_name, node)
2221
        if not ignore_primary or node != instance.primary_node:
2222
          result = False
2223
  return result
2224

    
2225

    
2226
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2227
  """Checks if a node has enough free memory.
2228

2229
  This function checks if a given node has the needed amount of free
2230
  memory. In case the node has less memory or we cannot get the
2231
  information from the node, this function raises an OpPrereqError
2232
  exception.
2233

2234
  @type lu: C{LogicalUnit}
2235
  @param lu: a logical unit from which we get configuration data
2236
  @type node: C{str}
2237
  @param node: the node to check
2238
  @type reason: C{str}
2239
  @param reason: string to use in the error message
2240
  @type requested: C{int}
2241
  @param requested: the amount of memory in MiB to check for
2242
  @type hypervisor: C{str}
2243
  @param hypervisor: the hypervisor to ask for memory stats
2244
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2245
      we cannot check the node
2246

2247
  """
2248
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2249
  if not nodeinfo or not isinstance(nodeinfo, dict):
2250
    raise errors.OpPrereqError("Could not contact node %s for resource"
2251
                             " information" % (node,))
2252

    
2253
  free_mem = nodeinfo[node].get('memory_free')
2254
  if not isinstance(free_mem, int):
2255
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2256
                             " was '%s'" % (node, free_mem))
2257
  if requested > free_mem:
2258
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2259
                             " needed %s MiB, available %s MiB" %
2260
                             (node, reason, requested, free_mem))
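  # Typical use, mirroring the call sites below (e.g. when starting an
  # instance):
  #   _CheckNodeFreeMemory(self, instance.primary_node,
  #                        "starting instance %s" % instance.name,
  #                        bep[constants.BE_MEMORY], instance.hypervisor)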
2261

    
2262

    
2263
class LUStartupInstance(LogicalUnit):
2264
  """Starts an instance.
2265

2266
  """
2267
  HPATH = "instance-start"
2268
  HTYPE = constants.HTYPE_INSTANCE
2269
  _OP_REQP = ["instance_name", "force"]
2270
  REQ_BGL = False
2271

    
2272
  def ExpandNames(self):
2273
    self._ExpandAndLockInstance()
2274

    
2275
  def BuildHooksEnv(self):
2276
    """Build hooks env.
2277

2278
    This runs on master, primary and secondary nodes of the instance.
2279

2280
    """
2281
    env = {
2282
      "FORCE": self.op.force,
2283
      }
2284
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2285
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2286
          list(self.instance.secondary_nodes))
2287
    return env, nl, nl
2288

    
2289
  def CheckPrereq(self):
2290
    """Check prerequisites.
2291

2292
    This checks that the instance is in the cluster.
2293

2294
    """
2295
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2296
    assert self.instance is not None, \
2297
      "Cannot retrieve locked instance %s" % self.op.instance_name
2298

    
2299
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2300
    # check bridge existence
2301
    _CheckInstanceBridgesExist(self, instance)
2302

    
2303
    _CheckNodeFreeMemory(self, instance.primary_node,
2304
                         "starting instance %s" % instance.name,
2305
                         bep[constants.BE_MEMORY], instance.hypervisor)
2306

    
2307
  def Exec(self, feedback_fn):
2308
    """Start the instance.
2309

2310
    """
2311
    instance = self.instance
2312
    force = self.op.force
2313
    extra_args = getattr(self.op, "extra_args", "")
2314

    
2315
    self.cfg.MarkInstanceUp(instance.name)
2316

    
2317
    node_current = instance.primary_node
2318

    
2319
    _StartInstanceDisks(self, instance, force)
2320

    
2321
    if not self.rpc.call_instance_start(node_current, instance, extra_args):
2322
      _ShutdownInstanceDisks(self, instance)
2323
      raise errors.OpExecError("Could not start instance")
2324

    
2325

    
2326
class LURebootInstance(LogicalUnit):
2327
  """Reboot an instance.
2328

2329
  """
2330
  HPATH = "instance-reboot"
2331
  HTYPE = constants.HTYPE_INSTANCE
2332
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2333
  REQ_BGL = False
2334

    
2335
  def ExpandNames(self):
2336
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2337
                                   constants.INSTANCE_REBOOT_HARD,
2338
                                   constants.INSTANCE_REBOOT_FULL]:
2339
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2340
                                  (constants.INSTANCE_REBOOT_SOFT,
2341
                                   constants.INSTANCE_REBOOT_HARD,
2342
                                   constants.INSTANCE_REBOOT_FULL))
2343
    self._ExpandAndLockInstance()
2344

    
2345
  def BuildHooksEnv(self):
2346
    """Build hooks env.
2347

2348
    This runs on master, primary and secondary nodes of the instance.
2349

2350
    """
2351
    env = {
2352
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2353
      }
2354
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2355
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2356
          list(self.instance.secondary_nodes))
2357
    return env, nl, nl
2358

    
2359
  def CheckPrereq(self):
2360
    """Check prerequisites.
2361

2362
    This checks that the instance is in the cluster.
2363

2364
    """
2365
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2366
    assert self.instance is not None, \
2367
      "Cannot retrieve locked instance %s" % self.op.instance_name
2368

    
2369
    # check bridge existence
2370
    _CheckInstanceBridgesExist(self, instance)
2371

    
2372
  def Exec(self, feedback_fn):
2373
    """Reboot the instance.
2374

2375
    """
2376
    instance = self.instance
2377
    ignore_secondaries = self.op.ignore_secondaries
2378
    reboot_type = self.op.reboot_type
2379
    extra_args = getattr(self.op, "extra_args", "")
2380

    
2381
    node_current = instance.primary_node
2382

    
2383
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2384
                       constants.INSTANCE_REBOOT_HARD]:
2385
      if not self.rpc.call_instance_reboot(node_current, instance,
2386
                                           reboot_type, extra_args):
2387
        raise errors.OpExecError("Could not reboot instance")
2388
    else:
2389
      if not self.rpc.call_instance_shutdown(node_current, instance):
2390
        raise errors.OpExecError("could not shutdown instance for full reboot")
2391
      _ShutdownInstanceDisks(self, instance)
2392
      _StartInstanceDisks(self, instance, ignore_secondaries)
2393
      if not self.rpc.call_instance_start(node_current, instance, extra_args):
2394
        _ShutdownInstanceDisks(self, instance)
2395
        raise errors.OpExecError("Could not start instance for full reboot")
2396

    
2397
    self.cfg.MarkInstanceUp(instance.name)
2398

    
2399

    
2400
class LUShutdownInstance(LogicalUnit):
2401
  """Shutdown an instance.
2402

2403
  """
2404
  HPATH = "instance-stop"
2405
  HTYPE = constants.HTYPE_INSTANCE
2406
  _OP_REQP = ["instance_name"]
2407
  REQ_BGL = False
2408

    
2409
  def ExpandNames(self):
2410
    self._ExpandAndLockInstance()
2411

    
2412
  def BuildHooksEnv(self):
2413
    """Build hooks env.
2414

2415
    This runs on master, primary and secondary nodes of the instance.
2416

2417
    """
2418
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2419
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2420
          list(self.instance.secondary_nodes))
2421
    return env, nl, nl
2422

    
2423
  def CheckPrereq(self):
2424
    """Check prerequisites.
2425

2426
    This checks that the instance is in the cluster.
2427

2428
    """
2429
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2430
    assert self.instance is not None, \
2431
      "Cannot retrieve locked instance %s" % self.op.instance_name
2432

    
2433
  def Exec(self, feedback_fn):
2434
    """Shutdown the instance.
2435

2436
    """
2437
    instance = self.instance
2438
    node_current = instance.primary_node
2439
    self.cfg.MarkInstanceDown(instance.name)
2440
    if not self.rpc.call_instance_shutdown(node_current, instance):
2441
      self.proc.LogWarning("Could not shutdown instance")
2442

    
2443
    _ShutdownInstanceDisks(self, instance)
2444

    
2445

    
2446
class LUReinstallInstance(LogicalUnit):
2447
  """Reinstall an instance.
2448

2449
  """
2450
  HPATH = "instance-reinstall"
2451
  HTYPE = constants.HTYPE_INSTANCE
2452
  _OP_REQP = ["instance_name"]
2453
  REQ_BGL = False
2454

    
2455
  def ExpandNames(self):
2456
    self._ExpandAndLockInstance()
2457

    
2458
  def BuildHooksEnv(self):
2459
    """Build hooks env.
2460

2461
    This runs on master, primary and secondary nodes of the instance.
2462

2463
    """
2464
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2465
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2466
          list(self.instance.secondary_nodes))
2467
    return env, nl, nl
2468

    
2469
  def CheckPrereq(self):
2470
    """Check prerequisites.
2471

2472
    This checks that the instance is in the cluster and is not running.
2473

2474
    """
2475
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2476
    assert instance is not None, \
2477
      "Cannot retrieve locked instance %s" % self.op.instance_name
2478

    
2479
    if instance.disk_template == constants.DT_DISKLESS:
2480
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2481
                                 self.op.instance_name)
2482
    if instance.status != "down":
2483
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2484
                                 self.op.instance_name)
2485
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2486
                                              instance.name,
2487
                                              instance.hypervisor)
2488
    if remote_info:
2489
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2490
                                 (self.op.instance_name,
2491
                                  instance.primary_node))
2492

    
2493
    self.op.os_type = getattr(self.op, "os_type", None)
2494
    if self.op.os_type is not None:
2495
      # OS verification
2496
      pnode = self.cfg.GetNodeInfo(
2497
        self.cfg.ExpandNodeName(instance.primary_node))
2498
      if pnode is None:
2499
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2500
                                   instance.primary_node)
2501
      os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
2502
      if not os_obj:
2503
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2504
                                   " primary node"  % self.op.os_type)
2505

    
2506
    self.instance = instance
2507

    
2508
  def Exec(self, feedback_fn):
2509
    """Reinstall the instance.
2510

2511
    """
2512
    inst = self.instance
2513

    
2514
    if self.op.os_type is not None:
2515
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2516
      inst.os = self.op.os_type
2517
      self.cfg.Update(inst)
2518

    
2519
    _StartInstanceDisks(self, inst, None)
2520
    try:
2521
      feedback_fn("Running the instance OS create scripts...")
2522
      if not self.rpc.call_instance_os_add(inst.primary_node, inst):
2523
        raise errors.OpExecError("Could not install OS for instance %s"
2524
                                 " on node %s" %
2525
                                 (inst.name, inst.primary_node))
2526
    finally:
2527
      _ShutdownInstanceDisks(self, inst)
2528

    
2529

    
2530
class LURenameInstance(LogicalUnit):
2531
  """Rename an instance.
2532

2533
  """
2534
  HPATH = "instance-rename"
2535
  HTYPE = constants.HTYPE_INSTANCE
2536
  _OP_REQP = ["instance_name", "new_name"]
2537

    
2538
  def BuildHooksEnv(self):
2539
    """Build hooks env.
2540

2541
    This runs on master, primary and secondary nodes of the instance.
2542

2543
    """
2544
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2545
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2546
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2547
          list(self.instance.secondary_nodes))
2548
    return env, nl, nl
2549

    
2550
  def CheckPrereq(self):
2551
    """Check prerequisites.
2552

2553
    This checks that the instance is in the cluster and is not running.
2554

2555
    """
2556
    instance = self.cfg.GetInstanceInfo(
2557
      self.cfg.ExpandInstanceName(self.op.instance_name))
2558
    if instance is None:
2559
      raise errors.OpPrereqError("Instance '%s' not known" %
2560
                                 self.op.instance_name)
2561
    if instance.status != "down":
2562
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2563
                                 self.op.instance_name)
2564
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2565
                                              instance.name,
2566
                                              instance.hypervisor)
2567
    if remote_info:
2568
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2569
                                 (self.op.instance_name,
2570
                                  instance.primary_node))
2571
    self.instance = instance
2572

    
2573
    # new name verification
2574
    name_info = utils.HostInfo(self.op.new_name)
2575

    
2576
    self.op.new_name = new_name = name_info.name
2577
    instance_list = self.cfg.GetInstanceList()
2578
    if new_name in instance_list:
2579
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2580
                                 new_name)
2581

    
2582
    if not getattr(self.op, "ignore_ip", False):
2583
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2584
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2585
                                   (name_info.ip, new_name))
2586

    
2587

    
2588
  def Exec(self, feedback_fn):
2589
    """Reinstall the instance.
2590

2591
    """
2592
    inst = self.instance
2593
    old_name = inst.name
2594

    
2595
    if inst.disk_template == constants.DT_FILE:
2596
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2597

    
2598
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2599
    # Change the instance lock. This is definitely safe while we hold the BGL
2600
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2601
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2602

    
2603
    # re-read the instance from the configuration after rename
2604
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2605

    
2606
    if inst.disk_template == constants.DT_FILE:
2607
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2608
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2609
                                                     old_file_storage_dir,
2610
                                                     new_file_storage_dir)
2611

    
2612
      if not result:
2613
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2614
                                 " directory '%s' to '%s' (but the instance"
2615
                                 " has been renamed in Ganeti)" % (
2616
                                 inst.primary_node, old_file_storage_dir,
2617
                                 new_file_storage_dir))
2618

    
2619
      if not result[0]:
2620
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2621
                                 " (but the instance has been renamed in"
2622
                                 " Ganeti)" % (old_file_storage_dir,
2623
                                               new_file_storage_dir))
2624

    
2625
    _StartInstanceDisks(self, inst, None)
2626
    try:
2627
      if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
2628
                                               old_name):
2629
        msg = ("Could not run OS rename script for instance %s on node %s"
2630
               " (but the instance has been renamed in Ganeti)" %
2631
               (inst.name, inst.primary_node))
2632
        self.proc.LogWarning(msg)
2633
    finally:
2634
      _ShutdownInstanceDisks(self, inst)
2635

    
2636

    
2637
class LURemoveInstance(LogicalUnit):
2638
  """Remove an instance.
2639

2640
  """
2641
  HPATH = "instance-remove"
2642
  HTYPE = constants.HTYPE_INSTANCE
2643
  _OP_REQP = ["instance_name", "ignore_failures"]
2644
  REQ_BGL = False
2645

    
2646
  def ExpandNames(self):
2647
    self._ExpandAndLockInstance()
2648
    self.needed_locks[locking.LEVEL_NODE] = []
2649
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2650

    
2651
  def DeclareLocks(self, level):
2652
    if level == locking.LEVEL_NODE:
2653
      self._LockInstancesNodes()
2654

    
2655
  def BuildHooksEnv(self):
2656
    """Build hooks env.
2657

2658
    This runs on master, primary and secondary nodes of the instance.
2659

2660
    """
2661
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2662
    nl = [self.cfg.GetMasterNode()]
2663
    return env, nl, nl
2664

    
2665
  def CheckPrereq(self):
2666
    """Check prerequisites.
2667

2668
    This checks that the instance is in the cluster.
2669

2670
    """
2671
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2672
    assert self.instance is not None, \
2673
      "Cannot retrieve locked instance %s" % self.op.instance_name
2674

    
2675
  def Exec(self, feedback_fn):
2676
    """Remove the instance.
2677

2678
    """
2679
    instance = self.instance
2680
    logging.info("Shutting down instance %s on node %s",
2681
                 instance.name, instance.primary_node)
2682

    
2683
    if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
2684
      if self.op.ignore_failures:
2685
        feedback_fn("Warning: can't shutdown instance")
2686
      else:
2687
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2688
                                 (instance.name, instance.primary_node))
2689

    
2690
    logging.info("Removing block devices for instance %s", instance.name)
2691

    
2692
    if not _RemoveDisks(self, instance):
2693
      if self.op.ignore_failures:
2694
        feedback_fn("Warning: can't remove instance's disks")
2695
      else:
2696
        raise errors.OpExecError("Can't remove instance's disks")
2697

    
2698
    logging.info("Removing instance %s out of cluster config", instance.name)
2699

    
2700
    self.cfg.RemoveInstance(instance.name)
2701
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2702

    
2703

    
2704
class LUQueryInstances(NoHooksLU):
2705
  """Logical unit for querying instances.
2706

2707
  """
2708
  _OP_REQP = ["output_fields", "names"]
2709
  REQ_BGL = False
2710
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
2711
                                    "admin_state", "admin_ram",
2712
                                    "disk_template", "ip", "mac", "bridge",
2713
                                    "sda_size", "sdb_size", "vcpus", "tags",
2714
                                    "network_port", "beparams",
2715
                                    "(disk).(size)/([0-9]+)",
2716
                                    "(disk).(sizes)",
2717
                                    "(nic).(mac|ip|bridge)/([0-9]+)",
2718
                                    "(nic).(macs|ips|bridges)",
2719
                                    "(disk|nic).(count)",
2720
                                    "serial_no", "hypervisor", "hvparams",] +
2721
                                  ["hv/%s" % name
2722
                                   for name in constants.HVS_PARAMETERS] +
2723
                                  ["be/%s" % name
2724
                                   for name in constants.BES_PARAMETERS])
2725
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
2726

    
2727

    
2728
  def ExpandNames(self):
2729
    _CheckOutputFields(static=self._FIELDS_STATIC,
2730
                       dynamic=self._FIELDS_DYNAMIC,
2731
                       selected=self.op.output_fields)
2732

    
2733
    self.needed_locks = {}
2734
    self.share_locks[locking.LEVEL_INSTANCE] = 1
2735
    self.share_locks[locking.LEVEL_NODE] = 1
2736

    
2737
    if self.op.names:
2738
      self.wanted = _GetWantedInstances(self, self.op.names)
2739
    else:
2740
      self.wanted = locking.ALL_SET
2741

    
2742
    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2743
    if self.do_locking:
2744
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2745
      self.needed_locks[locking.LEVEL_NODE] = []
2746
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2747

    
2748
  def DeclareLocks(self, level):
2749
    if level == locking.LEVEL_NODE and self.do_locking:
2750
      self._LockInstancesNodes()
2751

    
2752
  def CheckPrereq(self):
2753
    """Check prerequisites.
2754

2755
    """
2756
    pass
2757

    
2758
  def Exec(self, feedback_fn):
2759
    """Computes the list of nodes and their attributes.
2760

2761
    """
2762
    all_info = self.cfg.GetAllInstancesInfo()
2763
    if self.do_locking:
2764
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2765
    elif self.wanted != locking.ALL_SET:
2766
      instance_names = self.wanted
2767
      missing = set(instance_names).difference(all_info.keys())
2768
      if missing:
2769
        raise errors.OpExecError(
2770
          "Some instances were removed before retrieving their data: %s"
2771
          % missing)
2772
    else:
2773
      instance_names = all_info.keys()
2774

    
2775
    instance_names = utils.NiceSort(instance_names)
2776
    instance_list = [all_info[iname] for iname in instance_names]
2777

    
2778
    # begin data gathering
2779

    
2780
    nodes = frozenset([inst.primary_node for inst in instance_list])
2781
    hv_list = list(set([inst.hypervisor for inst in instance_list]))
2782

    
2783
    bad_nodes = []
2784
    if self.do_locking:
2785
      live_data = {}
2786
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
2787
      for name in nodes:
2788
        result = node_data[name]
2789
        if result:
2790
          live_data.update(result)
2791
        elif result == False:
2792
          bad_nodes.append(name)
2793
        # else no instance is alive
2794
    else:
2795
      live_data = dict([(name, {}) for name in instance_names])
2796

    
2797
    # end data gathering
2798

    
2799
    HVPREFIX = "hv/"
2800
    BEPREFIX = "be/"
2801
    output = []
2802
    for instance in instance_list:
2803
      iout = []
2804
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
2805
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
2806
      for field in self.op.output_fields:
2807
        st_match = self._FIELDS_STATIC.Matches(field)
2808
        if field == "name":
2809
          val = instance.name
2810
        elif field == "os":
2811
          val = instance.os
2812
        elif field == "pnode":
2813
          val = instance.primary_node
2814
        elif field == "snodes":
2815
          val = list(instance.secondary_nodes)
2816
        elif field == "admin_state":
2817
          val = (instance.status != "down")
2818
        elif field == "oper_state":
2819
          if instance.primary_node in bad_nodes:
2820
            val = None
2821
          else:
2822
            val = bool(live_data.get(instance.name))
2823
        elif field == "status":
2824
          if instance.primary_node in bad_nodes:
2825
            val = "ERROR_nodedown"
2826
          else:
2827
            running = bool(live_data.get(instance.name))
2828
            if running:
2829
              if instance.status != "down":
2830
                val = "running"
2831
              else:
2832
                val = "ERROR_up"
2833
            else:
2834
              if instance.status != "down":
2835
                val = "ERROR_down"
2836
              else:
2837
                val = "ADMIN_down"
2838
        elif field == "oper_ram":
2839
          if instance.primary_node in bad_nodes:
2840
            val = None
2841
          elif instance.name in live_data:
2842
            val = live_data[instance.name].get("memory", "?")
2843
          else:
2844
            val = "-"
2845
        elif field == "disk_template":
2846
          val = instance.disk_template
2847
        elif field == "ip":
2848
          val = instance.nics[0].ip
2849
        elif field == "bridge":
2850
          val = instance.nics[0].bridge
2851
        elif field == "mac":
2852
          val = instance.nics[0].mac
2853
        elif field == "sda_size" or field == "sdb_size":
2854
          idx = ord(field[2]) - ord('a')
2855
          try:
2856
            val = instance.FindDisk(idx).size
2857
          except errors.OpPrereqError:
2858
            val = None
2859
        elif field == "tags":
2860
          val = list(instance.GetTags())
2861
        elif field == "serial_no":
2862
          val = instance.serial_no
2863
        elif field == "network_port":
2864
          val = instance.network_port
2865
        elif field == "hypervisor":
2866
          val = instance.hypervisor
2867
        elif field == "hvparams":
2868
          val = i_hv
2869
        elif (field.startswith(HVPREFIX) and
2870
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
2871
          val = i_hv.get(field[len(HVPREFIX):], None)
2872
        elif field == "beparams":
2873
          val = i_be
2874
        elif (field.startswith(BEPREFIX) and
2875
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
2876
          val = i_be.get(field[len(BEPREFIX):], None)
2877
        elif st_match and st_match.groups():
2878
          # matches a variable list
2879
          st_groups = st_match.groups()
2880
          if st_groups and st_groups[0] == "disk":
2881
            if st_groups[1] == "count":
2882
              val = len(instance.disks)
2883
            elif st_groups[1] == "sizes":
2884
              val = [disk.size for disk in instance.disks]
2885
            elif st_groups[1] == "size":
2886
              try:
2887
                val = instance.FindDisk(st_groups[2]).size
2888
              except errors.OpPrereqError:
2889
                val = None
2890
            else:
2891
              assert False, "Unhandled disk parameter"
2892
          elif st_groups[0] == "nic":
2893
            if st_groups[1] == "count":
2894
              val = len(instance.nics)
2895
            elif st_groups[1] == "macs":
2896
              val = [nic.mac for nic in instance.nics]
2897
            elif st_groups[1] == "ips":
2898
              val = [nic.ip for nic in instance.nics]
2899
            elif st_groups[1] == "bridges":
2900
              val = [nic.bridge for nic in instance.nics]
2901
            else:
2902
              # index-based item
2903
              nic_idx = int(st_groups[2])
2904
              if nic_idx >= len(instance.nics):
2905
                val = None
2906
              else:
2907
                if st_groups[1] == "mac":
2908
                  val = instance.nics[nic_idx].mac
2909
                elif st_groups[1] == "ip":
2910
                  val = instance.nics[nic_idx].ip
2911
                elif st_groups[1] == "bridge":
2912
                  val = instance.nics[nic_idx].bridge
2913
                else:
2914
                  assert False, "Unhandled NIC parameter"
2915
          else:
2916
            assert False, "Unhandled variable parameter"
2917
        else:
2918
          raise errors.ParameterError(field)
2919
        iout.append(val)
2920
      output.append(iout)

    return output


class LUFailoverInstance(LogicalUnit):
2926
  """Failover an instance.
2927

2928
  """
2929
  HPATH = "instance-failover"
2930
  HTYPE = constants.HTYPE_INSTANCE
2931
  _OP_REQP = ["instance_name", "ignore_consistency"]
2932
  REQ_BGL = False
2933

    
2934
  def ExpandNames(self):
2935
    self._ExpandAndLockInstance()
2936
    self.needed_locks[locking.LEVEL_NODE] = []
2937
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2938

    
2939
  def DeclareLocks(self, level):
2940
    if level == locking.LEVEL_NODE:
2941
      self._LockInstancesNodes()
2942

    
2943
  def BuildHooksEnv(self):
2944
    """Build hooks env.
2945

2946
    This runs on master, primary and secondary nodes of the instance.
2947

2948
    """
2949
    env = {
2950
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2951
      }
2952
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2953
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
2954
    return env, nl, nl
2955

    
2956
  def CheckPrereq(self):
2957
    """Check prerequisites.
2958

2959
    This checks that the instance is in the cluster.
2960

2961
    """
2962
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2963
    assert self.instance is not None, \
2964
      "Cannot retrieve locked instance %s" % self.op.instance_name
2965

    
2966
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2967
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2968
      raise errors.OpPrereqError("Instance's disk layout is not"
2969
                                 " network mirrored, cannot failover.")
2970

    
2971
    secondary_nodes = instance.secondary_nodes
2972
    if not secondary_nodes:
2973
      raise errors.ProgrammerError("no secondary node but using "
2974
                                   "a mirrored disk template")
2975

    
2976
    target_node = secondary_nodes[0]
2977
    # check memory requirements on the secondary node
2978
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
2979
                         instance.name, bep[constants.BE_MEMORY],
2980
                         instance.hypervisor)
2981

    
    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not self.rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s do not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))
2988

    
2989
  def Exec(self, feedback_fn):
2990
    """Failover an instance.
2991

2992
    The failover is done by shutting it down on its present node and
2993
    starting it on the secondary.
2994

2995
    """
2996
    instance = self.instance
2997

    
2998
    source_node = instance.primary_node
2999
    target_node = instance.secondary_nodes[0]
3000

    
3001
    feedback_fn("* checking disk consistency between source and target")
3002
    for dev in instance.disks:
3003
      # for drbd, these are drbd over lvm
3004
      if not _CheckDiskConsistency(self, dev, target_node, False):
3005
        if instance.status == "up" and not self.op.ignore_consistency:
3006
          raise errors.OpExecError("Disk %s is degraded on target node,"
3007
                                   " aborting failover." % dev.iv_name)
3008

    
3009
    feedback_fn("* shutting down instance on source node")
3010
    logging.info("Shutting down instance %s on node %s",
3011
                 instance.name, source_node)
3012

    
3013
    if not self.rpc.call_instance_shutdown(source_node, instance):
3014
      if self.op.ignore_consistency:
3015
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
3016
                             " Proceeding"
3017
                             " anyway. Please make sure node %s is down",
3018
                             instance.name, source_node, source_node)
3019
      else:
3020
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3021
                                 (instance.name, source_node))
3022

    
3023
    feedback_fn("* deactivating the instance's disks on source node")
3024
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3025
      raise errors.OpExecError("Can't shut down the instance's disks.")
3026

    
3027
    instance.primary_node = target_node
3028
    # distribute new instance config to the other nodes
3029
    self.cfg.Update(instance)
3030

    
3031
    # Only start the instance if it's marked as up
3032
    if instance.status == "up":
3033
      feedback_fn("* activating the instance's disks on target node")
3034
      logging.info("Starting instance %s on node %s",
3035
                   instance.name, target_node)
3036

    
3037
      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3038
                                               ignore_secondaries=True)
3039
      if not disks_ok:
3040
        _ShutdownInstanceDisks(self, instance)
3041
        raise errors.OpExecError("Can't activate the instance's disks")
3042

    
3043
      feedback_fn("* starting the instance on the target node")
3044
      if not self.rpc.call_instance_start(target_node, instance, None):
3045
        _ShutdownInstanceDisks(self, instance)
3046
        raise errors.OpExecError("Could not start instance %s on node %s." %
3047
                                 (instance.name, target_node))
3048

    
3049

    
3050
def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
3051
  """Create a tree of block devices on the primary node.
3052

3053
  This always creates all devices.
3054

3055
  """
3056
  if device.children:
3057
    for child in device.children:
3058
      if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
3059
        return False
3060

    
3061
  lu.cfg.SetDiskID(device, node)
3062
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3063
                                       instance.name, True, info)
3064
  if not new_id:
3065
    return False
3066
  if device.physical_id is None:
3067
    device.physical_id = new_id
3068
  return True
3069

    
3070

    
3071
def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
3072
  """Create a tree of block devices on a secondary node.
3073

3074
  If this device type has to be created on secondaries, create it and
3075
  all its children.
3076

3077
  If not, just recurse to children keeping the same 'force' value.
3078

3079
  """
3080
  if device.CreateOnSecondary():
3081
    force = True
3082
  if device.children:
3083
    for child in device.children:
3084
      if not _CreateBlockDevOnSecondary(lu, node, instance,
3085
                                        child, force, info):
3086
        return False
3087

    
3088
  if not force:
3089
    return True
3090
  lu.cfg.SetDiskID(device, node)
3091
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3092
                                       instance.name, False, info)
3093
  if not new_id:
3094
    return False
3095
  if device.physical_id is None:
3096
    device.physical_id = new_id
3097
  return True
3098

    
3099

    
3100
def _GenerateUniqueNames(lu, exts):
  """Generate suitable LV names.

  This will generate a unique logical volume name for each of the
  given extensions.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results
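
# Example of the naming scheme above (the ids are placeholders; the real
# values come from cfg.GenerateUniqueID(), one per extension):
#   _GenerateUniqueNames(lu, [".disk0_data", ".disk0_meta"])
#     -> ["<id0>.disk0_data", "<id1>.disk0_meta"]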
3111

    
3112

    
3113
def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3114
                         p_minor, s_minor):
3115
  """Generate a drbd8 device complete with its children.
3116

3117
  """
3118
  port = lu.cfg.AllocatePort()
3119
  vgname = lu.cfg.GetVGName()
3120
  shared_secret = lu.cfg.GenerateDRBDSecret()
3121
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3122
                          logical_id=(vgname, names[0]))
3123
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3124
                          logical_id=(vgname, names[1]))
3125
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3126
                          logical_id=(primary, secondary, port,
3127
                                      p_minor, s_minor,
3128
                                      shared_secret),
3129
                          children=[dev_data, dev_meta],
3130
                          iv_name=iv_name)
3131
  return drbd_dev
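
# Shape of the object returned above (a sketch only; the actual port, minors
# and secret are allocated from the cluster configuration):
#   LD_DRBD8 device, size MB, iv_name
#     logical_id = (primary, secondary, port, p_minor, s_minor, secret)
#     children   = [LD_LV data volume (size MB, names[0]),
#                   LD_LV metadata volume (128 MB, names[1])]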
3132

    
3133

    
3134
def _GenerateDiskTemplate(lu, template_name,
3135
                          instance_name, primary_node,
3136
                          secondary_nodes, disk_info,
3137
                          file_storage_dir, file_driver,
3138
                          base_index):
3139
  """Generate the entire disk layout for a given template type.
3140

3141
  """
3142
  #TODO: compute space requirements
3143

    
3144
  vgname = lu.cfg.GetVGName()
3145
  disk_count = len(disk_info)
3146
  disks = []
3147
  if template_name == constants.DT_DISKLESS:
3148
    pass
3149
  elif template_name == constants.DT_PLAIN:
3150
    if len(secondary_nodes) != 0:
3151
      raise errors.ProgrammerError("Wrong template configuration")
3152

    
3153
    names = _GenerateUniqueNames(lu, [".disk%d" % i
3154
                                      for i in range(disk_count)])
3155
    for idx, disk in enumerate(disk_info):
3156
      disk_index = idx + base_index
3157
      disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
3158
                              logical_id=(vgname, names[idx]),
3159
                              iv_name="disk/%d" % disk_index)
3160
      disks.append(disk_dev)
3161
  elif template_name == constants.DT_DRBD8:
3162
    if len(secondary_nodes) != 1:
3163
      raise errors.ProgrammerError("Wrong template configuration")
3164
    remote_node = secondary_nodes[0]
3165
    minors = lu.cfg.AllocateDRBDMinor(
3166
      [primary_node, remote_node] * len(disk_info), instance_name)
3167

    
3168
    names = _GenerateUniqueNames(lu,
3169
                                 [".disk%d_%s" % (i, s)
3170
                                  for i in range(disk_count)
3171
                                  for s in ("data", "meta")
3172
                                  ])
3173
    for idx, disk in enumerate(disk_info):
3174
      disk_index = idx + base_index
3175
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3176
                                      disk["size"], names[idx*2:idx*2+2],
3177
                                      "disk/%d" % disk_index,
3178
                                      minors[idx*2], minors[idx*2+1])
3179
      disks.append(disk_dev)
3180
  elif template_name == constants.DT_FILE:
3181
    if len(secondary_nodes) != 0:
3182
      raise errors.ProgrammerError("Wrong template configuration")
3183

    
3184
    for idx, disk in enumerate(disk_info):
3185
      disk_index = idx + base_index
3186
      disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
3187
                              iv_name="disk/%d" % disk_index,
3188
                              logical_id=(file_driver,
3189
                                          "%s/disk%d" % (file_storage_dir,
3190
                                                         idx)))
3191
      disks.append(disk_dev)
3192
  else:
3193
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3194
  return disks
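
# Rough summary of what the helper above produces per template (a sketch
# based on the code above, not an exhaustive specification):
#   DT_DISKLESS -> an empty list
#   DT_PLAIN    -> one LD_LV disk per requested disk
#   DT_DRBD8    -> one LD_DRBD8 branch per disk (data LV + 128 MB meta LV)
#   DT_FILE     -> one LD_FILE disk per disk under file_storage_dir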
3195

    
3196

    
3197
def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name
3202

    
3203

    
3204
def _CreateDisks(lu, instance):
3205
  """Create all disks for an instance.
3206

3207
  This abstracts away some work from AddInstance.
3208

3209
  @type lu: L{LogicalUnit}
3210
  @param lu: the logical unit on whose behalf we execute
3211
  @type instance: L{objects.Instance}
3212
  @param instance: the instance whose disks we should create
3213
  @rtype: boolean
3214
  @return: the success of the creation
3215

3216
  """
3217
  info = _GetInstanceInfoText(instance)
3218

    
3219
  if instance.disk_template == constants.DT_FILE:
3220
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3221
    result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3222
                                                 file_storage_dir)
3223

    
3224
    if not result:
3225
      logging.error("Could not connect to node '%s'", instance.primary_node)
3226
      return False
3227

    
3228
    if not result[0]:
3229
      logging.error("Failed to create directory '%s'", file_storage_dir)
3230
      return False
3231

    
3232
  # Note: this needs to be kept in sync with adding of disks in
3233
  # LUSetInstanceParams
3234
  for device in instance.disks:
3235
    logging.info("Creating volume %s for instance %s",
3236
                 device.iv_name, instance.name)
3237
    #HARDCODE
3238
    for secondary_node in instance.secondary_nodes:
3239
      if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3240
                                        device, False, info):
3241
        logging.error("Failed to create volume %s (%s) on secondary node %s!",
3242
                      device.iv_name, device, secondary_node)
3243
        return False
3244
    #HARDCODE
3245
    if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3246
                                    instance, device, info):
3247
      logging.error("Failed to create volume %s on primary!", device.iv_name)
3248
      return False
3249

    
3250
  return True
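
# Typical call site (see LUCreateInstance.Exec below): when this returns
# False, the caller rolls back with _RemoveDisks(), as some devices may
# already have been created on a subset of the nodes.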
3251

    
3252

    
3253
def _RemoveDisks(lu, instance):
3254
  """Remove all disks for an instance.
3255

3256
  This abstracts away some work from `AddInstance()` and
3257
  `RemoveInstance()`. Note that in case some of the devices couldn't
3258
  be removed, the removal will continue with the other ones (compare
3259
  with `_CreateDisks()`).
3260

3261
  @type lu: L{LogicalUnit}
3262
  @param lu: the logical unit on whose behalf we execute
3263
  @type instance: L{objects.Instance}
3264
  @param instance: the instance whose disks we should remove
3265
  @rtype: boolean
3266
  @return: the success of the removal
3267

3268
  """
3269
  logging.info("Removing block devices for instance %s", instance.name)
3270

    
3271
  result = True
3272
  for device in instance.disks:
3273
    for node, disk in device.ComputeNodeTree(instance.primary_node):
3274
      lu.cfg.SetDiskID(disk, node)
3275
      if not lu.rpc.call_blockdev_remove(node, disk):
3276
        lu.proc.LogWarning("Could not remove block device %s on node %s,"
3277
                           " continuing anyway", device.iv_name, node)
3278
        result = False
3279

    
3280
  if instance.disk_template == constants.DT_FILE:
3281
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3282
    if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3283
                                               file_storage_dir):
3284
      logging.error("Could not remove directory '%s'", file_storage_dir)
3285
      result = False
3286

    
3287
  return result
3288

    
3289

    
3290
def _ComputeDiskSize(disk_template, disks):
  """Compute disk size requirements in the volume group.

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: sum(d["size"] for d in disks),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]
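
# Worked example for the computation above: a DT_DRBD8 instance with two
# disks of 1024 MB and 2048 MB requires (1024 + 128) + (2048 + 128) = 3328 MB
# of free space in the volume group, while DT_DISKLESS and DT_FILE require no
# volume group space at all.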
3308

    
3309

    
3310
def _CheckHVParams(lu, nodenames, hvname, hvparams):
3311
  """Hypervisor parameter validation.
3312

3313
  This function abstracts the hypervisor parameter validation to be
  used in both instance create and instance modify.
3315

3316
  @type lu: L{LogicalUnit}
3317
  @param lu: the logical unit for which we check
3318
  @type nodenames: list
3319
  @param nodenames: the list of nodes on which we should check
3320
  @type hvname: string
3321
  @param hvname: the name of the hypervisor we should use
3322
  @type hvparams: dict
3323
  @param hvparams: the parameters which we need to check
3324
  @raise errors.OpPrereqError: if the parameters are not valid
3325

3326
  """
3327
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3328
                                                  hvname,
3329
                                                  hvparams)
3330
  for node in nodenames:
3331
    info = hvinfo.get(node, None)
3332
    if not info or not isinstance(info, (tuple, list)):
3333
      raise errors.OpPrereqError("Cannot get current information"
3334
                                 " from node '%s' (%s)" % (node, info))
3335
    if not info[0]:
3336
      raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3337
                                 " %s" % info[1])
3338

    
3339

    
3340
class LUCreateInstance(LogicalUnit):
3341
  """Create an instance.
3342

3343
  """
3344
  HPATH = "instance-add"
3345
  HTYPE = constants.HTYPE_INSTANCE
3346
  _OP_REQP = ["instance_name", "disks", "disk_template",
3347
              "mode", "start",
3348
              "wait_for_sync", "ip_check", "nics",
3349
              "hvparams", "beparams"]
3350
  REQ_BGL = False
3351

    
3352
  def _ExpandNode(self, node):
3353
    """Expands and checks one node name.
3354

3355
    """
3356
    node_full = self.cfg.ExpandNodeName(node)
3357
    if node_full is None:
3358
      raise errors.OpPrereqError("Unknown node %s" % node)
3359
    return node_full
3360

    
3361
  def ExpandNames(self):
3362
    """ExpandNames for CreateInstance.
3363

3364
    Figure out the right locks for instance creation.
3365

3366
    """
3367
    self.needed_locks = {}
3368

    
3369
    # set optional parameters to none if they don't exist
3370
    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3371
      if not hasattr(self.op, attr):
3372
        setattr(self.op, attr, None)
3373

    
3374
    # cheap checks, mostly valid constants given
3375

    
3376
    # verify creation mode
3377
    if self.op.mode not in (constants.INSTANCE_CREATE,
3378
                            constants.INSTANCE_IMPORT):
3379
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3380
                                 self.op.mode)
3381

    
3382
    # disk template and mirror node verification
3383
    if self.op.disk_template not in constants.DISK_TEMPLATES:
3384
      raise errors.OpPrereqError("Invalid disk template name")
3385

    
3386
    if self.op.hypervisor is None:
3387
      self.op.hypervisor = self.cfg.GetHypervisorType()
3388

    
3389
    cluster = self.cfg.GetClusterInfo()
3390
    enabled_hvs = cluster.enabled_hypervisors
3391
    if self.op.hypervisor not in enabled_hvs:
3392
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3393
                                 " cluster (%s)" % (self.op.hypervisor,
3394
                                  ",".join(enabled_hvs)))
3395

    
3396
    # check hypervisor parameter syntax (locally)
3397

    
3398
    filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
3399
                                  self.op.hvparams)
3400
    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3401
    hv_type.CheckParameterSyntax(filled_hvp)
3402

    
3403
    # fill and remember the beparams dict
3404
    self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
3405
                                    self.op.beparams)
3406

    
3407
    #### instance parameters check
3408

    
3409
    # instance name verification
3410
    hostname1 = utils.HostInfo(self.op.instance_name)
3411
    self.op.instance_name = instance_name = hostname1.name
3412

    
3413
    # this is just a preventive check, but someone might still add this
3414
    # instance in the meantime, and creation will fail at lock-add time
3415
    if instance_name in self.cfg.GetInstanceList():
3416
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3417
                                 instance_name)
3418

    
3419
    self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3420

    
3421
    # NIC buildup
3422
    self.nics = []
3423
    for nic in self.op.nics:
3424
      # ip validity checks
3425
      ip = nic.get("ip", None)
3426
      if ip is None or ip.lower() == "none":
3427
        nic_ip = None
3428
      elif ip.lower() == constants.VALUE_AUTO:
3429
        nic_ip = hostname1.ip
3430
      else:
3431
        if not utils.IsValidIP(ip):
3432
          raise errors.OpPrereqError("Given IP address '%s' doesn't look"
3433
                                     " like a valid IP" % ip)
3434
        nic_ip = ip
3435

    
3436
      # MAC address verification
3437
      mac = nic.get("mac", constants.VALUE_AUTO)
3438
      if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3439
        if not utils.IsValidMac(mac.lower()):
3440
          raise errors.OpPrereqError("Invalid MAC address specified: %s" %
3441
                                     mac)
3442
      # bridge verification
3443
      bridge = nic.get("bridge", self.cfg.GetDefBridge())
3444
      self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
3445

    
3446
    # disk checks/pre-build
3447
    self.disks = []
3448
    for disk in self.op.disks:
3449
      mode = disk.get("mode", constants.DISK_RDWR)
3450
      if mode not in constants.DISK_ACCESS_SET:
3451
        raise errors.OpPrereqError("Invalid disk access mode '%s'" %
3452
                                   mode)
3453
      size = disk.get("size", None)
3454
      if size is None:
3455
        raise errors.OpPrereqError("Missing disk size")
3456
      try:
3457
        size = int(size)
3458
      except ValueError:
3459
        raise errors.OpPrereqError("Invalid disk size '%s'" % size)
3460
      self.disks.append({"size": size, "mode": mode})
3461

    
3462
    # used in CheckPrereq for ip ping check
3463
    self.check_ip = hostname1.ip
3464

    
3465
    # file storage checks
3466
    if (self.op.file_driver and
3467
        not self.op.file_driver in constants.FILE_DRIVER):
3468
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
3469
                                 self.op.file_driver)
3470

    
3471
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3472
      raise errors.OpPrereqError("File storage directory path not absolute")
3473

    
3474
    ### Node/iallocator related checks
3475
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
3476
      raise errors.OpPrereqError("One and only one of iallocator and primary"
3477
                                 " node must be given")
3478

    
3479
    if self.op.iallocator:
3480
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3481
    else:
3482
      self.op.pnode = self._ExpandNode(self.op.pnode)
3483
      nodelist = [self.op.pnode]
3484
      if self.op.snode is not None:
3485
        self.op.snode = self._ExpandNode(self.op.snode)
3486
        nodelist.append(self.op.snode)
3487
      self.needed_locks[locking.LEVEL_NODE] = nodelist
3488

    
3489
    # in case of import lock the source node too
3490
    if self.op.mode == constants.INSTANCE_IMPORT:
3491
      src_node = getattr(self.op, "src_node", None)
3492
      src_path = getattr(self.op, "src_path", None)
3493

    
3494
      if src_node is None or src_path is None:
3495
        raise errors.OpPrereqError("Importing an instance requires source"
3496
                                   " node and path options")
3497

    
3498
      if not os.path.isabs(src_path):
3499
        raise errors.OpPrereqError("The source path must be absolute")
3500

    
3501
      self.op.src_node = src_node = self._ExpandNode(src_node)
3502
      if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3503
        self.needed_locks[locking.LEVEL_NODE].append(src_node)
3504

    
3505
    else: # INSTANCE_CREATE
3506
      if getattr(self.op, "os_type", None) is None:
3507
        raise errors.OpPrereqError("No guest OS specified")
3508

    
3509
  def _RunAllocator(self):
3510
    """Run the allocator based on input opcode.
3511

3512
    """
3513
    nics = [n.ToDict() for n in self.nics]
3514
    ial = IAllocator(self,
3515
                     mode=constants.IALLOCATOR_MODE_ALLOC,
3516
                     name=self.op.instance_name,
3517
                     disk_template=self.op.disk_template,
3518
                     tags=[],
3519
                     os=self.op.os_type,
3520
                     vcpus=self.be_full[constants.BE_VCPUS],
3521
                     mem_size=self.be_full[constants.BE_MEMORY],
3522
                     disks=self.disks,
3523
                     nics=nics,
3524
                     hypervisor=self.op.hypervisor,
3525
                     )
3526

    
3527
    ial.Run(self.op.iallocator)
3528

    
3529
    if not ial.success:
3530
      raise errors.OpPrereqError("Can't compute nodes using"
3531
                                 " iallocator '%s': %s" % (self.op.iallocator,
3532
                                                           ial.info))
3533
    if len(ial.nodes) != ial.required_nodes:
3534
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3535
                                 " of nodes (%s), required %s" %
3536
                                 (self.op.iallocator, len(ial.nodes),
3537
                                  ial.required_nodes))
3538
    self.op.pnode = ial.nodes[0]
3539
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
3540
                 self.op.instance_name, self.op.iallocator,
3541
                 ", ".join(ial.nodes))
3542
    if ial.required_nodes == 2:
3543
      self.op.snode = ial.nodes[1]
3544

    
3545
  def BuildHooksEnv(self):
3546
    """Build hooks env.
3547

3548
    This runs on master, primary and secondary nodes of the instance.
3549

3550
    """
3551
    env = {
3552
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3553
      "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
3554
      "INSTANCE_ADD_MODE": self.op.mode,
3555
      }
3556
    if self.op.mode == constants.INSTANCE_IMPORT:
3557
      env["INSTANCE_SRC_NODE"] = self.op.src_node
3558
      env["INSTANCE_SRC_PATH"] = self.op.src_path
3559
      env["INSTANCE_SRC_IMAGES"] = self.src_images
3560

    
3561
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3562
      primary_node=self.op.pnode,
3563
      secondary_nodes=self.secondaries,
3564
      status=self.instance_status,
3565
      os_type=self.op.os_type,
3566
      memory=self.be_full[constants.BE_MEMORY],
3567
      vcpus=self.be_full[constants.BE_VCPUS],
3568
      nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
3569
    ))
3570

    
3571
    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3572
          self.secondaries)
3573
    return env, nl, nl
3574

    
3575

    
3576
  def CheckPrereq(self):
3577
    """Check prerequisites.
3578

3579
    """
3580
    if (not self.cfg.GetVGName() and
3581
        self.op.disk_template not in constants.DTS_NOT_LVM):
3582
      raise errors.OpPrereqError("Cluster does not support lvm-based"
3583
                                 " instances")
3584

    
3585

    
3586
    if self.op.mode == constants.INSTANCE_IMPORT:
3587
      src_node = self.op.src_node
3588
      src_path = self.op.src_path
3589

    
3590
      export_info = self.rpc.call_export_info(src_node, src_path)
3591

    
3592
      if not export_info:
3593
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
3594

    
3595
      if not export_info.has_section(constants.INISECT_EXP):
3596
        raise errors.ProgrammerError("Corrupted export config")
3597

    
3598
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
3599
      if (int(ei_version) != constants.EXPORT_VERSION):
3600
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3601
                                   (ei_version, constants.EXPORT_VERSION))
3602

    
3603
      # Check that the new instance doesn't have fewer disks than the export
3604
      instance_disks = len(self.disks)
3605
      export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
3606
      if instance_disks < export_disks:
3607
        raise errors.OpPrereqError("Not enough disks to import."
3608
                                   " (instance: %d, export: %d)" %
3609
                                   (instance_disks, export_disks))
3610

    
3611
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3612
      disk_images = []
3613
      for idx in range(export_disks):
3614
        option = 'disk%d_dump' % idx
3615
        if export_info.has_option(constants.INISECT_INS, option):
3616
          # FIXME: are the old os-es, disk sizes, etc. useful?
3617
          export_name = export_info.get(constants.INISECT_INS, option)
3618
          image = os.path.join(src_path, export_name)
3619
          disk_images.append(image)
3620
        else:
3621
          disk_images.append(False)
3622

    
3623
      self.src_images = disk_images
3624

    
3625
      old_name = export_info.get(constants.INISECT_INS, 'name')
3626
      # FIXME: int() here could throw a ValueError on broken exports
3627
      exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
3628
      if self.op.instance_name == old_name:
3629
        for idx, nic in enumerate(self.nics):
3630
          if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
3631
            nic_mac_ini = 'nic%d_mac' % idx
3632
            nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
3633

    
3634
    # ip ping checks (we use the same ip that was resolved in ExpandNames)
3635
    if self.op.start and not self.op.ip_check:
3636
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3637
                                 " adding an instance in start mode")
3638

    
3639
    if self.op.ip_check:
3640
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3641
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3642
                                   (self.check_ip, self.op.instance_name))
3643

    
3644
    #### allocator run
3645

    
3646
    if self.op.iallocator is not None:
3647
      self._RunAllocator()
3648

    
3649
    #### node related checks
3650

    
3651
    # check primary node
3652
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3653
    assert self.pnode is not None, \
3654
      "Cannot retrieve locked node %s" % self.op.pnode
3655
    self.secondaries = []
3656

    
3657
    # mirror node verification
3658
    if self.op.disk_template in constants.DTS_NET_MIRROR:
3659
      if self.op.snode is None:
3660
        raise errors.OpPrereqError("The networked disk templates need"
3661
                                   " a mirror node")
3662
      if self.op.snode == pnode.name:
3663
        raise errors.OpPrereqError("The secondary node cannot be"
3664
                                   " the primary node.")
3665
      self.secondaries.append(self.op.snode)
3666

    
3667
    nodenames = [pnode.name] + self.secondaries
3668

    
3669
    req_size = _ComputeDiskSize(self.op.disk_template,
3670
                                self.disks)
3671

    
3672
    # Check lv size requirements
3673
    if req_size is not None:
3674
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3675
                                         self.op.hypervisor)
3676
      for node in nodenames:
3677
        info = nodeinfo.get(node, None)
3678
        if not info:
3679
          raise errors.OpPrereqError("Cannot get current information"
3680
                                     " from node '%s'" % node)
3681
        vg_free = info.get('vg_free', None)
3682
        if not isinstance(vg_free, int):
3683
          raise errors.OpPrereqError("Can't compute free disk space on"
3684
                                     " node %s" % node)
3685
        if req_size > info['vg_free']:
3686
          raise errors.OpPrereqError("Not enough disk space on target node %s."
3687
                                     " %d MB available, %d MB required" %
3688
                                     (node, info['vg_free'], req_size))
3689

    
3690
    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
3691

    
3692
    # os verification
3693
    os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
3694
    if not os_obj:
3695
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
3696
                                 " primary node"  % self.op.os_type)
3697

    
3698
    # bridge check on primary node
3699
    bridges = [n.bridge for n in self.nics]
3700
    if not self.rpc.call_bridges_exist(self.pnode.name, bridges):
3701
      raise errors.OpPrereqError("one of the target bridges '%s' does not"
3702
                                 " exist on"
3703
                                 " destination node '%s'" %
3704
                                 (",".join(bridges), pnode.name))
3705

    
3706
    # memory check on primary node
3707
    if self.op.start:
3708
      _CheckNodeFreeMemory(self, self.pnode.name,
3709
                           "creating instance %s" % self.op.instance_name,
3710
                           self.be_full[constants.BE_MEMORY],
3711
                           self.op.hypervisor)
3712

    
3713
    if self.op.start:
3714
      self.instance_status = 'up'
3715
    else:
3716
      self.instance_status = 'down'
3717

    
3718
  def Exec(self, feedback_fn):
3719
    """Create and add the instance to the cluster.
3720

3721
    """
3722
    instance = self.op.instance_name
3723
    pnode_name = self.pnode.name
3724

    
3725
    for nic in self.nics:
3726
      if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3727
        nic.mac = self.cfg.GenerateMAC()
3728

    
3729
    ht_kind = self.op.hypervisor
3730
    if ht_kind in constants.HTS_REQ_PORT:
3731
      network_port = self.cfg.AllocatePort()
3732
    else:
3733
      network_port = None
3734

    
3735
    ##if self.op.vnc_bind_address is None:
3736
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3737

    
3738
    # this is needed because os.path.join does not accept None arguments
3739
    if self.op.file_storage_dir is None:
3740
      string_file_storage_dir = ""
3741
    else:
3742
      string_file_storage_dir = self.op.file_storage_dir
3743

    
3744
    # build the full file storage dir path
3745
    file_storage_dir = os.path.normpath(os.path.join(
3746
                                        self.cfg.GetFileStorageDir(),
3747
                                        string_file_storage_dir, instance))
3748

    
3749

    
3750
    disks = _GenerateDiskTemplate(self,
3751
                                  self.op.disk_template,
3752
                                  instance, pnode_name,
3753
                                  self.secondaries,
3754
                                  self.disks,
3755
                                  file_storage_dir,
3756
                                  self.op.file_driver,
3757
                                  0)
3758

    
3759
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3760
                            primary_node=pnode_name,
3761
                            nics=self.nics, disks=disks,
3762
                            disk_template=self.op.disk_template,
3763
                            status=self.instance_status,
3764
                            network_port=network_port,
3765
                            beparams=self.op.beparams,
3766
                            hvparams=self.op.hvparams,
3767
                            hypervisor=self.op.hypervisor,
3768
                            )
3769

    
3770
    feedback_fn("* creating instance disks...")
3771
    if not _CreateDisks(self, iobj):
3772
      _RemoveDisks(self, iobj)
3773
      self.cfg.ReleaseDRBDMinors(instance)
3774
      raise errors.OpExecError("Device creation failed, reverting...")
3775

    
3776
    feedback_fn("adding instance %s to cluster config" % instance)
3777

    
3778
    self.cfg.AddInstance(iobj)
3779
    # Declare that we don't want to remove the instance lock anymore, as we've
3780
    # added the instance to the config
3781
    del self.remove_locks[locking.LEVEL_INSTANCE]
3782
    # Remove the temp. assignments for the instance's drbds
3783
    self.cfg.ReleaseDRBDMinors(instance)
3784
    # Unlock all the nodes
3785
    self.context.glm.release(locking.LEVEL_NODE)
3786
    del self.acquired_locks[locking.LEVEL_NODE]
3787

    
3788
    if self.op.wait_for_sync:
3789
      disk_abort = not _WaitForSync(self, iobj)
3790
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3791
      # make sure the disks are not degraded (still sync-ing is ok)
3792
      time.sleep(15)
3793
      feedback_fn("* checking mirrors status")
3794
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
3795
    else:
3796
      disk_abort = False
3797

    
3798
    if disk_abort:
3799
      _RemoveDisks(self, iobj)
3800
      self.cfg.RemoveInstance(iobj.name)
3801
      # Make sure the instance lock gets removed
3802
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
3803
      raise errors.OpExecError("There are some degraded disks for"
3804
                               " this instance")
3805

    
3806
    feedback_fn("creating os for instance %s on node %s" %
3807
                (instance, pnode_name))
3808

    
3809
    if iobj.disk_template != constants.DT_DISKLESS:
3810
      if self.op.mode == constants.INSTANCE_CREATE:
3811
        feedback_fn("* running the instance OS create scripts...")
3812
        if not self.rpc.call_instance_os_add(pnode_name, iobj):
3813
          raise errors.OpExecError("could not add os for instance %s"
3814
                                   " on node %s" %
3815
                                   (instance, pnode_name))
3816

    
3817
      elif self.op.mode == constants.INSTANCE_IMPORT:
3818
        feedback_fn("* running the instance OS import scripts...")
3819
        src_node = self.op.src_node
3820
        src_images = self.src_images
3821
        cluster_name = self.cfg.GetClusterName()
3822
        import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
3823
                                                         src_node, src_images,
3824
                                                         cluster_name)
3825
        for idx, result in enumerate(import_result):
3826
          if not result:
3827
            self.LogWarning("Could not import the image %s for instance"
3828
                            " %s, disk %d, on node %s" %
3829
                            (src_images[idx], instance, idx, pnode_name))
3830
      else:
3831
        # also checked in the prereq part
3832
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3833
                                     % self.op.mode)
3834

    
3835
    if self.op.start:
3836
      logging.info("Starting instance %s on node %s", instance, pnode_name)
3837
      feedback_fn("* starting instance...")
3838
      if not self.rpc.call_instance_start(pnode_name, iobj, None):
3839
        raise errors.OpExecError("Could not start instance")
3840

    
3841

    
3842
class LUConnectConsole(NoHooksLU):
3843
  """Connect to an instance's console.
3844

3845
  This is somewhat special in that it returns the command line that
3846
  you need to run on the master node in order to connect to the
3847
  console.
3848

3849
  """
3850
  _OP_REQP = ["instance_name"]
3851
  REQ_BGL = False
3852

    
3853
  def ExpandNames(self):
3854
    self._ExpandAndLockInstance()
3855

    
3856
  def CheckPrereq(self):
3857
    """Check prerequisites.
3858

3859
    This checks that the instance is in the cluster.
3860

3861
    """
3862
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3863
    assert self.instance is not None, \
3864
      "Cannot retrieve locked instance %s" % self.op.instance_name
3865

    
3866
  def Exec(self, feedback_fn):
3867
    """Connect to the console of an instance
3868

3869
    """
3870
    instance = self.instance
3871
    node = instance.primary_node
3872

    
3873
    node_insts = self.rpc.call_instance_list([node],
3874
                                             [instance.hypervisor])[node]
3875
    if node_insts is False:
3876
      raise errors.OpExecError("Can't connect to node %s." % node)
3877

    
3878
    if instance.name not in node_insts:
3879
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3880

    
3881
    logging.debug("Connecting to console of %s on %s", instance.name, node)
3882

    
3883
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
3884
    console_cmd = hyper.GetShellCommandForConsole(instance)
3885

    
3886
    # build ssh cmdline
3887
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3888

    
3889

    
3890
class LUReplaceDisks(LogicalUnit):
3891
  """Replace the disks of an instance.
3892

3893
  """
3894
  HPATH = "mirrors-replace"
3895
  HTYPE = constants.HTYPE_INSTANCE
3896
  _OP_REQP = ["instance_name", "mode", "disks"]
3897
  REQ_BGL = False
3898

    
3899
  def ExpandNames(self):
3900
    self._ExpandAndLockInstance()
3901

    
3902
    if not hasattr(self.op, "remote_node"):
3903
      self.op.remote_node = None
3904

    
3905
    ia_name = getattr(self.op, "iallocator", None)
3906
    if ia_name is not None:
3907
      if self.op.remote_node is not None:
3908
        raise errors.OpPrereqError("Give either the iallocator or the new"
3909
                                   " secondary, not both")
3910
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3911
    elif self.op.remote_node is not None:
3912
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3913
      if remote_node is None:
3914
        raise errors.OpPrereqError("Node '%s' not known" %
3915
                                   self.op.remote_node)
3916
      self.op.remote_node = remote_node
3917
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
3918
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3919
    else:
3920
      self.needed_locks[locking.LEVEL_NODE] = []
3921
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3922

    
3923
  def DeclareLocks(self, level):
3924
    # If we're not already locking all nodes in the set we have to declare the
3925
    # instance's primary/secondary nodes.
3926
    if (level == locking.LEVEL_NODE and
3927
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
3928
      self._LockInstancesNodes()
3929

    
3930
  def _RunAllocator(self):
3931
    """Compute a new secondary node using an IAllocator.
3932

3933
    """
3934
    ial = IAllocator(self,
3935
                     mode=constants.IALLOCATOR_MODE_RELOC,
3936
                     name=self.op.instance_name,
3937
                     relocate_from=[self.sec_node])
3938

    
3939
    ial.Run(self.op.iallocator)
3940

    
3941
    if not ial.success:
3942
      raise errors.OpPrereqError("Can't compute nodes using"
3943
                                 " iallocator '%s': %s" % (self.op.iallocator,
3944
                                                           ial.info))
3945
    if len(ial.nodes) != ial.required_nodes:
3946
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3947
                                 " of nodes (%s), required %s" %
3948
                                 (len(ial.nodes), ial.required_nodes))
3949
    self.op.remote_node = ial.nodes[0]
3950
    self.LogInfo("Selected new secondary for the instance: %s",
3951
                 self.op.remote_node)
3952

    
3953
  def BuildHooksEnv(self):
3954
    """Build hooks env.
3955

3956
    This runs on the master, the primary and all the secondaries.
3957

3958
    """
3959
    env = {
3960
      "MODE": self.op.mode,
3961
      "NEW_SECONDARY": self.op.remote_node,
3962
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3963
      }
3964
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3965
    nl = [
3966
      self.cfg.GetMasterNode(),
3967
      self.instance.primary_node,
3968
      ]
3969
    if self.op.remote_node is not None:
3970
      nl.append(self.op.remote_node)
3971
    return env, nl, nl
3972

    
3973
  def CheckPrereq(self):
3974
    """Check prerequisites.
3975

3976
    This checks that the instance is in the cluster.
3977

3978
    """
3979
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3980
    assert instance is not None, \
3981
      "Cannot retrieve locked instance %s" % self.op.instance_name
3982
    self.instance = instance
3983

    
3984
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3985
      raise errors.OpPrereqError("Instance's disk layout is not"
3986
                                 " network mirrored.")
3987

    
3988
    if len(instance.secondary_nodes) != 1:
3989
      raise errors.OpPrereqError("The instance has a strange layout,"
3990
                                 " expected one secondary but found %d" %
3991
                                 len(instance.secondary_nodes))
3992

    
3993
    self.sec_node = instance.secondary_nodes[0]
3994

    
3995
    ia_name = getattr(self.op, "iallocator", None)
3996
    if ia_name is not None:
3997
      self._RunAllocator()
3998

    
3999
    remote_node = self.op.remote_node
4000
    if remote_node is not None:
4001
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4002
      assert self.remote_node_info is not None, \
4003
        "Cannot retrieve locked node %s" % remote_node
4004
    else:
4005
      self.remote_node_info = None
4006
    if remote_node == instance.primary_node:
4007
      raise errors.OpPrereqError("The specified node is the primary node of"
4008
                                 " the instance.")
4009
    elif remote_node == self.sec_node:
4010
      if self.op.mode == constants.REPLACE_DISK_SEC:
4011
        # this is for DRBD8, where we can't execute the same mode of
4012
        # replacement as for drbd7 (no different port allocated)
4013
        raise errors.OpPrereqError("Same secondary given, cannot execute"
4014
                                   " replacement")
4015
    if instance.disk_template == constants.DT_DRBD8:
4016
      if (self.op.mode == constants.REPLACE_DISK_ALL and
4017
          remote_node is not None):
4018
        # switch to replace secondary mode
4019
        self.op.mode = constants.REPLACE_DISK_SEC
4020

    
4021
      if self.op.mode == constants.REPLACE_DISK_ALL:
4022
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
4023
                                   " secondary disk replacement, not"
4024
                                   " both at once")
4025
      elif self.op.mode == constants.REPLACE_DISK_PRI:
4026
        if remote_node is not None:
4027
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
4028
                                     " the secondary while doing a primary"
4029
                                     " node disk replacement")
4030
        self.tgt_node = instance.primary_node
4031
        self.oth_node = instance.secondary_nodes[0]
4032
      elif self.op.mode == constants.REPLACE_DISK_SEC:
4033
        self.new_node = remote_node # this can be None, in which case
4034
                                    # we don't change the secondary
4035
        self.tgt_node = instance.secondary_nodes[0]
4036
        self.oth_node = instance.primary_node
4037
      else:
4038
        raise errors.ProgrammerError("Unhandled disk replace mode")
4039

    
4040
    if not self.op.disks:
4041
      self.op.disks = range(len(instance.disks))
4042

    
4043
    for disk_idx in self.op.disks:
4044
      instance.FindDisk(disk_idx)
4045

    
4046
  def _ExecD8DiskOnly(self, feedback_fn):
4047
    """Replace a disk on the primary or secondary for dbrd8.
4048

4049
    The algorithm for replace is quite complicated:
4050

4051
      1. for each disk to be replaced:
4052

4053
        1. create new LVs on the target node with unique names
4054
        1. detach old LVs from the drbd device
4055
        1. rename old LVs to name_replaced.<time_t>
4056
        1. rename new LVs to old LVs
4057
        1. attach the new LVs (with the old names now) to the drbd device
4058

4059
      1. wait for sync across all devices
4060

4061
      1. for each modified disk:
4062

4063
        1. remove old LVs (which have the name name_replaced.<time_t>)
4064

4065
    Failures are not very well handled.
4066

4067
    """
4068
    steps_total = 6
4069
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4070
    instance = self.instance
4071
    iv_names = {}
4072
    vgname = self.cfg.GetVGName()
4073
    # start of work
4074
    cfg = self.cfg
4075
    tgt_node = self.tgt_node
4076
    oth_node = self.oth_node
4077

    
4078
    # Step: check device activation
4079
    self.proc.LogStep(1, steps_total, "check device existence")
4080
    info("checking volume groups")
4081
    my_vg = cfg.GetVGName()
4082
    results = self.rpc.call_vg_list([oth_node, tgt_node])
4083
    if not results:
4084
      raise errors.OpExecError("Can't list volume groups on the nodes")
4085
    for node in oth_node, tgt_node:
4086
      res = results.get(node, False)
4087
      if not res or my_vg not in res:
4088
        raise errors.OpExecError("Volume group '%s' not found on %s" %
4089
                                 (my_vg, node))
4090
    for idx, dev in enumerate(instance.disks):
4091
      if idx not in self.op.disks:
4092
        continue
4093
      for node in tgt_node, oth_node:
4094
        info("checking disk/%d on %s" % (idx, node))
4095
        cfg.SetDiskID(dev, node)
4096
        if not self.rpc.call_blockdev_find(node, dev):
4097
          raise errors.OpExecError("Can't find disk/%d on node %s" %
4098
                                   (idx, node))
4099

    
4100
    # Step: check other node consistency
4101
    self.proc.LogStep(2, steps_total, "check peer consistency")
4102
    for idx, dev in enumerate(instance.disks):
4103
      if idx not in self.op.disks:
4104
        continue
4105
      info("checking disk/%d consistency on %s" % (idx, oth_node))
4106
      if not _CheckDiskConsistency(self, dev, oth_node,
4107
                                   oth_node==instance.primary_node):
4108
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4109
                                 " to replace disks on this node (%s)" %
4110
                                 (oth_node, tgt_node))
4111

    
4112
    # Step: create new storage
4113
    self.proc.LogStep(3, steps_total, "allocate new storage")
4114
    for idx, dev in enumerate(instance.disks):
4115
      if idx not in self.op.disks:
4116
        continue
4117
      size = dev.size
4118
      cfg.SetDiskID(dev, tgt_node)
4119
      lv_names = [".disk%d_%s" % (idx, suf)
4120
                  for suf in ["data", "meta"]]
4121
      names = _GenerateUniqueNames(self, lv_names)
4122
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4123
                             logical_id=(vgname, names[0]))
4124
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4125
                             logical_id=(vgname, names[1]))
4126
      new_lvs = [lv_data, lv_meta]
4127
      old_lvs = dev.children
4128
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4129
      info("creating new local storage on %s for %s" %
4130
           (tgt_node, dev.iv_name))
4131
      # since we *always* want to create this LV, we use the
4132
      # _Create...OnPrimary (which forces the creation), even if we
4133
      # are talking about the secondary node
4134
      for new_lv in new_lvs:
4135
        if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
4136
                                        _GetInstanceInfoText(instance)):
4137
          raise errors.OpExecError("Failed to create new LV named '%s' on"
4138
                                   " node '%s'" %
4139
                                   (new_lv.logical_id[1], tgt_node))
4140

    
4141
    # Step: for each lv, detach+rename*2+attach
4142
    self.proc.LogStep(4, steps_total, "change drbd configuration")
4143
    for dev, old_lvs, new_lvs in iv_names.itervalues():
4144
      info("detaching %s drbd from local storage" % dev.iv_name)
4145
      if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
4146
        raise errors.OpExecError("Can't detach drbd from local storage on node"
4147
                                 " %s for device %s" % (tgt_node, dev.iv_name))
4148
      #dev.children = []
4149
      #cfg.Update(instance)
4150

    
4151
      # ok, we created the new LVs, so now we know we have the needed
4152
      # storage; as such, we proceed on the target node to rename
4153
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4154
      # using the assumption that logical_id == physical_id (which in
4155
      # turn is the unique_id on that node)
4156

    
4157
      # FIXME(iustin): use a better name for the replaced LVs
4158
      temp_suffix = int(time.time())
4159
      ren_fn = lambda d, suff: (d.physical_id[0],
4160
                                d.physical_id[1] + "_replaced-%s" % suff)
4161
      # build the rename list based on what LVs exist on the node
4162
      rlist = []
4163
      for to_ren in old_lvs:
4164
        find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4165
        if find_res is not None: # device exists
4166
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4167

    
4168
      info("renaming the old LVs on the target node")
4169
      if not self.rpc.call_blockdev_rename(tgt_node, rlist):
4170
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4171
      # now we rename the new LVs to the old LVs
4172
      info("renaming the new LVs on the target node")
4173
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4174
      if not self.rpc.call_blockdev_rename(tgt_node, rlist):
4175
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4176

    
4177
      for old, new in zip(old_lvs, new_lvs):
4178
        new.logical_id = old.logical_id
4179
        cfg.SetDiskID(new, tgt_node)
4180

    
4181
      for disk in old_lvs:
4182
        disk.logical_id = ren_fn(disk, temp_suffix)
4183
        cfg.SetDiskID(disk, tgt_node)
4184

    
4185
      # now that the new lvs have the old name, we can add them to the device
4186
      info("adding new mirror component on %s" % tgt_node)
4187
      if not self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
4188
        for new_lv in new_lvs:
4189
          if not self.rpc.call_blockdev_remove(tgt_node, new_lv):
4190
            warning("Can't rollback device %s", hint="manually cleanup unused"
4191
                    " logical volumes")
4192
        raise errors.OpExecError("Can't add local storage to drbd")
4193

    
4194
      dev.children = new_lvs
4195
      cfg.Update(instance)
4196

    
4197
    # Step: wait for sync
4198

    
4199
    # this can fail as the old devices are degraded and _WaitForSync
4200
    # does a combined result over all disks, so we don't check its
4201
    # return value
4202
    self.proc.LogStep(5, steps_total, "sync devices")
4203
    _WaitForSync(self, instance, unlock=True)
4204

    
4205
    # so check manually all the devices
4206
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4207
      cfg.SetDiskID(dev, instance.primary_node)
4208
      is_degr = self.rpc.call_blockdev_find(instance.primary_node, dev)[5]
4209
      if is_degr:
4210
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
4211

    
4212
    # Step: remove old storage
4213
    self.proc.LogStep(6, steps_total, "removing old storage")
4214
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4215
      info("remove logical volumes for %s" % name)
4216
      for lv in old_lvs:
4217
        cfg.SetDiskID(lv, tgt_node)
4218
        if not self.rpc.call_blockdev_remove(tgt_node, lv):
4219
          warning("Can't remove old LV", hint="manually remove unused LVs")
4220
          continue
4221

    
4222
  def _ExecD8Secondary(self, feedback_fn):
4223
    """Replace the secondary node for drbd8.
4224

4225
    The algorithm for replace is quite complicated:
4226
      - for all disks of the instance:
4227
        - create new LVs on the new node with same names
4228
        - shutdown the drbd device on the old secondary
4229
        - disconnect the drbd network on the primary
4230
        - create the drbd device on the new secondary
4231
        - network attach the drbd on the primary, using an artifice:
4232
          the drbd code for Attach() will connect to the network if it
4233
          finds a device which is connected to the good local disks but
4234
          not network enabled
4235
      - wait for sync across all devices
4236
      - remove all disks from the old secondary
4237

4238
    Failures are not very well handled.
4239

4240
    """
4241
    steps_total = 6
4242
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4243
    instance = self.instance
4244
    iv_names = {}
4245
    vgname = self.cfg.GetVGName()
4246
    # start of work
4247
    cfg = self.cfg
4248
    old_node = self.tgt_node
4249
    new_node = self.new_node
4250
    pri_node = instance.primary_node
4251

    
4252
    # Step: check device activation
4253
    self.proc.LogStep(1, steps_total, "check device existence")
4254
    info("checking volume groups")
4255
    my_vg = cfg.GetVGName()
4256
    results = self.rpc.call_vg_list([pri_node, new_node])
4257
    if not results:
4258
      raise errors.OpExecError("Can't list volume groups on the nodes")
4259
    for node in pri_node, new_node:
4260
      res = results.get(node, False)
4261
      if not res or my_vg not in res:
4262
        raise errors.OpExecError("Volume group '%s' not found on %s" %
4263
                                 (my_vg, node))
4264
    for idx, dev in enumerate(instance.disks):
4265
      if idx not in self.op.disks:
4266
        continue
4267
      info("checking disk/%d on %s" % (idx, pri_node))
4268
      cfg.SetDiskID(dev, pri_node)
4269
      if not self.rpc.call_blockdev_find(pri_node, dev):
4270
        raise errors.OpExecError("Can't find disk/%d on node %s" %
4271
                                 (idx, pri_node))
4272

    
4273
    # Step: check other node consistency
4274
    self.proc.LogStep(2, steps_total, "check peer consistency")
4275
    for idx, dev in enumerate(instance.disks):
4276
      if idx not in self.op.disks:
4277
        continue
4278
      info("checking disk/%d consistency on %s" % (idx, pri_node))
4279
      if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4280
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
4281
                                 " unsafe to replace the secondary" %
4282
                                 pri_node)
4283

    
4284
    # Step: create new storage
4285
    self.proc.LogStep(3, steps_total, "allocate new storage")
4286
    for idx, dev in enumerate(instance.disks):
4287
      size = dev.size
4288
      info("adding new local storage on %s for disk/%d" %
4289
           (new_node, idx))
4290
      # since we *always* want to create this LV, we use the
4291
      # _Create...OnPrimary (which forces the creation), even if we
4292
      # are talking about the secondary node
4293
      for new_lv in dev.children:
4294
        if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4295
                                        _GetInstanceInfoText(instance)):
4296
          raise errors.OpExecError("Failed to create new LV named '%s' on"
4297
                                   " node '%s'" %
4298
                                   (new_lv.logical_id[1], new_node))
4299

    
4300
    # Step 4: drbd minors and drbd setup changes
4301
    # after this, we must manually remove the drbd minors on both the
4302
    # error and the success paths
4303
    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4304
                                   instance.name)
4305
    logging.debug("Allocated minors %s" % (minors,))
4306
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
4307
    for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
4308
      size = dev.size
4309
      info("activating a new drbd on %s for disk/%d" % (new_node, idx))
4310
      # create new devices on new_node
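      # The DRBD8 logical_id is the tuple (node_a, node_b, port, minor_a,
      # minor_b, secret); the branches below keep the primary's slot and
      # minor unchanged and substitute new_node/new_minor for the old
      # secondary, whichever side it was on.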
4311
      if pri_node == dev.logical_id[0]:
4312
        new_logical_id = (pri_node, new_node,
4313
                          dev.logical_id[2], dev.logical_id[3], new_minor,
4314
                          dev.logical_id[5])
4315
      else:
4316
        new_logical_id = (new_node, pri_node,
4317
                          dev.logical_id[2], new_minor, dev.logical_id[4],
4318
                          dev.logical_id[5])
4319
      iv_names[idx] = (dev, dev.children, new_logical_id)
4320
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4321
                    new_logical_id)
4322
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4323
                              logical_id=new_logical_id,
4324
                              children=dev.children)
4325
      if not _CreateBlockDevOnSecondary(self, new_node, instance,
4326
                                        new_drbd, False,
4327
                                        _GetInstanceInfoText(instance)):
4328
        self.cfg.ReleaseDRBDMinors(instance.name)
4329
        raise errors.OpExecError("Failed to create new DRBD on"
4330
                                 " node '%s'" % new_node)
4331

    
4332
    for idx, dev in enumerate(instance.disks):
4333
      # we have new devices, shutdown the drbd on the old secondary
4334
      info("shutting down drbd for disk/%d on old node" % idx)
4335
      cfg.SetDiskID(dev, old_node)
4336
      if not self.rpc.call_blockdev_shutdown(old_node, dev):
4337
        warning("Failed to shutdown drbd for disk/%d on old node" % idx,
4338
                hint="Please cleanup this device manually as soon as possible")
4339

    
4340
    info("detaching primary drbds from the network (=> standalone)")
4341
    done = 0
4342
    for idx, dev in enumerate(instance.disks):
4343
      cfg.SetDiskID(dev, pri_node)
4344
      # set the network part of the physical (unique in bdev terms) id
4345
      # to None, meaning detach from network
4346
      dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4347
      # and 'find' the device, which will 'fix' it to match the
4348
      # standalone state
4349
      if self.rpc.call_blockdev_find(pri_node, dev):
4350
        done += 1
4351
      else:
4352
        warning("Failed to detach drbd disk/%d from network, unusual case" %
4353
                idx)
4354

    
4355
    if not done:
4356
      # no detaches succeeded (very unlikely)
4357
      self.cfg.ReleaseDRBDMinors(instance.name)
4358
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
4359

    
4360
    # if we managed to detach at least one, we update all the disks of
4361
    # the instance to point to the new secondary
4362
    info("updating instance configuration")
4363
    for dev, _, new_logical_id in iv_names.itervalues():
4364
      dev.logical_id = new_logical_id
4365
      cfg.SetDiskID(dev, pri_node)
4366
    cfg.Update(instance)
4367
    # we can remove now the temp minors as now the new values are
4368
    # written to the config file (and therefore stable)
4369
    self.cfg.ReleaseDRBDMinors(instance.name)
4370

    
4371
    # and now perform the drbd attach
4372
    info("attaching primary drbds to new secondary (standalone => connected)")
4373
    failures = []
4374
    for idx, dev in enumerate(instance.disks):
4375
      info("attaching primary drbd for disk/%d to new secondary node" % idx)
4376
      # since the attach is smart, it's enough to 'find' the device,
4377
      # it will automatically activate the network, if the physical_id
4378
      # is correct
4379
      cfg.SetDiskID(dev, pri_node)
4380
      logging.debug("Disk to attach: %s", dev)
4381
      if not self.rpc.call_blockdev_find(pri_node, dev):
4382
        warning("can't attach drbd disk/%d to new secondary!" % idx,
4383
                "please do a gnt-instance info to see the status of disks")
4384

    
4385
    # this can fail as the old devices are degraded and _WaitForSync
4386
    # does a combined result over all disks, so we don't check its
4387
    # return value
4388
    self.proc.LogStep(5, steps_total, "sync devices")
4389
    _WaitForSync(self, instance, unlock=True)
4390

    
4391
    # so check manually all the devices
4392
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
4393
      cfg.SetDiskID(dev, pri_node)
4394
      is_degr = self.rpc.call_blockdev_find(pri_node, dev)[5]
4395
      if is_degr:
4396
        raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
4397

    
4398
    self.proc.LogStep(6, steps_total, "removing old storage")
4399
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
4400
      info("remove logical volumes for disk/%d" % idx)
4401
      for lv in old_lvs:
4402
        cfg.SetDiskID(lv, old_node)
4403
        if not self.rpc.call_blockdev_remove(old_node, lv):
4404
          warning("Can't remove LV on old secondary",
4405
                  hint="Cleanup stale volumes by hand")
4406

    
4407
  def Exec(self, feedback_fn):
4408
    """Execute disk replacement.
4409

4410
    This dispatches the disk replacement to the appropriate handler.
4411

4412
    """
4413
    instance = self.instance
4414

    
4415
    # Activate the instance disks if we're replacing them on a down instance
4416
    if instance.status == "down":
4417
      _StartInstanceDisks(self, instance, True)
4418

    
4419
    if instance.disk_template == constants.DT_DRBD8:
4420
      if self.op.remote_node is None:
4421
        fn = self._ExecD8DiskOnly
4422
      else:
4423
        fn = self._ExecD8Secondary
4424
    else:
4425
      raise errors.ProgrammerError("Unhandled disk replacement case")
4426

    
4427
    ret = fn(feedback_fn)
4428

    
4429
    # Deactivate the instance disks if we're replacing them on a down instance
4430
    if instance.status == "down":
4431
      _SafeShutdownInstanceDisks(self, instance)
4432

    
4433
    return ret
4434

    
4435

    
4436
class LUGrowDisk(LogicalUnit):
4437
  """Grow a disk of an instance.
4438

4439
  """
4440
  HPATH = "disk-grow"
4441
  HTYPE = constants.HTYPE_INSTANCE
4442
  _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4443
  REQ_BGL = False
4444

    
4445
  def ExpandNames(self):
4446
    self._ExpandAndLockInstance()
4447
    self.needed_locks[locking.LEVEL_NODE] = []
4448
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4449

    
4450
  def DeclareLocks(self, level):
4451
    if level == locking.LEVEL_NODE:
4452
      self._LockInstancesNodes()
4453

    
4454
  def BuildHooksEnv(self):
4455
    """Build hooks env.
4456

4457
    This runs on the master, the primary and all the secondaries.
4458

4459
    """
4460
    env = {
4461
      "DISK": self.op.disk,
4462
      "AMOUNT": self.op.amount,
4463
      }
4464
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4465
    nl = [
4466
      self.cfg.GetMasterNode(),
4467
      self.instance.primary_node,
4468
      ]
4469
    return env, nl, nl
4470

    
4471
  def CheckPrereq(self):
4472
    """Check prerequisites.
4473

4474
    This checks that the instance is in the cluster.
4475

4476
    """
4477
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4478
    assert instance is not None, \
4479
      "Cannot retrieve locked instance %s" % self.op.instance_name
4480

    
4481
    self.instance = instance
4482

    
4483
    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4484
      raise errors.OpPrereqError("Instance's disk layout does not support"
4485
                                 " growing.")
4486

    
4487
    self.disk = instance.FindDisk(self.op.disk)
4488

    
4489
    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4490
    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4491
                                       instance.hypervisor)
4492
    for node in nodenames:
4493
      info = nodeinfo.get(node, None)
4494
      if not info:
4495
        raise errors.OpPrereqError("Cannot get current information"
4496
                                   " from node '%s'" % node)
4497
      vg_free = info.get('vg_free', None)
4498
      if not isinstance(vg_free, int):
4499
        raise errors.OpPrereqError("Can't compute free disk space on"
4500
                                   " node %s" % node)
4501
      if self.op.amount > info['vg_free']:
4502
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
4503
                                   " %d MiB available, %d MiB required" %
4504
                                   (node, info['vg_free'], self.op.amount))
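      # e.g. (assumed numbers) amount=2048 with vg_free=1024 on any of the
      # instance's nodes aborts the operation here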
4505

    
4506
  def Exec(self, feedback_fn):
4507
    """Execute disk grow.
4508

4509
    """
4510
    instance = self.instance
4511
    disk = self.disk
4512
    for node in (instance.secondary_nodes + (instance.primary_node,)):
4513
      self.cfg.SetDiskID(disk, node)
4514
      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
4515
      if (not result or not isinstance(result, (list, tuple)) or
4516
          len(result) != 2):
4517
        raise errors.OpExecError("grow request failed to node %s" % node)
4518
      elif not result[0]:
4519
        raise errors.OpExecError("grow request failed to node %s: %s" %
4520
                                 (node, result[1]))
4521
    disk.RecordGrow(self.op.amount)
4522
    self.cfg.Update(instance)
4523
    if self.op.wait_for_sync:
4524
      disk_abort = not _WaitForSync(self, instance)
4525
      if disk_abort:
4526
        self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
4527
                             " status.\nPlease check the instance.")
4528

    
4529

    
4530
class LUQueryInstanceData(NoHooksLU):
4531
  """Query runtime instance data.
4532

4533
  """
4534
  _OP_REQP = ["instances", "static"]
4535
  REQ_BGL = False
4536

    
4537
  def ExpandNames(self):
4538
    self.needed_locks = {}
4539
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4540

    
4541
    if not isinstance(self.op.instances, list):
4542
      raise errors.OpPrereqError("Invalid argument type 'instances'")
4543

    
4544
    if self.op.instances:
4545
      self.wanted_names = []
4546
      for name in self.op.instances:
4547
        full_name = self.cfg.ExpandInstanceName(name)
4548
        if full_name is None:
4549
          raise errors.OpPrereqError("Instance '%s' not known" %
4550
                                     self.op.instance_name)
4551
        self.wanted_names.append(full_name)
4552
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4553
    else:
4554
      self.wanted_names = None
4555
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4556

    
4557
    self.needed_locks[locking.LEVEL_NODE] = []
4558
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4559

    
4560
  def DeclareLocks(self, level):
4561
    if level == locking.LEVEL_NODE:
4562
      self._LockInstancesNodes()
4563

    
4564
  def CheckPrereq(self):
4565
    """Check prerequisites.
4566

4567
    This only checks the optional instance list against the existing names.
4568

4569
    """
4570
    if self.wanted_names is None:
4571
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4572

    
4573
    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4574
                             in self.wanted_names]
4575
    return
4576

    
4577
  def _ComputeDiskStatus(self, instance, snode, dev):
4578
    """Compute block device status.
4579

4580
    """
4581
    static = self.op.static
4582
    if not static:
4583
      self.cfg.SetDiskID(dev, instance.primary_node)
4584
      dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
4585
    else:
4586
      dev_pstatus = None
4587

    
4588
    if dev.dev_type in constants.LDS_DRBD:
4589
      # we change the snode then (otherwise we use the one passed in)
4590
      if dev.logical_id[0] == instance.primary_node:
4591
        snode = dev.logical_id[1]
4592
      else:
4593
        snode = dev.logical_id[0]
4594

    
4595
    if snode and not static:
4596
      self.cfg.SetDiskID(dev, snode)
4597
      dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
4598
    else:
4599
      dev_sstatus = None
4600

    
4601
    if dev.children:
4602
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
4603
                      for child in dev.children]
4604
    else:
4605
      dev_children = []
4606

    
4607
    data = {
4608
      "iv_name": dev.iv_name,
4609
      "dev_type": dev.dev_type,
4610
      "logical_id": dev.logical_id,
4611
      "physical_id": dev.physical_id,
4612
      "pstatus": dev_pstatus,
4613
      "sstatus": dev_sstatus,
4614
      "children": dev_children,
4615
      "mode": dev.mode,
4616
      }
4617

    
4618
    return data
4619

    
4620
  def Exec(self, feedback_fn):
4621
    """Gather and return data"""
4622
    result = {}
4623

    
4624
    cluster = self.cfg.GetClusterInfo()
4625

    
4626
    for instance in self.wanted_instances:
4627
      if not self.op.static:
4628
        remote_info = self.rpc.call_instance_info(instance.primary_node,
4629
                                                  instance.name,
4630
                                                  instance.hypervisor)
4631
        if remote_info and "state" in remote_info:
4632
          remote_state = "up"
4633
        else:
4634
          remote_state = "down"
4635
      else:
4636
        remote_state = None
4637
      if instance.status == "down":
4638
        config_state = "down"
4639
      else:
4640
        config_state = "up"
4641

    
4642
      disks = [self._ComputeDiskStatus(instance, None, device)
4643
               for device in instance.disks]
4644

    
4645
      idict = {
4646
        "name": instance.name,
4647
        "config_state": config_state,
4648
        "run_state": remote_state,
4649
        "pnode": instance.primary_node,
4650
        "snodes": instance.secondary_nodes,
4651
        "os": instance.os,
4652
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4653
        "disks": disks,
4654
        "hypervisor": instance.hypervisor,
4655
        "network_port": instance.network_port,
4656
        "hv_instance": instance.hvparams,
4657
        "hv_actual": cluster.FillHV(instance),
4658
        "be_instance": instance.beparams,
4659
        "be_actual": cluster.FillBE(instance),
4660
        }
4661

    
4662
      result[instance.name] = idict
4663

    
4664
    return result
4665

    
4666

    
4667
class LUSetInstanceParams(LogicalUnit):
4668
  """Modifies an instances's parameters.
4669

4670
  """
4671
  HPATH = "instance-modify"
4672
  HTYPE = constants.HTYPE_INSTANCE
4673
  _OP_REQP = ["instance_name"]
4674
  REQ_BGL = False
4675

    
4676
  def CheckArguments(self):
4677
    if not hasattr(self.op, 'nics'):
4678
      self.op.nics = []
4679
    if not hasattr(self.op, 'disks'):
4680
      self.op.disks = []
4681
    if not hasattr(self.op, 'beparams'):
4682
      self.op.beparams = {}
4683
    if not hasattr(self.op, 'hvparams'):
4684
      self.op.hvparams = {}
4685
    self.op.force = getattr(self.op, "force", False)
4686
    if not (self.op.nics or self.op.disks or
4687
            self.op.hvparams or self.op.beparams):
4688
      raise errors.OpPrereqError("No changes submitted")
4689

    
4690
    for item in (constants.BE_MEMORY, constants.BE_VCPUS):
4691
      val = self.op.beparams.get(item, None)
4692
      if val is not None:
4693
        try:
4694
          val = int(val)
4695
        except ValueError, err:
4696
          raise errors.OpPrereqError("Invalid %s size: %s" % (item, str(err)))
4697
        self.op.beparams[item] = val
4698
    # Disk validation
4699
    disk_addremove = 0
4700
    for disk_op, disk_dict in self.op.disks:
4701
      if disk_op == constants.DDM_REMOVE:
4702
        disk_addremove += 1
4703
        continue
4704
      elif disk_op == constants.DDM_ADD:
4705
        disk_addremove += 1
4706
      else:
4707
        if not isinstance(disk_op, int):
4708
          raise errors.OpPrereqError("Invalid disk index")
4709
      if disk_op == constants.DDM_ADD:
4710
        mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
4711
        if mode not in (constants.DISK_RDONLY, constants.DISK_RDWR):
4712
          raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
4713
        size = disk_dict.get('size', None)
4714
        if size is None:
4715
          raise errors.OpPrereqError("Required disk parameter size missing")
4716
        try:
4717
          size = int(size)
4718
        except ValueError, err:
4719
          raise errors.OpPrereqError("Invalid disk size parameter: %s" %
4720
                                     str(err))
4721
        disk_dict['size'] = size
4722
      else:
4723
        # modification of disk
4724
        if 'size' in disk_dict:
4725
          raise errors.OpPrereqError("Disk size change not possible, use"
4726
                                     " grow-disk")
4727

    
4728
    if disk_addremove > 1:
4729
      raise errors.OpPrereqError("Only one disk add or remove operation"
4730
                                 " supported at a time")
4731

    
4732
    # NIC validation
4733
    nic_addremove = 0
4734
    for nic_op, nic_dict in self.op.nics:
4735
      if nic_op == constants.DDM_REMOVE:
4736
        nic_addremove += 1
4737
        continue
4738
      elif nic_op == constants.DDM_ADD:
4739
        nic_addremove += 1
4740
      else:
4741
        if not isinstance(nic_op, int):
4742
          raise errors.OpPrereqError("Invalid nic index")
4743

    
4744
      # nic_dict should be a dict
4745
      nic_ip = nic_dict.get('ip', None)
4746
      if nic_ip is not None:
4747
        if nic_ip.lower() == "none":
4748
          nic_dict['ip'] = None
4749
        else:
4750
          if not utils.IsValidIP(nic_ip):
4751
            raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
4752
      # we can only check None bridges and assign the default one
4753
      nic_bridge = nic_dict.get('bridge', None)
4754
      if nic_bridge is None:
4755
        nic_dict['bridge'] = self.cfg.GetDefBridge()
4756
      # but we can validate MACs
4757
      nic_mac = nic_dict.get('mac', None)
4758
      if nic_mac is not None:
4759
        if self.cfg.IsMacInUse(nic_mac):
4760
          raise errors.OpPrereqError("MAC address %s already in use"
4761
                                     " in cluster" % nic_mac)
4762
        if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4763
          if not utils.IsValidMac(nic_mac):
4764
            raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
4765
    if nic_addremove > 1:
4766
      raise errors.OpPrereqError("Only one NIC add or remove operation"
4767
                                 " supported at a time")
4768

    
4769
  def ExpandNames(self):
4770
    self._ExpandAndLockInstance()
4771
    self.needed_locks[locking.LEVEL_NODE] = []
4772
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4773

    
4774
  def DeclareLocks(self, level):
4775
    if level == locking.LEVEL_NODE:
4776
      self._LockInstancesNodes()
4777

    
4778
  def BuildHooksEnv(self):
4779
    """Build hooks env.
4780

4781
    This runs on the master, primary and secondaries.
4782

4783
    """
4784
    args = dict()
4785
    if constants.BE_MEMORY in self.be_new:
4786
      args['memory'] = self.be_new[constants.BE_MEMORY]
4787
    if constants.BE_VCPUS in self.be_new:
4788
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
4789
    # FIXME: readd disk/nic changes
4790
    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
4791
    nl = [self.cfg.GetMasterNode(),
4792
          self.instance.primary_node] + list(self.instance.secondary_nodes)
4793
    return env, nl, nl
4794

    
4795
  def CheckPrereq(self):
4796
    """Check prerequisites.
4797

4798
    This only checks the instance list against the existing names.
4799

4800
    """
4801
    force = self.force = self.op.force
4802

    
4803
    # checking the new params on the primary/secondary nodes
4804

    
4805
    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4806
    assert self.instance is not None, \
4807
      "Cannot retrieve locked instance %s" % self.op.instance_name
4808
    pnode = self.instance.primary_node
4809
    nodelist = [pnode]
4810
    nodelist.extend(instance.secondary_nodes)
4811

    
4812
    # hvparams processing
4813
    if self.op.hvparams:
4814
      i_hvdict = copy.deepcopy(instance.hvparams)
4815
      for key, val in self.op.hvparams.iteritems():
4816
        if val is None:
4817
          try:
4818
            del i_hvdict[key]
4819
          except KeyError:
4820
            pass
4821
        else:
4822
          i_hvdict[key] = val
4823
      cluster = self.cfg.GetClusterInfo()
4824
      hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
4825
                                i_hvdict)
4826
      # local check
4827
      hypervisor.GetHypervisor(
4828
        instance.hypervisor).CheckParameterSyntax(hv_new)
4829
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
4830
      self.hv_new = hv_new # the new actual values
4831
      self.hv_inst = i_hvdict # the new dict (without defaults)
4832
    else:
4833
      self.hv_new = self.hv_inst = {}
4834

    
4835
    # beparams processing
4836
    if self.op.beparams:
4837
      i_bedict = copy.deepcopy(instance.beparams)
4838
      for key, val in self.op.beparams.iteritems():
4839
        if val is None:
4840
          try:
4841
            del i_bedict[key]
4842
          except KeyError:
4843
            pass
4844
        else:
4845
          i_bedict[key] = val
4846
      cluster = self.cfg.GetClusterInfo()
4847
      be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4848
                                i_bedict)
4849
      self.be_new = be_new # the new actual values
4850
      self.be_inst = i_bedict # the new dict (without defaults)
4851
    else:
4852
      self.be_new = self.be_inst = {}
4853

    
4854
    self.warn = []
4855

    
4856
    if constants.BE_MEMORY in self.op.beparams and not self.force:
4857
      mem_check_list = [pnode]
4858
      if be_new[constants.BE_AUTO_BALANCE]:
4859
        # either we changed auto_balance to yes or it was from before
4860
        mem_check_list.extend(instance.secondary_nodes)
4861
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
4862
                                                  instance.hypervisor)
4863
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
4864
                                         instance.hypervisor)
4865

    
4866
      if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
4867
        # Assume the primary node is unreachable and go ahead
4868
        self.warn.append("Can't get info from primary node %s" % pnode)
4869
      else:
4870
        if instance_info:
4871
          current_mem = instance_info['memory']
4872
        else:
4873
          # Assume instance not running
4874
          # (there is a slight race condition here, but it's not very probable,
4875
          # and we have no other way to check)
4876
          current_mem = 0
4877
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
4878
                    nodeinfo[pnode]['memory_free'])
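        # Worked example (assumed numbers): asking for 2048 MiB while the
        # instance currently uses 512 MiB and the node reports 1024 MiB free
        # gives miss_mem = 2048 - 512 - 1024 = 512 > 0, so the change is
        # rejected below.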
4879
        if miss_mem > 0:
4880
          raise errors.OpPrereqError("This change will prevent the instance"
4881
                                     " from starting, due to %d MB of memory"
4882
                                     " missing on its primary node" % miss_mem)
4883

    
4884
      if be_new[constants.BE_AUTO_BALANCE]:
4885
        for node in instance.secondary_nodes:
4886
          if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
4887
            self.warn.append("Can't get info from secondary node %s" % node)
4888
          elif be_new[constants.BE_MEMORY] > nodeinfo[node]['memory_free']:
4889
            self.warn.append("Not enough memory to failover instance to"
4890
                             " secondary node %s" % node)
4891

    
4892
    # NIC processing
4893
    for nic_op, nic_dict in self.op.nics:
4894
      if nic_op == constants.DDM_REMOVE:
4895
        if not instance.nics:
4896
          raise errors.OpPrereqError("Instance has no NICs, cannot remove")
4897
        continue
4898
      if nic_op != constants.DDM_ADD:
4899
        # an existing nic
4900
        if nic_op < 0 or nic_op >= len(instance.nics):
4901
          raise errors.OpPrereqError("Invalid NIC index %s, valid values"
4902
                                     " are 0 to %d" %
4903
                                     (nic_op, len(instance.nics)))
4904
      nic_bridge = nic_dict.get('bridge', None)
4905
      if nic_bridge is not None:
4906
        if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
4907
          msg = ("Bridge '%s' doesn't exist on one of"
4908
                 " the instance nodes" % nic_bridge)
4909
          if self.force:
4910
            self.warn.append(msg)
4911
          else:
4912
            raise errors.OpPrereqError(msg)
4913

    
4914
    # DISK processing
4915
    if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
4916
      raise errors.OpPrereqError("Disk operations not supported for"
4917
                                 " diskless instances")
4918
    for disk_op, disk_dict in self.op.disks:
4919
      if disk_op == constants.DDM_REMOVE:
4920
        if len(instance.disks) == 1:
4921
          raise errors.OpPrereqError("Cannot remove the last disk of"
4922
                                     " an instance")
4923
        ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
4924
        ins_l = ins_l[pnode]
4925
        if not isinstance(ins_l, list):
4926
          raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
4927
        if instance.name in ins_l:
4928
          raise errors.OpPrereqError("Instance is running, can't remove"
4929
                                     " disks.")
4930

    
4931
      if (disk_op == constants.DDM_ADD and
4932
          len(instance.disks) >= constants.MAX_DISKS):
4933
        raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
4934
                                   " add more" % constants.MAX_DISKS)
4935
      if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
4936
        # an existing disk
4937
        if disk_op < 0 or disk_op >= len(instance.disks):
4938
          raise errors.OpPrereqError("Invalid disk index %s, valid values"
4939
                                     " are 0 to %d" %
4940
                                     (disk_op, len(instance.disks)))
4941

    
4942
    return
4943

    
4944
  def Exec(self, feedback_fn):
4945
    """Modifies an instance.
4946

4947
    All parameters take effect only at the next restart of the instance.
4948

4949
    """
4950
    # Process here the warnings from CheckPrereq, as we don't have a
4951
    # feedback_fn there.
4952
    for warn in self.warn:
4953
      feedback_fn("WARNING: %s" % warn)
4954

    
4955
    result = []
4956
    instance = self.instance
4957
    # disk changes
4958
    for disk_op, disk_dict in self.op.disks:
4959
      if disk_op == constants.DDM_REMOVE:
4960
        # remove the last disk
4961
        device = instance.disks.pop()
4962
        device_idx = len(instance.disks)
4963
        for node, disk in device.ComputeNodeTree(instance.primary_node):
4964
          self.cfg.SetDiskID(disk, node)
4965
          if not self.rpc.call_blockdev_remove(node, disk):
4966
            self.proc.LogWarning("Could not remove disk/%d on node %s,"
4967
                                 " continuing anyway", device_idx, node)
4968
        result.append(("disk/%d" % device_idx, "remove"))
4969
      elif disk_op == constants.DDM_ADD:
4970
        # add a new disk
4971
        if instance.disk_template == constants.DT_FILE:
4972
          file_driver, file_path = instance.disks[0].logical_id
4973
          file_path = os.path.dirname(file_path)
4974
        else:
4975
          file_driver = file_path = None
4976
        disk_idx_base = len(instance.disks)
4977
        new_disk = _GenerateDiskTemplate(self,
4978
                                         instance.disk_template,
4979
                                         instance, instance.primary_node,
4980
                                         instance.secondary_nodes,
4981
                                         [disk_dict],
4982
                                         file_path,
4983
                                         file_driver,
4984
                                         disk_idx_base)[0]
4985
        new_disk.mode = disk_dict['mode']
4986
        instance.disks.append(new_disk)
4987
        info = _GetInstanceInfoText(instance)
4988

    
4989
        logging.info("Creating volume %s for instance %s",
4990
                     new_disk.iv_name, instance.name)
4991
        # Note: this needs to be kept in sync with _CreateDisks
4992
        #HARDCODE
4993
        for secondary_node in instance.secondary_nodes:
4994
          if not _CreateBlockDevOnSecondary(self, secondary_node, instance,
4995
                                            new_disk, False, info):
4996
            self.LogWarning("Failed to create volume %s (%s) on"
4997
                            " secondary node %s!",
4998
                            new_disk.iv_name, new_disk, secondary_node)
4999
        #HARDCODE
5000
        if not _CreateBlockDevOnPrimary(self, instance.primary_node,
5001
                                        instance, new_disk, info):
5002
          self.LogWarning("Failed to create volume %s on primary!",
5003
                          new_disk.iv_name)
5004
        result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
5005
                       (new_disk.size, new_disk.mode)))
5006
      else:
5007
        # change a given disk
5008
        instance.disks[disk_op].mode = disk_dict['mode']
5009
        result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
5010
    # NIC changes
5011
    for nic_op, nic_dict in self.op.nics:
5012
      if nic_op == constants.DDM_REMOVE:
5013
        # remove the last nic
5014
        del instance.nics[-1]
5015
        result.append(("nic.%d" % len(instance.nics), "remove"))
5016
      elif nic_op == constants.DDM_ADD:
5017
        # add a new nic
5018
        if 'mac' not in nic_dict:
5019
          mac = constants.VALUE_GENERATE
5020
        else:
5021
          mac = nic_dict['mac']
5022
        if mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5023
          mac = self.cfg.GenerateMAC()
5024
        new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
5025
                              bridge=nic_dict.get('bridge', None))
5026
        instance.nics.append(new_nic)
5027
        result.append(("nic.%d" % (len(instance.nics) - 1),
5028
                       "add:mac=%s,ip=%s,bridge=%s" %
5029
                       (new_nic.mac, new_nic.ip, new_nic.bridge)))
5030
      else:
5031
        # change a given nic
5032
        for key in 'mac', 'ip', 'bridge':
5033
          if key in nic_dict:
5034
            setattr(instance.nics[nic_op], key, nic_dict[key])
5035
            result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
5036

    
5037
    # hvparams changes
5038
    if self.op.hvparams:
5039
      instance.hvparams = self.hv_new
5040
      for key, val in self.op.hvparams.iteritems():
5041
        result.append(("hv/%s" % key, val))
5042

    
5043
    # beparams changes
5044
    if self.op.beparams:
5045
      instance.beparams = self.be_inst
5046
      for key, val in self.op.beparams.iteritems():
5047
        result.append(("be/%s" % key, val))
5048

    
5049
    self.cfg.Update(instance)
5050

    
5051
    return result
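    # Illustrative return value (changes and values are made up):
    #   [("nic.0/ip", "192.0.2.10"), ("be/memory", 512),
    #    ("disk/1", "add:size=1024,mode=rw")]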
5052

    
5053

    
5054
class LUQueryExports(NoHooksLU):
5055
  """Query the exports list
5056

5057
  """
5058
  _OP_REQP = ['nodes']
5059
  REQ_BGL = False
5060

    
5061
  def ExpandNames(self):
5062
    self.needed_locks = {}
5063
    self.share_locks[locking.LEVEL_NODE] = 1
5064
    if not self.op.nodes:
5065
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5066
    else:
5067
      self.needed_locks[locking.LEVEL_NODE] = \
5068
        _GetWantedNodes(self, self.op.nodes)
5069

    
5070
  def CheckPrereq(self):
5071
    """Check prerequisites.
5072

5073
    """
5074
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
5075

    
5076
  def Exec(self, feedback_fn):
5077
    """Compute the list of all the exported system images.
5078

5079
    @rtype: dict
5080
    @return: a dictionary with the structure node->(export-list)
5081
        where export-list is a list of the instances exported on
5082
        that node.
5083

5084
    """
5085
    return self.rpc.call_export_list(self.nodes)
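    # Illustrative return value (node and instance names are made up):
    #   {"node1.example.com": ["inst1.example.com", "inst2.example.com"],
    #    "node2.example.com": []}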
5086

    
5087

    
5088
class LUExportInstance(LogicalUnit):
5089
  """Export an instance to an image in the cluster.
5090

5091
  """
5092
  HPATH = "instance-export"
5093
  HTYPE = constants.HTYPE_INSTANCE
5094
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
5095
  REQ_BGL = False
5096

    
5097
  def ExpandNames(self):
5098
    self._ExpandAndLockInstance()
5099
    # FIXME: lock only instance primary and destination node
5100
    #
5101
    # Sad but true, for now we have to lock all nodes, as we don't know where
5102
    # the previous export might be, and in this LU we search for it and
5103
    # remove it from its current node. In the future we could fix this by:
5104
    #  - making a tasklet to search (share-lock all), then create the new one,
5105
    #    then one to remove, after
5106
    #  - removing the removal operation altogether
5107
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5108

    
5109
  def DeclareLocks(self, level):
5110
    """Last minute lock declaration."""
5111
    # All nodes are locked anyway, so nothing to do here.
5112

    
5113
  def BuildHooksEnv(self):
5114
    """Build hooks env.
5115

5116
    This will run on the master, primary node and target node.
5117

5118
    """
5119
    env = {
5120
      "EXPORT_NODE": self.op.target_node,
5121
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
5122
      }
5123
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5124
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
5125
          self.op.target_node]
5126
    return env, nl, nl
5127

    
5128
  def CheckPrereq(self):
5129
    """Check prerequisites.
5130

5131
    This checks that the instance and node names are valid.
5132

5133
    """
5134
    instance_name = self.op.instance_name
5135
    self.instance = self.cfg.GetInstanceInfo(instance_name)
5136
    assert self.instance is not None, \
5137
          "Cannot retrieve locked instance %s" % self.op.instance_name
5138

    
5139
    self.dst_node = self.cfg.GetNodeInfo(
5140
      self.cfg.ExpandNodeName(self.op.target_node))
5141

    
5142
    if self.dst_node is None:
5143
      # This is wrong node name, not a non-locked node
5144
      raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
5145

    
5146
    # instance disk type verification
5147
    for disk in self.instance.disks:
5148
      if disk.dev_type == constants.LD_FILE:
5149
        raise errors.OpPrereqError("Export not supported for instances with"
5150
                                   " file-based disks")
5151

    
5152
  def Exec(self, feedback_fn):
5153
    """Export an instance to an image in the cluster.
5154

5155
    """
5156
    instance = self.instance
5157
    dst_node = self.dst_node
5158
    src_node = instance.primary_node
5159
    if self.op.shutdown:
5160
      # shutdown the instance, but not the disks
5161
      if not self.rpc.call_instance_shutdown(src_node, instance):
5162
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
5163
                                 (instance.name, src_node))
5164

    
5165
    vgname = self.cfg.GetVGName()
5166

    
5167
    snap_disks = []
5168

    
5169
    try:
5170
      for disk in instance.disks:
5171
        # new_dev_name will be a snapshot of an lvm leaf of the one we passed
5172
        new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
5173

    
5174
        if not new_dev_name:
5175
          self.LogWarning("Could not snapshot block device %s on node %s",
5176
                          disk.logical_id[1], src_node)
5177
          snap_disks.append(False)
5178
        else:
5179
          new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
5180
                                 logical_id=(vgname, new_dev_name),
5181
                                 physical_id=(vgname, new_dev_name),
5182
                                 iv_name=disk.iv_name)
5183
          snap_disks.append(new_dev)
5184

    
5185
    finally:
5186
      if self.op.shutdown and instance.status == "up":
5187
        if not self.rpc.call_instance_start(src_node, instance, None):
5188
          _ShutdownInstanceDisks(self, instance)
5189
          raise errors.OpExecError("Could not start instance")
5190

    
5191
    # TODO: check for size
5192

    
5193
    cluster_name = self.cfg.GetClusterName()
5194
    for idx, dev in enumerate(snap_disks):
5195
      if dev:
5196
        if not self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
5197
                                             instance, cluster_name, idx):
5198
          self.LogWarning("Could not export block device %s from node %s to"
5199
                          " node %s", dev.logical_id[1], src_node,
5200
                          dst_node.name)
5201
        if not self.rpc.call_blockdev_remove(src_node, dev):
5202
          self.LogWarning("Could not remove snapshot block device %s from node"
5203
                          " %s", dev.logical_id[1], src_node)
5204

    
5205
    if not self.rpc.call_finalize_export(dst_node.name, instance, snap_disks):
5206
      self.LogWarning("Could not finalize export for instance %s on node %s",
5207
                      instance.name, dst_node.name)
5208

    
5209
    nodelist = self.cfg.GetNodeList()
5210
    nodelist.remove(dst_node.name)
5211

    
5212
    # on one-node clusters nodelist will be empty after the removal;
5213
    # if we proceeded, the backup would be removed because OpQueryExports
5214
    # substitutes an empty list with the full cluster node list.
5215
    if nodelist:
5216
      exportlist = self.rpc.call_export_list(nodelist)
5217
      for node in exportlist:
5218
        if instance.name in exportlist[node]:
5219
          if not self.rpc.call_export_remove(node, instance.name):
5220
            self.LogWarning("Could not remove older export for instance %s"
5221
                            " on node %s", instance.name, node)
5222

    
5223

    
5224
class LURemoveExport(NoHooksLU):
5225
  """Remove exports related to the named instance.
5226

5227
  """
5228
  _OP_REQP = ["instance_name"]
5229
  REQ_BGL = False
5230

    
5231
  def ExpandNames(self):
5232
    self.needed_locks = {}
5233
    # We need all nodes to be locked in order for RemoveExport to work, but we
5234
    # don't need to lock the instance itself, as nothing will happen to it (and
5235
    # we can remove exports also for a removed instance)
5236
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5237

    
5238
  def CheckPrereq(self):
5239
    """Check prerequisites.
5240
    """
5241
    pass
5242

    
5243
  def Exec(self, feedback_fn):
5244
    """Remove any export.
5245

5246
    """
5247
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5248
    # If the instance was not found we'll try with the name that was passed in.
5249
    # This will only work if it was an FQDN, though.
5250
    fqdn_warn = False
5251
    if not instance_name:
5252
      fqdn_warn = True
5253
      instance_name = self.op.instance_name
5254

    
5255
    exportlist = self.rpc.call_export_list(self.acquired_locks[
5256
      locking.LEVEL_NODE])
5257
    found = False
5258
    for node in exportlist:
5259
      if instance_name in exportlist[node]:
5260
        found = True
5261
        if not self.rpc.call_export_remove(node, instance_name):
5262
          logging.error("Could not remove export for instance %s"
5263
                        " on node %s", instance_name, node)
5264

    
5265
    if fqdn_warn and not found:
5266
      feedback_fn("Export not found. If trying to remove an export belonging"
5267
                  " to a deleted instance please use its Fully Qualified"
5268
                  " Domain Name.")
5269

    
5270

    
5271
class TagsLU(NoHooksLU):
5272
  """Generic tags LU.
5273

5274
  This is an abstract class which is the parent of all the other tags LUs.
5275

5276
  """
5277

    
5278
  def ExpandNames(self):
5279
    self.needed_locks = {}
5280
    if self.op.kind == constants.TAG_NODE:
5281
      name = self.cfg.ExpandNodeName(self.op.name)
5282
      if name is None:
5283
        raise errors.OpPrereqError("Invalid node name (%s)" %
5284
                                   (self.op.name,))
5285
      self.op.name = name
5286
      self.needed_locks[locking.LEVEL_NODE] = name
5287
    elif self.op.kind == constants.TAG_INSTANCE:
5288
      name = self.cfg.ExpandInstanceName(self.op.name)
5289
      if name is None:
5290
        raise errors.OpPrereqError("Invalid instance name (%s)" %
5291
                                   (self.op.name,))
5292
      self.op.name = name
5293
      self.needed_locks[locking.LEVEL_INSTANCE] = name
5294

    
5295
  def CheckPrereq(self):
5296
    """Check prerequisites.
5297

5298
    """
5299
    if self.op.kind == constants.TAG_CLUSTER:
5300
      self.target = self.cfg.GetClusterInfo()
5301
    elif self.op.kind == constants.TAG_NODE:
5302
      self.target = self.cfg.GetNodeInfo(self.op.name)
5303
    elif self.op.kind == constants.TAG_INSTANCE:
5304
      self.target = self.cfg.GetInstanceInfo(self.op.name)
5305
    else:
5306
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
5307
                                 str(self.op.kind))
5308

    
5309

    
5310
class LUGetTags(TagsLU):
5311
  """Returns the tags of a given object.
5312

5313
  """
5314
  _OP_REQP = ["kind", "name"]
5315
  REQ_BGL = False
5316

    
5317
  def Exec(self, feedback_fn):
5318
    """Returns the tag list.
5319

5320
    """
5321
    return list(self.target.GetTags())
5322

    
5323

    
5324
class LUSearchTags(NoHooksLU):
5325
  """Searches the tags for a given pattern.
5326

5327
  """
5328
  _OP_REQP = ["pattern"]
5329
  REQ_BGL = False
5330

    
5331
  def ExpandNames(self):
5332
    self.needed_locks = {}
5333

    
5334
  def CheckPrereq(self):
5335
    """Check prerequisites.
5336

5337
    This checks the pattern passed for validity by compiling it.
5338

5339
    """
5340
    try:
5341
      self.re = re.compile(self.op.pattern)
5342
    except re.error, err:
5343
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5344
                                 (self.op.pattern, err))
5345

    
5346
  def Exec(self, feedback_fn):
5347
    """Returns the tag list.
5348

5349
    """
5350
    cfg = self.cfg
5351
    tgts = [("/cluster", cfg.GetClusterInfo())]
5352
    ilist = cfg.GetAllInstancesInfo().values()
5353
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5354
    nlist = cfg.GetAllNodesInfo().values()
5355
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5356
    results = []
5357
    for path, target in tgts:
5358
      for tag in target.GetTags():
5359
        if self.re.search(tag):
5360
          results.append((path, tag))
5361
    return results
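    # Illustrative result for a pattern like "^web" (tags are made up):
    #   [("/cluster", "webfarm"), ("/instances/inst1.example.com", "web")]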
5362

    
5363

    
5364
class LUAddTags(TagsLU):
5365
  """Sets a tag on a given object.
5366

5367
  """
5368
  _OP_REQP = ["kind", "name", "tags"]
5369
  REQ_BGL = False
5370

    
5371
  def CheckPrereq(self):
5372
    """Check prerequisites.
5373

5374
    This checks the type and length of the tag name and value.
5375

5376
    """
5377
    TagsLU.CheckPrereq(self)
5378
    for tag in self.op.tags:
5379
      objects.TaggableObject.ValidateTag(tag)
5380

    
5381
  def Exec(self, feedback_fn):
5382
    """Sets the tag.
5383

5384
    """
5385
    try:
5386
      for tag in self.op.tags:
5387
        self.target.AddTag(tag)
5388
    except errors.TagError, err:
5389
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
5390
    try:
5391
      self.cfg.Update(self.target)
5392
    except errors.ConfigurationError:
5393
      raise errors.OpRetryError("There has been a modification to the"
5394
                                " config file and the operation has been"
5395
                                " aborted. Please retry.")
5396

    
5397

    
5398
class LUDelTags(TagsLU):
5399
  """Delete a list of tags from a given object.
5400

5401
  """
5402
  _OP_REQP = ["kind", "name", "tags"]
5403
  REQ_BGL = False
5404

    
5405
  def CheckPrereq(self):
5406
    """Check prerequisites.
5407

5408
    This checks that we have the given tag.
5409

5410
    """
5411
    TagsLU.CheckPrereq(self)
5412
    for tag in self.op.tags:
5413
      objects.TaggableObject.ValidateTag(tag)
5414
    del_tags = frozenset(self.op.tags)
5415
    cur_tags = self.target.GetTags()
5416
    if not del_tags <= cur_tags:
5417
      diff_tags = del_tags - cur_tags
5418
      diff_names = ["'%s'" % tag for tag in diff_tags]
5419
      diff_names.sort()
5420
      raise errors.OpPrereqError("Tag(s) %s not found" %
5421
                                 (",".join(diff_names)))
5422

    
5423
  def Exec(self, feedback_fn):
5424
    """Remove the tag from the object.
5425

5426
    """
5427
    for tag in self.op.tags:
5428
      self.target.RemoveTag(tag)
5429
    try:
5430
      self.cfg.Update(self.target)
5431
    except errors.ConfigurationError:
5432
      raise errors.OpRetryError("There has been a modification to the"
5433
                                " config file and the operation has been"
5434
                                " aborted. Please retry.")
5435

    
5436

    
5437
class LUTestDelay(NoHooksLU):
5438
  """Sleep for a specified amount of time.
5439

5440
  This LU sleeps on the master and/or nodes for a specified amount of
5441
  time.
5442

5443
  """
5444
  _OP_REQP = ["duration", "on_master", "on_nodes"]
5445
  REQ_BGL = False
5446

    
5447
  def ExpandNames(self):
5448
    """Expand names and set required locks.
5449

5450
    This expands the node list, if any.
5451

5452
    """
5453
    self.needed_locks = {}
5454
    if self.op.on_nodes:
5455
      # _GetWantedNodes can be used here, but is not always appropriate to use
5456
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
5457
      # more information.
5458
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5459
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
5460

    
5461
  def CheckPrereq(self):
5462
    """Check prerequisites.
5463

5464
    """
5465

    
5466
  def Exec(self, feedback_fn):
5467
    """Do the actual sleep.
5468

5469
    """
5470
    if self.op.on_master:
5471
      if not utils.TestDelay(self.op.duration):
5472
        raise errors.OpExecError("Error during master delay test")
5473
    if self.op.on_nodes:
5474
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5475
      if not result:
5476
        raise errors.OpExecError("Complete failure from rpc call")
5477
      for node, node_result in result.items():
5478
        if not node_result:
5479
          raise errors.OpExecError("Failure during rpc call to node %s,"
5480
                                   " result: %s" % (node, node_result))
5481

    
5482

    
5483
class IAllocator(object):
5484
  """IAllocator framework.
5485

5486
  An IAllocator instance has four sets of attributes:
5487
    - cfg that is needed to query the cluster
5488
    - input data (all members of the mode's _*_KEYS attribute are required)
5489
    - four buffer attributes (in_data/in_text, out_data/out_text) holding the
5490
      input (to the external script) in text and data structure format,
5491
      and the output from it, again in two formats
5492
    - the result variables from the script (success, info, nodes) for
5493
      easy usage
5494

5495
  """
5496
  _ALLO_KEYS = [
5497
    "mem_size", "disks", "disk_template",
5498
    "os", "tags", "nics", "vcpus", "hypervisor",
5499
    ]
5500
  _RELO_KEYS = [
5501
    "relocate_from",
5502
    ]
5503

    
5504
  def __init__(self, lu, mode, name, **kwargs):
5505
    self.lu = lu
5506
    # init buffer variables
5507
    self.in_text = self.out_text = self.in_data = self.out_data = None
5508
    # init all input fields so that pylint is happy
5509
    self.mode = mode
5510
    self.name = name
5511
    self.mem_size = self.disks = self.disk_template = None
5512
    self.os = self.tags = self.nics = self.vcpus = None
5513
    self.relocate_from = None
5514
    # computed fields
5515
    self.required_nodes = None
5516
    # init result fields
5517
    self.success = self.info = self.nodes = None
5518
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5519
      keyset = self._ALLO_KEYS
5520
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5521
      keyset = self._RELO_KEYS
5522
    else:
5523
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5524
                                   " IAllocator" % self.mode)
5525
    for key in kwargs:
5526
      if key not in keyset:
5527
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
5528
                                     " IAllocator" % key)
5529
      setattr(self, key, kwargs[key])
5530
    for key in keyset:
5531
      if key not in kwargs:
5532
        raise errors.ProgrammerError("Missing input parameter '%s' to"
5533
                                     " IAllocator" % key)
5534
    self._BuildInputData()
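    # Illustrative construction for an allocation request (constant, OS and
    # hypervisor names are assumptions); every key in _ALLO_KEYS must be
    # supplied, as enforced above:
    #   IAllocator(lu, constants.IALLOCATOR_MODE_ALLOC, "inst1.example.com",
    #              mem_size=512, disks=[{"size": 1024, "mode": "w"}],
    #              disk_template=constants.DT_DRBD8, os="debian-etch",
    #              tags=[], nics=[{"mac": "auto", "ip": None, "bridge": None}],
    #              vcpus=1, hypervisor="xen-pvm")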

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.lu.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": 1,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enable_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }
    iinfo = cfg.GetAllInstancesInfo().values()
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      hypervisor = self.hypervisor
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      hypervisor = cfg.GetInstanceInfo(self.name).hypervisor

    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
                                           hypervisor)
    node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
                       cluster_info.enabled_hypervisors)
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      if nname not in node_data or not isinstance(node_data[nname], dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname]
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free', 'cpu_total']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo, beinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += beinfo[constants.BE_MEMORY]
          if iinfo.name not in node_iinfo[nname]:
            i_used_mem = 0
          else:
            i_used_mem = int(node_iinfo[nname][iinfo.name]['memory'])
          i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
          remote_info['memory_free'] -= max(0, i_mem_diff)

          if iinfo.status == "up":
            i_p_up_mem += beinfo[constants.BE_MEMORY]

      # build the per-node result dict
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MEMORY],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data
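
  # For orientation, the structure built above looks roughly like this.  This
  # is an abridged sketch with made-up names and numbers; the authoritative
  # key list is the code above:
  #
  #   {
  #     "version": 1,
  #     "cluster_name": "cluster.example.com",
  #     "cluster_tags": [],
  #     "enable_hypervisors": ["xen-pvm"],
  #     "nodes": {
  #       "node1.example.com": {
  #         "tags": [], "total_memory": 4096, "reserved_memory": 512,
  #         "free_memory": 3072, "i_pri_memory": 512, "i_pri_up_memory": 512,
  #         "total_disk": 102400, "free_disk": 51200,
  #         "primary_ip": "192.0.2.1", "secondary_ip": "198.51.100.1",
  #         "total_cpus": 4,
  #       },
  #     },
  #     "instances": {
  #       "inst1.example.com": {
  #         "tags": [], "should_run": True, "vcpus": 1, "memory": 512,
  #         "os": "debian-etch", "nodes": ["node1.example.com"],
  #         "nics": [{"mac": "aa:00:00:00:00:01", "ip": None,
  #                   "bridge": "xen-br0"}],
  #         "disks": [{"size": 1024, "mode": "w"}],
  #         "disk_template": "plain", "hypervisor": "xen-pvm",
  #       },
  #     },
  #   }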

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This, in combination with _ComputeClusterData, will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template, self.disks)

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This, in combination with _ComputeClusterData, will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.lu.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.lu.rpc.call_iallocator_runner

    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)

    if not isinstance(result, (list, tuple)) or len(result) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" % (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and, if successful, save the result in
    self.out_data and the other result attributes.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict
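
  # After validation, self.out_data holds the decoded reply.  A well-formed
  # reply therefore decodes to something like (illustrative values):
  #
  #   {"success": True, "info": "allocation successful",
  #    "nodes": ["node1.example.com", "node3.example.com"]}
  #
  # i.e. a dict with at least the "success", "info" and "nodes" keys, where
  # "nodes" must be a list.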


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    of the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
      if self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       hypervisor=self.op.hypervisor,
                       )
    else:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result