
root / lib / cmdlib.py @ 0e67cdbe


#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform
import logging
import copy

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import serializer


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    self.CheckArguments()

  def __GetSSH(self):
    """Returns the SshRunner object.

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensuring the
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left as a purely lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods no longer have to worry about missing parameters.

    """
    pass

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level.

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not be prefixed with 'GANETI_', as this
    will be handled in the hooks runner. Also note that additional keys
    will be added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    If no nodes should be returned, an empty list (and not None) should
    be used.

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks. By default the method does nothing and the
    previous result is passed back unchanged, but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hook results

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]


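# Illustrative sketch (not part of the original module): how a concurrent LU
# typically ties the helpers above together. The opcode name and its fields
# are hypothetical; only the ExpandNames/DeclareLocks pattern is the point.
#
#   class LUExampleActivate(LogicalUnit):
#     _OP_REQP = ["instance_name"]
#     REQ_BGL = False
#
#     def ExpandNames(self):
#       self._ExpandAndLockInstance()
#       self.needed_locks[locking.LEVEL_NODE] = []
#       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
#
#     def DeclareLocks(self, level):
#       if level == locking.LEVEL_NODE:
#         self._LockInstancesNodes()
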
class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.OpPrereqError: if the nodes parameter is wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta))


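# Illustrative usage note (not in the original source): query LUs later in
# this module call this helper from ExpandNames to validate the user-supplied
# field list against their static and dynamic field sets, along the lines of:
#
#   _CheckOutputFields(static=self._FIELDS_STATIC,
#                      dynamic=self._FIELDS_DYNAMIC,
#                      selected=self.op.output_fields)
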
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks.

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: string
  @param status: the desired status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, bridge, mac) representing
      the NICs the instance has
  @rtype: dict
  @return: the hook environment for this instance

  """
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env


def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


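# Illustrative sketch (not part of the original source): for a hypothetical
# instance with one NIC, the helpers above would produce a hook environment
# along these lines (all values made up):
#
#   {
#     "OP_TARGET": "web1.example.com",
#     "INSTANCE_NAME": "web1.example.com",
#     "INSTANCE_PRIMARY": "node1.example.com",
#     "INSTANCE_SECONDARIES": "node2.example.com",
#     "INSTANCE_OS_TYPE": "debian-etch",
#     "INSTANCE_STATUS": "up",
#     "INSTANCE_MEMORY": 512,
#     "INSTANCE_VCPUS": 1,
#     "INSTANCE_NIC0_IP": "",
#     "INSTANCE_NIC0_BRIDGE": "xen-br0",
#     "INSTANCE_NIC0_HWADDR": "aa:00:00:11:22:33",
#     "INSTANCE_NIC_COUNT": 1,
#   }
#
# As noted in BuildHooksEnv above, the hooks runner itself takes care of
# prefixing the keys with 'GANETI_' before exporting them to hook scripts.
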
def _CheckInstanceBridgesExist(lu, instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("one or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    if not self.rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list::

      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    @type node: string
    @param node: the name of the node to check
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @type vglist: dict
    @param vglist: dictionary of volume group names and their size
    @param node_result: the results from the node
    @param remote_version: the RPC version from the remote node
    @param feedback_fn: function used to accumulate results

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    if not node_result:
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
      return True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    if 'node-net-test' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result['node-net-test']:
        bad = True
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result['node-net-test'][node]))

    hyp_result = node_result.get('hypervisor', None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))
    return bad

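  # Illustrative sketch (not part of the original source): the node_result
  # mapping checked above comes from the node_verify RPC; for a healthy node
  # it would look roughly like this (keys as used above, values made up):
  #
  #   {
  #     'filelist': {'<config file>': '<checksum>', ...},
  #     'nodelist': {},        # empty: ssh to all other nodes worked
  #     'node-net-test': {},   # empty: tcp connectivity checks passed
  #     'hypervisor': {'xen-pvm': None},  # None means the hv verified fine
  #   }
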
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if instanceconfig.status != 'down':
      if (node_current not in node_instance or
          not instance in node_instance[node_current]):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if node != node_current:
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to, should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

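  # Worked example (illustrative, not part of the original source): assume
  # node C has 2048 MB free and is secondary for two auto-balanced instances
  # whose primary is node A, needing 1024 MB and 1536 MB of memory. If node A
  # failed, node C would need 1024 + 1536 = 2560 MB to start both, more than
  # its 2048 MB free, so the check above flags the (C, A) pair as not N+1
  # redundant.
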
  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase, and their failure makes
    the output be logged in the verify output and the verification to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = []
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
    all_vglist = self.rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': hypervisors,
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
                        for node in nodeinfo]
      }
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())
    all_rversion = self.rpc.call_version(nodelist)
    all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
                                        self.cfg.GetHypervisorType())

    cluster = self.cfg.GetClusterInfo()
    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary. This is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res is False or not isinstance(res, list):
            feedback_fn("    Communication failure")
            lu_result = 1
            continue
          for script, hkr, output in res:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_volume_list(nodes, vg_name)

    to_act = set()
    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logging.warning("Connection to node %s failed or invalid data"
                        " returned", node)
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


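# Illustrative sketch (not part of the original source): the tuple returned
# by LUVerifyDisks.Exec above is (res_nodes, res_nlvm, res_instances,
# res_missing). With made-up names it could look like:
#
#   (['node3.example.com'],                        # nodes that didn't answer
#    {'node2.example.com': 'lvs: command failed'}, # per-node LVM errors
#    ['instance1.example.com'],                    # instances with offline LVs
#    {'instance2.example.com':                     # missing LVs, per instance
#        [('node1.example.com', '<vg>/<lv name>')]})
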
class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    if not self.rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      # TODO: sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logging.debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = self.rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            self.LogWarning("Copy of file %s to node %s failed",
                            fname, to_node)
    finally:
      if not self.rpc.call_node_start_master(master, False):
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  @type disk: L{objects.Disk}
  @param disk: the disk to check
  @rtype: boolean
  @return: boolean indicating whether an LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    # FIXME: This only works because there is only one parameter that can be
    # changed or removed.
    if self.op.vg_name is not None and not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    node_list = self.acquired_locks[locking.LEVEL_NODE]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # beparams changes do not need validation (we can't validate?),
    # but we still process here
    if self.op.beparams:
      self.new_beparams = cluster.FillDict(
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)

    # hypervisor list/parameters
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      if not isinstance(self.op.hvparams, dict):
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      if self.op.vg_name != self.cfg.GetVGName():
        self.cfg.SetVGName(self.op.vg_name)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.enabled_hypervisors is not None:
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
    self.cfg.Update(self.cluster)


1291
  """Sleep and poll for an instance's disk to sync.
1292

1293
  """
1294
  if not instance.disks:
1295
    return True
1296

    
1297
  if not oneshot:
1298
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1299

    
1300
  node = instance.primary_node
1301

    
1302
  for dev in instance.disks:
1303
    lu.cfg.SetDiskID(dev, node)
1304

    
1305
  retries = 0
1306
  while True:
1307
    max_time = 0
1308
    done = True
1309
    cumul_degraded = False
1310
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1311
    if not rstats:
1312
      lu.LogWarning("Can't get any data from node %s", node)
1313
      retries += 1
1314
      if retries >= 10:
1315
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1316
                                 " aborting." % node)
1317
      time.sleep(6)
1318
      continue
1319
    retries = 0
1320
    for i in range(len(rstats)):
1321
      mstat = rstats[i]
1322
      if mstat is None:
1323
        lu.LogWarning("Can't compute data for node %s/%s",
1324
                           node, instance.disks[i].iv_name)
1325
        continue
1326
      # we ignore the ldisk parameter
1327
      perc_done, est_time, is_degraded, _ = mstat
1328
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1329
      if perc_done is not None:
1330
        done = False
1331
        if est_time is not None:
1332
          rem_time = "%d estimated seconds remaining" % est_time
1333
          max_time = est_time
1334
        else:
1335
          rem_time = "no time estimate"
1336
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1337
                        (instance.disks[i].iv_name, perc_done, rem_time))
1338
    if done or oneshot:
1339
      break
1340

    
1341
    time.sleep(min(60, max_time))
1342

    
1343
  if done:
1344
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1345
  return not cumul_degraded
1346

    
1347

    
1348
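# Illustrative note (not part of the original source): each mstat entry
# unpacked above is (perc_done, est_time, is_degraded, ldisk). For example, a
# made-up value of (80.0, 120, True, False) would be reported as a device that
# is 80% done with roughly 120 seconds remaining, while a perc_done of None
# means that device is no longer syncing.
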
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    if not rstats:
      logging.warning("Node %s: disk degraded, not found or node down", node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet()
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")

  def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary.

    @param node_list: a list with the names of all nodes
    @param rlist: a map with node names as keys and OS objects as values

    @rtype: dict
    @return: a dictionary with osnames as keys and as value another map, with
        nodes as keys and list of OS objects as values, e.g.::

          {"debian-etch": {"node1": [<object>,...],
                           "node2": [<object>,]}
          }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if not nr:
        continue
      for os_obj in nr:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in node_list:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.acquired_locks[locking.LEVEL_NODE]
    node_data = self.rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


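# Illustrative sketch (not part of the original source): with
# output_fields=["name", "valid", "node_status"], each row produced by
# LUDiagnoseOS.Exec above would look roughly like (placeholder values):
#
#   ["debian-etch",                               # name
#    True,                                        # valid on every node
#    {"node1.example.com": [(<status>, <path>)]}  # node_status, per node
#   ]
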
class LURemoveNode(LogicalUnit):
1464
  """Logical unit for removing a node.
1465

1466
  """
1467
  HPATH = "node-remove"
1468
  HTYPE = constants.HTYPE_NODE
1469
  _OP_REQP = ["node_name"]
1470

    
1471
  def BuildHooksEnv(self):
1472
    """Build hooks env.
1473

1474
    This doesn't run on the target node in the pre phase as a failed
1475
    node would then be impossible to remove.
1476

1477
    """
1478
    env = {
1479
      "OP_TARGET": self.op.node_name,
1480
      "NODE_NAME": self.op.node_name,
1481
      }
1482
    all_nodes = self.cfg.GetNodeList()
1483
    all_nodes.remove(self.op.node_name)
1484
    return env, all_nodes, all_nodes
1485

    
1486
  def CheckPrereq(self):
1487
    """Check prerequisites.
1488

1489
    This checks:
1490
     - the node exists in the configuration
1491
     - it does not have primary or secondary instances
1492
     - it's not the master
1493

1494
    Any errors are signalled by raising errors.OpPrereqError.
1495

1496
    """
1497
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1498
    if node is None:
1499
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1500

    
1501
    instance_list = self.cfg.GetInstanceList()
1502

    
1503
    masternode = self.cfg.GetMasterNode()
1504
    if node.name == masternode:
1505
      raise errors.OpPrereqError("Node is the master node,"
1506
                                 " you need to failover first.")
1507

    
1508
    for instance_name in instance_list:
1509
      instance = self.cfg.GetInstanceInfo(instance_name)
1510
      if node.name == instance.primary_node:
1511
        raise errors.OpPrereqError("Instance %s still running on the node,"
1512
                                   " please remove first." % instance_name)
1513
      if node.name in instance.secondary_nodes:
1514
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1515
                                   " please remove first." % instance_name)
1516
    self.op.node_name = node.name
1517
    self.node = node
1518

    
1519
  def Exec(self, feedback_fn):
1520
    """Removes the node from the cluster.
1521

1522
    """
1523
    node = self.node
1524
    logging.info("Stopping the node daemon and removing configs from node %s",
1525
                 node.name)
1526

    
1527
    self.context.RemoveNode(node.name)
1528

    
1529
    self.rpc.call_node_leave_cluster(node.name)
1530

    
1531

    
1532
class LUQueryNodes(NoHooksLU):
1533
  """Logical unit for querying nodes.
1534

1535
  """
1536
  _OP_REQP = ["output_fields", "names"]
1537
  REQ_BGL = False
1538
  _FIELDS_DYNAMIC = utils.FieldSet(
1539
    "dtotal", "dfree",
1540
    "mtotal", "mnode", "mfree",
1541
    "bootid",
1542
    "ctotal",
1543
    )
1544

    
1545
  _FIELDS_STATIC = utils.FieldSet(
1546
    "name", "pinst_cnt", "sinst_cnt",
1547
    "pinst_list", "sinst_list",
1548
    "pip", "sip", "tags",
1549
    "serial_no",
1550
    "master_candidate",
1551
    "master",
1552
    )
1553

    
1554
  def ExpandNames(self):
1555
    _CheckOutputFields(static=self._FIELDS_STATIC,
1556
                       dynamic=self._FIELDS_DYNAMIC,
1557
                       selected=self.op.output_fields)
1558

    
1559
    self.needed_locks = {}
1560
    self.share_locks[locking.LEVEL_NODE] = 1
1561

    
1562
    if self.op.names:
1563
      self.wanted = _GetWantedNodes(self, self.op.names)
1564
    else:
1565
      self.wanted = locking.ALL_SET
1566

    
1567
    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1568
    if self.do_locking:
1569
      # if we don't request only static fields, we need to lock the nodes
1570
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
1571

    
1572

    
1573
  def CheckPrereq(self):
1574
    """Check prerequisites.
1575

1576
    """
1577
    # The validation of the node list is done in _GetWantedNodes if the
1578
    # list is not empty; an empty list needs no validation
1579
    pass
1580

    
1581
  def Exec(self, feedback_fn):
1582
    """Computes the list of nodes and their attributes.
1583

1584
    """
1585
    all_info = self.cfg.GetAllNodesInfo()
1586
    if self.do_locking:
1587
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1588
    elif self.wanted != locking.ALL_SET:
1589
      nodenames = self.wanted
1590
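      # no locks are held for the explicitly named nodes, so they may have
      # been removed in the meantime; detect that case here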
      missing = set(nodenames).difference(all_info.keys())
1591
      if missing:
1592
        raise errors.OpExecError(
1593
          "Some nodes were removed before retrieving their data: %s" % missing)
1594
    else:
1595
      nodenames = all_info.keys()
1596

    
1597
    nodenames = utils.NiceSort(nodenames)
1598
    nodelist = [all_info[name] for name in nodenames]
1599

    
1600
    # begin data gathering
1601

    
1602
    if self.do_locking:
1603
      live_data = {}
1604
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1605
                                          self.cfg.GetHypervisorType())
1606
      for name in nodenames:
1607
        nodeinfo = node_data.get(name, None)
1608
        if nodeinfo:
1609
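          # coerce the reported values to int where possible; values that
          # cannot be converted (e.g. None) are passed through unchanged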
          fn = utils.TryConvert
1610
          live_data[name] = {
1611
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1612
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1613
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
1614
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1615
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
1616
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1617
            "bootid": nodeinfo.get('bootid', None),
1618
            }
1619
        else:
1620
          live_data[name] = {}
1621
    else:
1622
      live_data = dict.fromkeys(nodenames, {})
1623

    
1624
    node_to_primary = dict([(name, set()) for name in nodenames])
1625
    node_to_secondary = dict([(name, set()) for name in nodenames])
1626

    
1627
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1628
                             "sinst_cnt", "sinst_list"))
1629
    if inst_fields & frozenset(self.op.output_fields):
1630
      instancelist = self.cfg.GetInstanceList()
1631

    
1632
      for instance_name in instancelist:
1633
        inst = self.cfg.GetInstanceInfo(instance_name)
1634
        if inst.primary_node in node_to_primary:
1635
          node_to_primary[inst.primary_node].add(inst.name)
1636
        for secnode in inst.secondary_nodes:
1637
          if secnode in node_to_secondary:
1638
            node_to_secondary[secnode].add(inst.name)
1639

    
1640
    master_node = self.cfg.GetMasterNode()
1641

    
1642
    # end data gathering
1643

    
1644
    output = []
1645
    for node in nodelist:
1646
      node_output = []
1647
      for field in self.op.output_fields:
1648
        if field == "name":
1649
          val = node.name
1650
        elif field == "pinst_list":
1651
          val = list(node_to_primary[node.name])
1652
        elif field == "sinst_list":
1653
          val = list(node_to_secondary[node.name])
1654
        elif field == "pinst_cnt":
1655
          val = len(node_to_primary[node.name])
1656
        elif field == "sinst_cnt":
1657
          val = len(node_to_secondary[node.name])
1658
        elif field == "pip":
1659
          val = node.primary_ip
1660
        elif field == "sip":
1661
          val = node.secondary_ip
1662
        elif field == "tags":
1663
          val = list(node.GetTags())
1664
        elif field == "serial_no":
1665
          val = node.serial_no
1666
        elif field == "master_candidate":
1667
          val = node.master_candidate
1668
        elif field == "master":
1669
          val = node.name == master_node
1670
        elif self._FIELDS_DYNAMIC.Matches(field):
1671
          val = live_data[node.name].get(field, None)
1672
        else:
1673
          raise errors.ParameterError(field)
1674
        node_output.append(val)
1675
      output.append(node_output)
1676

    
1677
    return output
1678

    
1679

    
1680
class LUQueryNodeVolumes(NoHooksLU):
1681
  """Logical unit for getting volumes on node(s).
1682

1683
  """
1684
  _OP_REQP = ["nodes", "output_fields"]
1685
  REQ_BGL = False
1686
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1687
  _FIELDS_STATIC = utils.FieldSet("node")
1688

    
1689
  def ExpandNames(self):
1690
    _CheckOutputFields(static=self._FIELDS_STATIC,
1691
                       dynamic=self._FIELDS_DYNAMIC,
1692
                       selected=self.op.output_fields)
1693

    
1694
    self.needed_locks = {}
1695
    self.share_locks[locking.LEVEL_NODE] = 1
1696
    if not self.op.nodes:
1697
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1698
    else:
1699
      self.needed_locks[locking.LEVEL_NODE] = \
1700
        _GetWantedNodes(self, self.op.nodes)
1701

    
1702
  def CheckPrereq(self):
1703
    """Check prerequisites.
1704

1705
    This checks that the fields required are valid output fields.
1706

1707
    """
1708
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1709

    
1710
  def Exec(self, feedback_fn):
1711
    """Computes the list of nodes and their attributes.
1712

1713
    """
1714
    nodenames = self.nodes
1715
    volumes = self.rpc.call_node_volumes(nodenames)
1716

    
1717
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1718
             in self.cfg.GetInstanceList()]
1719

    
1720
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1721

    
1722
    output = []
1723
    for node in nodenames:
1724
      if node not in volumes or not volumes[node]:
1725
        continue
1726

    
1727
      node_vols = volumes[node][:]
1728
      node_vols.sort(key=lambda vol: vol['dev'])
1729

    
1730
      for vol in node_vols:
1731
        node_output = []
1732
        for field in self.op.output_fields:
1733
          if field == "node":
1734
            val = node
1735
          elif field == "phys":
1736
            val = vol['dev']
1737
          elif field == "vg":
1738
            val = vol['vg']
1739
          elif field == "name":
1740
            val = vol['name']
1741
          elif field == "size":
1742
            val = int(float(vol['size']))
1743
          elif field == "instance":
1744
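            # find the instance owning this volume on this node; the
            # for/else falls back to '-' when no instance matches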
            for inst in ilist:
1745
              if node not in lv_by_node[inst]:
1746
                continue
1747
              if vol['name'] in lv_by_node[inst][node]:
1748
                val = inst.name
1749
                break
1750
            else:
1751
              val = '-'
1752
          else:
1753
            raise errors.ParameterError(field)
1754
          node_output.append(str(val))
1755

    
1756
        output.append(node_output)
1757

    
1758
    return output
1759

    
1760

    
1761
class LUAddNode(LogicalUnit):
1762
  """Logical unit for adding node to the cluster.
1763

1764
  """
1765
  HPATH = "node-add"
1766
  HTYPE = constants.HTYPE_NODE
1767
  _OP_REQP = ["node_name"]
1768

    
1769
  def BuildHooksEnv(self):
1770
    """Build hooks env.
1771

1772
    This will run on all nodes before, and on all nodes + the new node after.
1773

1774
    """
1775
    env = {
1776
      "OP_TARGET": self.op.node_name,
1777
      "NODE_NAME": self.op.node_name,
1778
      "NODE_PIP": self.op.primary_ip,
1779
      "NODE_SIP": self.op.secondary_ip,
1780
      }
1781
    nodes_0 = self.cfg.GetNodeList()
1782
    nodes_1 = nodes_0 + [self.op.node_name, ]
1783
    return env, nodes_0, nodes_1
1784

    
1785
  def CheckPrereq(self):
1786
    """Check prerequisites.
1787

1788
    This checks:
1789
     - the new node is not already in the config
1790
     - it is resolvable
1791
     - its parameters (single/dual homed) match the cluster
1792

1793
    Any errors are signalled by raising errors.OpPrereqError.
1794

1795
    """
1796
    node_name = self.op.node_name
1797
    cfg = self.cfg
1798

    
1799
    dns_data = utils.HostInfo(node_name)
1800

    
1801
    node = dns_data.name
1802
    primary_ip = self.op.primary_ip = dns_data.ip
1803
    secondary_ip = getattr(self.op, "secondary_ip", None)
1804
    if secondary_ip is None:
1805
      secondary_ip = primary_ip
1806
    if not utils.IsValidIP(secondary_ip):
1807
      raise errors.OpPrereqError("Invalid secondary IP given")
1808
    self.op.secondary_ip = secondary_ip
1809

    
1810
    node_list = cfg.GetNodeList()
1811
    if not self.op.readd and node in node_list:
1812
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1813
                                 node)
1814
    elif self.op.readd and node not in node_list:
1815
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1816

    
1817
    for existing_node_name in node_list:
1818
      existing_node = cfg.GetNodeInfo(existing_node_name)
1819

    
1820
      if self.op.readd and node == existing_node_name:
1821
        if (existing_node.primary_ip != primary_ip or
1822
            existing_node.secondary_ip != secondary_ip):
1823
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1824
                                     " address configuration as before")
1825
        continue
1826

    
1827
      if (existing_node.primary_ip == primary_ip or
1828
          existing_node.secondary_ip == primary_ip or
1829
          existing_node.primary_ip == secondary_ip or
1830
          existing_node.secondary_ip == secondary_ip):
1831
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1832
                                   " existing node %s" % existing_node.name)
1833

    
1834
    # check that the type of the node (single versus dual homed) is the
1835
    # same as for the master
1836
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1837
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1838
    newbie_singlehomed = secondary_ip == primary_ip
1839
    if master_singlehomed != newbie_singlehomed:
1840
      if master_singlehomed:
1841
        raise errors.OpPrereqError("The master has no private ip but the"
1842
                                   " new node has one")
1843
      else:
1844
        raise errors.OpPrereqError("The master has a private ip but the"
1845
                                   " new node doesn't have one")
1846

    
1847
    # check reachability
1848
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1849
      raise errors.OpPrereqError("Node not reachable by ping")
1850

    
1851
    if not newbie_singlehomed:
1852
      # check reachability from my secondary ip to newbie's secondary ip
1853
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1854
                           source=myself.secondary_ip):
1855
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1856
                                   " based ping to noded port")
1857

    
1858
    self.new_node = objects.Node(name=node,
1859
                                 primary_ip=primary_ip,
1860
                                 secondary_ip=secondary_ip)
1861

    
1862
  def Exec(self, feedback_fn):
1863
    """Adds the new node to the cluster.
1864

1865
    """
1866
    new_node = self.new_node
1867
    node = new_node.name
1868

    
1869
    # check connectivity
1870
    result = self.rpc.call_version([node])[node]
1871
    if result:
1872
      if constants.PROTOCOL_VERSION == result:
1873
        logging.info("Communication to node %s fine, sw version %s match",
1874
                     node, result)
1875
      else:
1876
        raise errors.OpExecError("Version mismatch master version %s,"
1877
                                 " node version %s" %
1878
                                 (constants.PROTOCOL_VERSION, result))
1879
    else:
1880
      raise errors.OpExecError("Cannot get version from the new node")
1881

    
1882
    # setup ssh on node
1883
    logging.info("Copy ssh key to node %s", node)
1884
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1885
    keyarray = []
1886
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1887
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1888
                priv_key, pub_key]
1889

    
1890
    for i in keyfiles:
1891
      f = open(i, 'r')
1892
      try:
1893
        keyarray.append(f.read())
1894
      finally:
1895
        f.close()
1896

    
1897
    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
1898
                                    keyarray[2],
1899
                                    keyarray[3], keyarray[4], keyarray[5])
1900

    
1901
    if not result:
1902
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1903

    
1904
    # Add node to our /etc/hosts, and add key to known_hosts
1905
    utils.AddHostToEtcHosts(new_node.name)
1906

    
1907
    if new_node.secondary_ip != new_node.primary_ip:
1908
      if not self.rpc.call_node_has_ip_address(new_node.name,
1909
                                               new_node.secondary_ip):
1910
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1911
                                 " you gave (%s). Please fix and re-run this"
1912
                                 " command." % new_node.secondary_ip)
1913

    
1914
    node_verify_list = [self.cfg.GetMasterNode()]
1915
    node_verify_param = {
1916
      'nodelist': [node],
1917
      # TODO: do a node-net-test as well?
1918
    }
1919

    
1920
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
1921
                                       self.cfg.GetClusterName())
1922
    for verifier in node_verify_list:
1923
      if not result[verifier]:
1924
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
1925
                                 " for remote verification" % verifier)
1926
      if result[verifier]['nodelist']:
1927
        for failed in result[verifier]['nodelist']:
1928
          feedback_fn("ssh/hostname verification failed %s -> %s" %
1929
                      (verifier, result[verifier]['nodelist'][failed]))
1930
        raise errors.OpExecError("ssh/hostname verification failed.")
1931

    
1932
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1933
    # including the node just added
1934
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
1935
    dist_nodes = self.cfg.GetNodeList()
1936
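    # on readd the node is already part of the configuration's node list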
    if not self.op.readd:
1937
      dist_nodes.append(node)
1938
    if myself.name in dist_nodes:
1939
      dist_nodes.remove(myself.name)
1940

    
1941
    logging.debug("Copying hosts and known_hosts to all nodes")
1942
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1943
      result = self.rpc.call_upload_file(dist_nodes, fname)
1944
      for to_node in dist_nodes:
1945
        if not result[to_node]:
1946
          logging.error("Copy of file %s to node %s failed", fname, to_node)
1947

    
1948
    to_copy = []
1949
    if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
1950
      to_copy.append(constants.VNC_PASSWORD_FILE)
1951
    for fname in to_copy:
1952
      result = self.rpc.call_upload_file([node], fname)
1953
      if not result[node]:
1954
        logging.error("Could not copy file %s to node %s", fname, node)
1955

    
1956
    if self.op.readd:
1957
      self.context.ReaddNode(new_node)
1958
    else:
1959
      self.context.AddNode(new_node)
1960

    
1961

    
1962
class LUQueryClusterInfo(NoHooksLU):
1963
  """Query cluster configuration.
1964

1965
  """
1966
  _OP_REQP = []
1967
  REQ_BGL = False
1968

    
1969
  def ExpandNames(self):
1970
    self.needed_locks = {}
1971

    
1972
  def CheckPrereq(self):
1973
    """No prerequsites needed for this LU.
1974

1975
    """
1976
    pass
1977

    
1978
  def Exec(self, feedback_fn):
1979
    """Return cluster config.
1980

1981
    """
1982
    cluster = self.cfg.GetClusterInfo()
1983
    result = {
1984
      "software_version": constants.RELEASE_VERSION,
1985
      "protocol_version": constants.PROTOCOL_VERSION,
1986
      "config_version": constants.CONFIG_VERSION,
1987
      "os_api_version": constants.OS_API_VERSION,
1988
      "export_version": constants.EXPORT_VERSION,
1989
      "architecture": (platform.architecture()[0], platform.machine()),
1990
      "name": cluster.cluster_name,
1991
      "master": cluster.master_node,
1992
      "default_hypervisor": cluster.default_hypervisor,
1993
      "enabled_hypervisors": cluster.enabled_hypervisors,
1994
      "hvparams": cluster.hvparams,
1995
      "beparams": cluster.beparams,
1996
      }
1997

    
1998
    return result
1999

    
2000

    
2001
class LUQueryConfigValues(NoHooksLU):
2002
  """Return configuration values.
2003

2004
  """
2005
  _OP_REQP = []
2006
  REQ_BGL = False
2007
  _FIELDS_DYNAMIC = utils.FieldSet()
2008
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2009

    
2010
  def ExpandNames(self):
2011
    self.needed_locks = {}
2012

    
2013
    _CheckOutputFields(static=self._FIELDS_STATIC,
2014
                       dynamic=self._FIELDS_DYNAMIC,
2015
                       selected=self.op.output_fields)
2016

    
2017
  def CheckPrereq(self):
2018
    """No prerequisites.
2019

2020
    """
2021
    pass
2022

    
2023
  def Exec(self, feedback_fn):
2024
    """Dump a representation of the cluster config to the standard output.
2025

2026
    """
2027
    values = []
2028
    for field in self.op.output_fields:
2029
      if field == "cluster_name":
2030
        entry = self.cfg.GetClusterName()
2031
      elif field == "master_node":
2032
        entry = self.cfg.GetMasterNode()
2033
      elif field == "drain_flag":
2034
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2035
      else:
2036
        raise errors.ParameterError(field)
2037
      values.append(entry)
2038
    return values
2039

    
2040

    
2041
class LUActivateInstanceDisks(NoHooksLU):
2042
  """Bring up an instance's disks.
2043

2044
  """
2045
  _OP_REQP = ["instance_name"]
2046
  REQ_BGL = False
2047

    
2048
  def ExpandNames(self):
2049
    self._ExpandAndLockInstance()
2050
    self.needed_locks[locking.LEVEL_NODE] = []
2051
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2052

    
2053
  def DeclareLocks(self, level):
2054
    if level == locking.LEVEL_NODE:
2055
      self._LockInstancesNodes()
2056

    
2057
  def CheckPrereq(self):
2058
    """Check prerequisites.
2059

2060
    This checks that the instance is in the cluster.
2061

2062
    """
2063
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2064
    assert self.instance is not None, \
2065
      "Cannot retrieve locked instance %s" % self.op.instance_name
2066

    
2067
  def Exec(self, feedback_fn):
2068
    """Activate the disks.
2069

2070
    """
2071
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2072
    if not disks_ok:
2073
      raise errors.OpExecError("Cannot activate block devices")
2074

    
2075
    return disks_info
2076

    
2077

    
2078
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2079
  """Prepare the block devices for an instance.
2080

2081
  This sets up the block devices on all nodes.
2082

2083
  @type lu: L{LogicalUnit}
2084
  @param lu: the logical unit on whose behalf we execute
2085
  @type instance: L{objects.Instance}
2086
  @param instance: the instance for whose disks we assemble
2087
  @type ignore_secondaries: boolean
2088
  @param ignore_secondaries: if true, errors on secondary nodes
2089
      won't result in an error return from the function
2090
  @return: a tuple of (disks_ok, device_info), where device_info is a list of
2091
      (host, instance_visible_name, node_visible_name) tuples with the
2092
      mapping from node devices to instance devices
2093

2094
  """
2095
  device_info = []
2096
  disks_ok = True
2097
  iname = instance.name
2098
  # With the two-pass mechanism we try to reduce the window of
2099
  # opportunity for the race condition of switching DRBD to primary
2100
  # before handshaking has occurred, but we do not eliminate it
2101

    
2102
  # The proper fix would be to wait (with some limits) until the
2103
  # connection has been made and drbd transitions from WFConnection
2104
  # into any other network-connected state (Connected, SyncTarget,
2105
  # SyncSource, etc.)
2106

    
2107
  # 1st pass, assemble on all nodes in secondary mode
2108
  for inst_disk in instance.disks:
2109
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2110
      lu.cfg.SetDiskID(node_disk, node)
2111
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2112
      if not result:
2113
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2114
                           " (is_primary=False, pass=1)",
2115
                           inst_disk.iv_name, node)
2116
        if not ignore_secondaries:
2117
          disks_ok = False
2118

    
2119
  # FIXME: race condition on drbd migration to primary
2120

    
2121
  # 2nd pass, do only the primary node
2122
  for inst_disk in instance.disks:
2123
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2124
      if node != instance.primary_node:
2125
        continue
2126
      lu.cfg.SetDiskID(node_disk, node)
2127
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2128
      if not result:
2129
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2130
                           " (is_primary=True, pass=2)",
2131
                           inst_disk.iv_name, node)
2132
        disks_ok = False
2133
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
2134

    
2135
  # leave the disks configured for the primary node
2136
  # this is a workaround that would be fixed better by
2137
  # improving the logical/physical id handling
2138
  for disk in instance.disks:
2139
    lu.cfg.SetDiskID(disk, instance.primary_node)
2140

    
2141
  return disks_ok, device_info
2142

    
2143

    
2144
def _StartInstanceDisks(lu, instance, force):
2145
  """Start the disks of an instance.
2146

2147
  """
2148
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2149
                                           ignore_secondaries=force)
2150
  if not disks_ok:
2151
    _ShutdownInstanceDisks(lu, instance)
2152
    if force is not None and not force:
2153
      lu.proc.LogWarning("", hint="If the message above refers to a"
2154
                         " secondary node,"
2155
                         " you can retry the operation using '--force'.")
2156
    raise errors.OpExecError("Disk consistency error")
2157

    
2158

    
2159
class LUDeactivateInstanceDisks(NoHooksLU):
2160
  """Shutdown an instance's disks.
2161

2162
  """
2163
  _OP_REQP = ["instance_name"]
2164
  REQ_BGL = False
2165

    
2166
  def ExpandNames(self):
2167
    self._ExpandAndLockInstance()
2168
    self.needed_locks[locking.LEVEL_NODE] = []
2169
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2170

    
2171
  def DeclareLocks(self, level):
2172
    if level == locking.LEVEL_NODE:
2173
      self._LockInstancesNodes()
2174

    
2175
  def CheckPrereq(self):
2176
    """Check prerequisites.
2177

2178
    This checks that the instance is in the cluster.
2179

2180
    """
2181
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2182
    assert self.instance is not None, \
2183
      "Cannot retrieve locked instance %s" % self.op.instance_name
2184

    
2185
  def Exec(self, feedback_fn):
2186
    """Deactivate the disks
2187

2188
    """
2189
    instance = self.instance
2190
    _SafeShutdownInstanceDisks(self, instance)
2191

    
2192

    
2193
def _SafeShutdownInstanceDisks(lu, instance):
2194
  """Shutdown block devices of an instance.
2195

2196
  This function checks if an instance is running, before calling
2197
  _ShutdownInstanceDisks.
2198

2199
  """
2200
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
2201
                                      [instance.hypervisor])
2202
  ins_l = ins_l[instance.primary_node]
2203
  if not isinstance(ins_l, list):
2204
    raise errors.OpExecError("Can't contact node '%s'" %
2205
                             instance.primary_node)
2206

    
2207
  if instance.name in ins_l:
2208
    raise errors.OpExecError("Instance is running, can't shutdown"
2209
                             " block devices.")
2210

    
2211
  _ShutdownInstanceDisks(lu, instance)
2212

    
2213

    
2214
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2215
  """Shutdown block devices of an instance.
2216

2217
  This does the shutdown on all nodes of the instance.
2218

2219
  If ignore_primary is true, errors on the primary node are
2220
  ignored and do not affect the return value.
2221

2222
  """
2223
  result = True
2224
  for disk in instance.disks:
2225
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2226
      lu.cfg.SetDiskID(top_disk, node)
2227
      if not lu.rpc.call_blockdev_shutdown(node, top_disk):
2228
        logging.error("Could not shutdown block device %s on node %s",
2229
                      disk.iv_name, node)
2230
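        # a shutdown failure is only tolerated on the primary node, and
        # only when ignore_primary is set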
        if not ignore_primary or node != instance.primary_node:
2231
          result = False
2232
  return result
2233

    
2234

    
2235
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2236
  """Checks if a node has enough free memory.
2237

2238
  This function checks if a given node has the needed amount of free
2239
  memory. In case the node has less memory or we cannot get the
2240
  information from the node, this function raises an OpPrereqError
2241
  exception.
2242

2243
  @type lu: C{LogicalUnit}
2244
  @param lu: a logical unit from which we get configuration data
2245
  @type node: C{str}
2246
  @param node: the node to check
2247
  @type reason: C{str}
2248
  @param reason: string to use in the error message
2249
  @type requested: C{int}
2250
  @param requested: the amount of memory in MiB to check for
2251
  @type hypervisor: C{str}
2252
  @param hypervisor: the hypervisor to ask for memory stats
2253
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2254
      we cannot check the node
2255

2256
  """
2257
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2258
  if not nodeinfo or not isinstance(nodeinfo, dict):
2259
    raise errors.OpPrereqError("Could not contact node %s for resource"
2260
                             " information" % (node,))
2261

    
2262
  free_mem = nodeinfo[node].get('memory_free')
2263
  if not isinstance(free_mem, int):
2264
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2265
                             " was '%s'" % (node, free_mem))
2266
  if requested > free_mem:
2267
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2268
                             " needed %s MiB, available %s MiB" %
2269
                             (node, reason, requested, free_mem))
2270

    
2271

    
2272
class LUStartupInstance(LogicalUnit):
2273
  """Starts an instance.
2274

2275
  """
2276
  HPATH = "instance-start"
2277
  HTYPE = constants.HTYPE_INSTANCE
2278
  _OP_REQP = ["instance_name", "force"]
2279
  REQ_BGL = False
2280

    
2281
  def ExpandNames(self):
2282
    self._ExpandAndLockInstance()
2283

    
2284
  def BuildHooksEnv(self):
2285
    """Build hooks env.
2286

2287
    This runs on master, primary and secondary nodes of the instance.
2288

2289
    """
2290
    env = {
2291
      "FORCE": self.op.force,
2292
      }
2293
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2294
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2295
          list(self.instance.secondary_nodes))
2296
    return env, nl, nl
2297

    
2298
  def CheckPrereq(self):
2299
    """Check prerequisites.
2300

2301
    This checks that the instance is in the cluster.
2302

2303
    """
2304
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2305
    assert self.instance is not None, \
2306
      "Cannot retrieve locked instance %s" % self.op.instance_name
2307

    
2308
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2309
    # check bridge existence
2310
    _CheckInstanceBridgesExist(self, instance)
2311

    
2312
    _CheckNodeFreeMemory(self, instance.primary_node,
2313
                         "starting instance %s" % instance.name,
2314
                         bep[constants.BE_MEMORY], instance.hypervisor)
2315

    
2316
  def Exec(self, feedback_fn):
2317
    """Start the instance.
2318

2319
    """
2320
    instance = self.instance
2321
    force = self.op.force
2322
    extra_args = getattr(self.op, "extra_args", "")
2323

    
2324
    self.cfg.MarkInstanceUp(instance.name)
2325

    
2326
    node_current = instance.primary_node
2327

    
2328
    _StartInstanceDisks(self, instance, force)
2329

    
2330
    if not self.rpc.call_instance_start(node_current, instance, extra_args):
2331
      _ShutdownInstanceDisks(self, instance)
2332
      raise errors.OpExecError("Could not start instance")
2333

    
2334

    
2335
class LURebootInstance(LogicalUnit):
2336
  """Reboot an instance.
2337

2338
  """
2339
  HPATH = "instance-reboot"
2340
  HTYPE = constants.HTYPE_INSTANCE
2341
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2342
  REQ_BGL = False
2343

    
2344
  def ExpandNames(self):
2345
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2346
                                   constants.INSTANCE_REBOOT_HARD,
2347
                                   constants.INSTANCE_REBOOT_FULL]:
2348
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2349
                                  (constants.INSTANCE_REBOOT_SOFT,
2350
                                   constants.INSTANCE_REBOOT_HARD,
2351
                                   constants.INSTANCE_REBOOT_FULL))
2352
    self._ExpandAndLockInstance()
2353

    
2354
  def BuildHooksEnv(self):
2355
    """Build hooks env.
2356

2357
    This runs on master, primary and secondary nodes of the instance.
2358

2359
    """
2360
    env = {
2361
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2362
      }
2363
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2364
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2365
          list(self.instance.secondary_nodes))
2366
    return env, nl, nl
2367

    
2368
  def CheckPrereq(self):
2369
    """Check prerequisites.
2370

2371
    This checks that the instance is in the cluster.
2372

2373
    """
2374
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2375
    assert self.instance is not None, \
2376
      "Cannot retrieve locked instance %s" % self.op.instance_name
2377

    
2378
    # check bridge existence
2379
    _CheckInstanceBridgesExist(self, instance)
2380

    
2381
  def Exec(self, feedback_fn):
2382
    """Reboot the instance.
2383

2384
    """
2385
    instance = self.instance
2386
    ignore_secondaries = self.op.ignore_secondaries
2387
    reboot_type = self.op.reboot_type
2388
    extra_args = getattr(self.op, "extra_args", "")
2389

    
2390
    node_current = instance.primary_node
2391

    
2392
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2393
                       constants.INSTANCE_REBOOT_HARD]:
2394
      if not self.rpc.call_instance_reboot(node_current, instance,
2395
                                           reboot_type, extra_args):
2396
        raise errors.OpExecError("Could not reboot instance")
2397
    else:
2398
      if not self.rpc.call_instance_shutdown(node_current, instance):
2399
        raise errors.OpExecError("could not shutdown instance for full reboot")
2400
      _ShutdownInstanceDisks(self, instance)
2401
      _StartInstanceDisks(self, instance, ignore_secondaries)
2402
      if not self.rpc.call_instance_start(node_current, instance, extra_args):
2403
        _ShutdownInstanceDisks(self, instance)
2404
        raise errors.OpExecError("Could not start instance for full reboot")
2405

    
2406
    self.cfg.MarkInstanceUp(instance.name)
2407

    
2408

    
2409
class LUShutdownInstance(LogicalUnit):
2410
  """Shutdown an instance.
2411

2412
  """
2413
  HPATH = "instance-stop"
2414
  HTYPE = constants.HTYPE_INSTANCE
2415
  _OP_REQP = ["instance_name"]
2416
  REQ_BGL = False
2417

    
2418
  def ExpandNames(self):
2419
    self._ExpandAndLockInstance()
2420

    
2421
  def BuildHooksEnv(self):
2422
    """Build hooks env.
2423

2424
    This runs on master, primary and secondary nodes of the instance.
2425

2426
    """
2427
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2428
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2429
          list(self.instance.secondary_nodes))
2430
    return env, nl, nl
2431

    
2432
  def CheckPrereq(self):
2433
    """Check prerequisites.
2434

2435
    This checks that the instance is in the cluster.
2436

2437
    """
2438
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2439
    assert self.instance is not None, \
2440
      "Cannot retrieve locked instance %s" % self.op.instance_name
2441

    
2442
  def Exec(self, feedback_fn):
2443
    """Shutdown the instance.
2444

2445
    """
2446
    instance = self.instance
2447
    node_current = instance.primary_node
2448
    self.cfg.MarkInstanceDown(instance.name)
2449
    if not self.rpc.call_instance_shutdown(node_current, instance):
2450
      self.proc.LogWarning("Could not shutdown instance")
2451

    
2452
    _ShutdownInstanceDisks(self, instance)
2453

    
2454

    
2455
class LUReinstallInstance(LogicalUnit):
2456
  """Reinstall an instance.
2457

2458
  """
2459
  HPATH = "instance-reinstall"
2460
  HTYPE = constants.HTYPE_INSTANCE
2461
  _OP_REQP = ["instance_name"]
2462
  REQ_BGL = False
2463

    
2464
  def ExpandNames(self):
2465
    self._ExpandAndLockInstance()
2466

    
2467
  def BuildHooksEnv(self):
2468
    """Build hooks env.
2469

2470
    This runs on master, primary and secondary nodes of the instance.
2471

2472
    """
2473
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2474
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2475
          list(self.instance.secondary_nodes))
2476
    return env, nl, nl
2477

    
2478
  def CheckPrereq(self):
2479
    """Check prerequisites.
2480

2481
    This checks that the instance is in the cluster and is not running.
2482

2483
    """
2484
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2485
    assert instance is not None, \
2486
      "Cannot retrieve locked instance %s" % self.op.instance_name
2487

    
2488
    if instance.disk_template == constants.DT_DISKLESS:
2489
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2490
                                 self.op.instance_name)
2491
    if instance.status != "down":
2492
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2493
                                 self.op.instance_name)
2494
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2495
                                              instance.name,
2496
                                              instance.hypervisor)
2497
    if remote_info:
2498
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2499
                                 (self.op.instance_name,
2500
                                  instance.primary_node))
2501

    
2502
    self.op.os_type = getattr(self.op, "os_type", None)
2503
    if self.op.os_type is not None:
2504
      # OS verification
2505
      pnode = self.cfg.GetNodeInfo(
2506
        self.cfg.ExpandNodeName(instance.primary_node))
2507
      if pnode is None:
2508
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2509
                                   self.op.pnode)
2510
      os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
2511
      if not os_obj:
2512
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2513
                                   " primary node"  % self.op.os_type)
2514

    
2515
    self.instance = instance
2516

    
2517
  def Exec(self, feedback_fn):
2518
    """Reinstall the instance.
2519

2520
    """
2521
    inst = self.instance
2522

    
2523
    if self.op.os_type is not None:
2524
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2525
      inst.os = self.op.os_type
2526
      self.cfg.Update(inst)
2527

    
2528
    _StartInstanceDisks(self, inst, None)
2529
    try:
2530
      feedback_fn("Running the instance OS create scripts...")
2531
      if not self.rpc.call_instance_os_add(inst.primary_node, inst):
2532
        raise errors.OpExecError("Could not install OS for instance %s"
2533
                                 " on node %s" %
2534
                                 (inst.name, inst.primary_node))
2535
    finally:
2536
      _ShutdownInstanceDisks(self, inst)
2537

    
2538

    
2539
class LURenameInstance(LogicalUnit):
2540
  """Rename an instance.
2541

2542
  """
2543
  HPATH = "instance-rename"
2544
  HTYPE = constants.HTYPE_INSTANCE
2545
  _OP_REQP = ["instance_name", "new_name"]
2546

    
2547
  def BuildHooksEnv(self):
2548
    """Build hooks env.
2549

2550
    This runs on master, primary and secondary nodes of the instance.
2551

2552
    """
2553
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2554
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2555
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2556
          list(self.instance.secondary_nodes))
2557
    return env, nl, nl
2558

    
2559
  def CheckPrereq(self):
2560
    """Check prerequisites.
2561

2562
    This checks that the instance is in the cluster and is not running.
2563

2564
    """
2565
    instance = self.cfg.GetInstanceInfo(
2566
      self.cfg.ExpandInstanceName(self.op.instance_name))
2567
    if instance is None:
2568
      raise errors.OpPrereqError("Instance '%s' not known" %
2569
                                 self.op.instance_name)
2570
    if instance.status != "down":
2571
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2572
                                 self.op.instance_name)
2573
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2574
                                              instance.name,
2575
                                              instance.hypervisor)
2576
    if remote_info:
2577
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2578
                                 (self.op.instance_name,
2579
                                  instance.primary_node))
2580
    self.instance = instance
2581

    
2582
    # new name verification
2583
    name_info = utils.HostInfo(self.op.new_name)
2584

    
2585
    self.op.new_name = new_name = name_info.name
2586
    instance_list = self.cfg.GetInstanceList()
2587
    if new_name in instance_list:
2588
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2589
                                 new_name)
2590

    
2591
    if not getattr(self.op, "ignore_ip", False):
2592
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2593
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2594
                                   (name_info.ip, new_name))
2595

    
2596

    
2597
  def Exec(self, feedback_fn):
2598
    """Reinstall the instance.
2599

2600
    """
2601
    inst = self.instance
2602
    old_name = inst.name
2603

    
2604
    if inst.disk_template == constants.DT_FILE:
2605
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2606

    
2607
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2608
    # Change the instance lock. This is definitely safe while we hold the BGL
2609
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2610
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2611

    
2612
    # re-read the instance from the configuration after rename
2613
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2614

    
2615
    if inst.disk_template == constants.DT_FILE:
2616
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2617
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2618
                                                     old_file_storage_dir,
2619
                                                     new_file_storage_dir)
2620

    
2621
      if not result:
2622
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2623
                                 " directory '%s' to '%s' (but the instance"
2624
                                 " has been renamed in Ganeti)" % (
2625
                                 inst.primary_node, old_file_storage_dir,
2626
                                 new_file_storage_dir))
2627

    
2628
      if not result[0]:
2629
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2630
                                 " (but the instance has been renamed in"
2631
                                 " Ganeti)" % (old_file_storage_dir,
2632
                                               new_file_storage_dir))
2633

    
2634
    _StartInstanceDisks(self, inst, None)
2635
    try:
2636
      if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
2637
                                               old_name):
2638
        msg = ("Could not run OS rename script for instance %s on node %s"
2639
               " (but the instance has been renamed in Ganeti)" %
2640
               (inst.name, inst.primary_node))
2641
        self.proc.LogWarning(msg)
2642
    finally:
2643
      _ShutdownInstanceDisks(self, inst)
2644

    
2645

    
2646
class LURemoveInstance(LogicalUnit):
2647
  """Remove an instance.
2648

2649
  """
2650
  HPATH = "instance-remove"
2651
  HTYPE = constants.HTYPE_INSTANCE
2652
  _OP_REQP = ["instance_name", "ignore_failures"]
2653
  REQ_BGL = False
2654

    
2655
  def ExpandNames(self):
2656
    self._ExpandAndLockInstance()
2657
    self.needed_locks[locking.LEVEL_NODE] = []
2658
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2659

    
2660
  def DeclareLocks(self, level):
2661
    if level == locking.LEVEL_NODE:
2662
      self._LockInstancesNodes()
2663

    
2664
  def BuildHooksEnv(self):
2665
    """Build hooks env.
2666

2667
    This runs on master, primary and secondary nodes of the instance.
2668

2669
    """
2670
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2671
    nl = [self.cfg.GetMasterNode()]
2672
    return env, nl, nl
2673

    
2674
  def CheckPrereq(self):
2675
    """Check prerequisites.
2676

2677
    This checks that the instance is in the cluster.
2678

2679
    """
2680
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2681
    assert self.instance is not None, \
2682
      "Cannot retrieve locked instance %s" % self.op.instance_name
2683

    
2684
  def Exec(self, feedback_fn):
2685
    """Remove the instance.
2686

2687
    """
2688
    instance = self.instance
2689
    logging.info("Shutting down instance %s on node %s",
2690
                 instance.name, instance.primary_node)
2691

    
2692
    if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
2693
      if self.op.ignore_failures:
2694
        feedback_fn("Warning: can't shutdown instance")
2695
      else:
2696
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2697
                                 (instance.name, instance.primary_node))
2698

    
2699
    logging.info("Removing block devices for instance %s", instance.name)
2700

    
2701
    if not _RemoveDisks(self, instance):
2702
      if self.op.ignore_failures:
2703
        feedback_fn("Warning: can't remove instance's disks")
2704
      else:
2705
        raise errors.OpExecError("Can't remove instance's disks")
2706

    
2707
    logging.info("Removing instance %s out of cluster config", instance.name)
2708

    
2709
    self.cfg.RemoveInstance(instance.name)
2710
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2711

    
2712

    
2713
class LUQueryInstances(NoHooksLU):
2714
  """Logical unit for querying instances.
2715

2716
  """
2717
  _OP_REQP = ["output_fields", "names"]
2718
  REQ_BGL = False
2719
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
2720
                                    "admin_state", "admin_ram",
2721
                                    "disk_template", "ip", "mac", "bridge",
2722
                                    "sda_size", "sdb_size", "vcpus", "tags",
2723
                                    "network_port", "beparams",
2724
                                    "(disk).(size)/([0-9]+)",
2725
                                    "(disk).(sizes)",
2726
                                    "(nic).(mac|ip|bridge)/([0-9]+)",
2727
                                    "(nic).(macs|ips|bridges)",
2728
                                    "(disk|nic).(count)",
2729
                                    "serial_no", "hypervisor", "hvparams",] +
2730
                                  ["hv/%s" % name
2731
                                   for name in constants.HVS_PARAMETERS] +
2732
                                  ["be/%s" % name
2733
                                   for name in constants.BES_PARAMETERS])
2734
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
2735

    
2736

    
2737
  def ExpandNames(self):
2738
    _CheckOutputFields(static=self._FIELDS_STATIC,
2739
                       dynamic=self._FIELDS_DYNAMIC,
2740
                       selected=self.op.output_fields)
2741

    
2742
    self.needed_locks = {}
2743
    self.share_locks[locking.LEVEL_INSTANCE] = 1
2744
    self.share_locks[locking.LEVEL_NODE] = 1
2745

    
2746
    if self.op.names:
2747
      self.wanted = _GetWantedInstances(self, self.op.names)
2748
    else:
2749
      self.wanted = locking.ALL_SET
2750

    
2751
    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2752
    if self.do_locking:
2753
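      # live data was requested, so lock the instances (and, via
      # DeclareLocks, their nodes) in shared mode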
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2754
      self.needed_locks[locking.LEVEL_NODE] = []
2755
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2756

    
2757
  def DeclareLocks(self, level):
2758
    if level == locking.LEVEL_NODE and self.do_locking:
2759
      self._LockInstancesNodes()
2760

    
2761
  def CheckPrereq(self):
2762
    """Check prerequisites.
2763

2764
    """
2765
    pass
2766

    
2767
  def Exec(self, feedback_fn):
2768
    """Computes the list of nodes and their attributes.
2769

2770
    """
2771
    all_info = self.cfg.GetAllInstancesInfo()
2772
    if self.do_locking:
2773
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2774
    elif self.wanted != locking.ALL_SET:
2775
      instance_names = self.wanted
2776
      missing = set(instance_names).difference(all_info.keys())
2777
      if missing:
2778
        raise errors.OpExecError(
2779
          "Some instances were removed before retrieving their data: %s"
2780
          % missing)
2781
    else:
2782
      instance_names = all_info.keys()
2783

    
2784
    instance_names = utils.NiceSort(instance_names)
2785
    instance_list = [all_info[iname] for iname in instance_names]
2786

    
2787
    # begin data gathering
2788

    
2789
    nodes = frozenset([inst.primary_node for inst in instance_list])
2790
    hv_list = list(set([inst.hypervisor for inst in instance_list]))
2791

    
2792
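    # nodes that failed to answer the RPC call; the live state of their
    # instances cannot be determined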
    bad_nodes = []
2793
    if self.do_locking:
2794
      live_data = {}
2795
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
2796
      for name in nodes:
2797
        result = node_data[name]
2798
        if result:
2799
          live_data.update(result)
2800
        elif result == False:
2801
          bad_nodes.append(name)
2802
        # else no instance is alive
2803
    else:
2804
      live_data = dict([(name, {}) for name in instance_names])
2805

    
2806
    # end data gathering
2807

    
2808
    HVPREFIX = "hv/"
2809
    BEPREFIX = "be/"
2810
    output = []
2811
    for instance in instance_list:
2812
      iout = []
2813
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
2814
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
2815
      for field in self.op.output_fields:
2816
        st_match = self._FIELDS_STATIC.Matches(field)
2817
        if field == "name":
2818
          val = instance.name
2819
        elif field == "os":
2820
          val = instance.os
2821
        elif field == "pnode":
2822
          val = instance.primary_node
2823
        elif field == "snodes":
2824
          val = list(instance.secondary_nodes)
2825
        elif field == "admin_state":
2826
          val = (instance.status != "down")
2827
        elif field == "oper_state":
2828
          if instance.primary_node in bad_nodes:
2829
            val = None
2830
          else:
2831
            val = bool(live_data.get(instance.name))
2832
        elif field == "status":
2833
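          # combine the configured (admin) state with the live state:
          # mismatches show up as ERROR_up/ERROR_down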
          if instance.primary_node in bad_nodes:
2834
            val = "ERROR_nodedown"
2835
          else:
2836
            running = bool(live_data.get(instance.name))
2837
            if running:
2838
              if instance.status != "down":
2839
                val = "running"
2840
              else:
2841
                val = "ERROR_up"
2842
            else:
2843
              if instance.status != "down":
2844
                val = "ERROR_down"
2845
              else:
2846
                val = "ADMIN_down"
2847
        elif field == "oper_ram":
2848
          if instance.primary_node in bad_nodes:
2849
            val = None
2850
          elif instance.name in live_data:
2851
            val = live_data[instance.name].get("memory", "?")
2852
          else:
2853
            val = "-"
2854
        elif field == "disk_template":
2855
          val = instance.disk_template
2856
        elif field == "ip":
2857
          val = instance.nics[0].ip
2858
        elif field == "bridge":
2859
          val = instance.nics[0].bridge
2860
        elif field == "mac":
2861
          val = instance.nics[0].mac
2862
        elif field == "sda_size" or field == "sdb_size":
2863
          idx = ord(field[2]) - ord('a')
2864
          try:
2865
            val = instance.FindDisk(idx).size
2866
          except errors.OpPrereqError:
2867
            val = None
2868
        elif field == "tags":
2869
          val = list(instance.GetTags())
2870
        elif field == "serial_no":
2871
          val = instance.serial_no
2872
        elif field == "network_port":
2873
          val = instance.network_port
2874
        elif field == "hypervisor":
2875
          val = instance.hypervisor
2876
        elif field == "hvparams":
2877
          val = i_hv
2878
        elif (field.startswith(HVPREFIX) and
2879
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
2880
          val = i_hv.get(field[len(HVPREFIX):], None)
2881
        elif field == "beparams":
2882
          val = i_be
2883
        elif (field.startswith(BEPREFIX) and
2884
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
2885
          val = i_be.get(field[len(BEPREFIX):], None)
2886
        elif st_match and st_match.groups():
2887
          # matches a variable list
2888
          st_groups = st_match.groups()
2889
          if st_groups and st_groups[0] == "disk":
2890
            if st_groups[1] == "count":
2891
              val = len(instance.disks)
2892
            elif st_groups[1] == "sizes":
2893
              val = [disk.size for disk in instance.disks]
2894
            elif st_groups[1] == "size":
2895
              try:
2896
                val = instance.FindDisk(st_groups[2]).size
2897
              except errors.OpPrereqError:
2898
                val = None
2899
            else:
2900
              assert False, "Unhandled disk parameter"
2901
          elif st_groups[0] == "nic":
2902
            if st_groups[1] == "count":
2903
              val = len(instance.nics)
2904
            elif st_groups[1] == "macs":
2905
              val = [nic.mac for nic in instance.nics]
2906
            elif st_groups[1] == "ips":
2907
              val = [nic.ip for nic in instance.nics]
2908
            elif st_groups[1] == "bridges":
2909
              val = [nic.bridge for nic in instance.nics]
2910
            else:
2911
              # index-based item
2912
              nic_idx = int(st_groups[2])
2913
              if nic_idx >= len(instance.nics):
2914
                val = None
2915
              else:
2916
                if st_groups[1] == "mac":
2917
                  val = instance.nics[nic_idx].mac
2918
                elif st_groups[1] == "ip":
2919
                  val = instance.nics[nic_idx].ip
2920
                elif st_groups[1] == "bridge":
2921
                  val = instance.nics[nic_idx].bridge
2922
                else:
2923
                  assert False, "Unhandled NIC parameter"
2924
          else:
2925
            assert False, "Unhandled variable parameter"
2926
        else:
2927
          raise errors.ParameterError(field)
2928
        iout.append(val)
2929
      output.append(iout)
2930

    
2931
    return output
2932

    
2933

    
2934
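# Note on the dynamic query fields handled at the end of the instance query
# LU above: besides the simple per-field branches, names matched by the
# st_match regexp select per-disk/per-NIC data.  Illustrative mapping only
# (the exact accepted spellings are defined by that regexp, not here):
#   disk count            -> len(instance.disks)
#   per-disk sizes        -> [disk.size for disk in instance.disks]
#   single NIC MAC at idx -> instance.nics[idx].mac, or None when idx is out
#                            of range
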
class LUFailoverInstance(LogicalUnit):
2935
  """Failover an instance.
2936

2937
  """
2938
  HPATH = "instance-failover"
2939
  HTYPE = constants.HTYPE_INSTANCE
2940
  _OP_REQP = ["instance_name", "ignore_consistency"]
2941
  REQ_BGL = False
2942

    
2943
  def ExpandNames(self):
2944
    self._ExpandAndLockInstance()
2945
    self.needed_locks[locking.LEVEL_NODE] = []
2946
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2947

    
2948
  def DeclareLocks(self, level):
2949
    if level == locking.LEVEL_NODE:
2950
      self._LockInstancesNodes()
2951

    
2952
  def BuildHooksEnv(self):
2953
    """Build hooks env.
2954

2955
    This runs on master, primary and secondary nodes of the instance.
2956

2957
    """
2958
    env = {
2959
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2960
      }
2961
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2962
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
2963
    return env, nl, nl
2964

    
2965
  def CheckPrereq(self):
2966
    """Check prerequisites.
2967

2968
    This checks that the instance is in the cluster.
2969

2970
    """
2971
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2972
    assert self.instance is not None, \
2973
      "Cannot retrieve locked instance %s" % self.op.instance_name
2974

    
2975
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2976
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2977
      raise errors.OpPrereqError("Instance's disk layout is not"
2978
                                 " network mirrored, cannot failover.")
2979

    
2980
    secondary_nodes = instance.secondary_nodes
2981
    if not secondary_nodes:
2982
      raise errors.ProgrammerError("no secondary node but using "
2983
                                   "a mirrored disk template")
2984

    
2985
    target_node = secondary_nodes[0]
2986
    # check memory requirements on the secondary node
2987
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
2988
                         instance.name, bep[constants.BE_MEMORY],
2989
                         instance.hypervisor)
2990

    
2991
    # check bridge existence
2992
    brlist = [nic.bridge for nic in instance.nics]
2993
    if not self.rpc.call_bridges_exist(target_node, brlist):
2994
      raise errors.OpPrereqError("One or more target bridges %s does not"
2995
                                 " exist on destination node '%s'" %
2996
                                 (brlist, target_node))
2997

    
2998
  def Exec(self, feedback_fn):
2999
    """Failover an instance.
3000

3001
    The failover is done by shutting it down on its present node and
3002
    starting it on the secondary.
3003

3004
    """
3005
    instance = self.instance
3006

    
3007
    source_node = instance.primary_node
3008
    target_node = instance.secondary_nodes[0]
3009

    
3010
    feedback_fn("* checking disk consistency between source and target")
3011
    for dev in instance.disks:
3012
      # for drbd, these are drbd over lvm
3013
      if not _CheckDiskConsistency(self, dev, target_node, False):
3014
        if instance.status == "up" and not self.op.ignore_consistency:
3015
          raise errors.OpExecError("Disk %s is degraded on target node,"
3016
                                   " aborting failover." % dev.iv_name)
3017

    
3018
    feedback_fn("* shutting down instance on source node")
3019
    logging.info("Shutting down instance %s on node %s",
3020
                 instance.name, source_node)
3021

    
3022
    if not self.rpc.call_instance_shutdown(source_node, instance):
3023
      if self.op.ignore_consistency:
3024
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
3025
                             " Proceeding"
3026
                             " anyway. Please make sure node %s is down",
3027
                             instance.name, source_node, source_node)
3028
      else:
3029
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3030
                                 (instance.name, source_node))
3031

    
3032
    feedback_fn("* deactivating the instance's disks on source node")
3033
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3034
      raise errors.OpExecError("Can't shut down the instance's disks.")
3035

    
3036
    instance.primary_node = target_node
3037
    # distribute new instance config to the other nodes
3038
    self.cfg.Update(instance)
3039

    
3040
    # Only start the instance if it's marked as up
3041
    if instance.status == "up":
3042
      feedback_fn("* activating the instance's disks on target node")
3043
      logging.info("Starting instance %s on node %s",
3044
                   instance.name, target_node)
3045

    
3046
      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3047
                                               ignore_secondaries=True)
3048
      if not disks_ok:
3049
        _ShutdownInstanceDisks(self, instance)
3050
        raise errors.OpExecError("Can't activate the instance's disks")
3051

    
3052
      feedback_fn("* starting the instance on the target node")
3053
      if not self.rpc.call_instance_start(target_node, instance, None):
3054
        _ShutdownInstanceDisks(self, instance)
3055
        raise errors.OpExecError("Could not start instance %s on node %s." %
3056
                                 (instance.name, target_node))
3057

    
3058

    
3059
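# The failover sequence implemented by the LU above is: check disk consistency
# on the target, shut the instance down on the source node, deactivate its
# disks there, flip primary_node in the configuration and, only if the
# instance was marked "up", reactivate the disks and start it on the former
# secondary.  A minimal sketch of driving it from the opcode layer, assuming
# the usual mapping of this LU to opcodes.OpFailoverInstance (values purely
# illustrative):
#
#   op = opcodes.OpFailoverInstance(instance_name="instance1.example.com",
#                                   ignore_consistency=False)
#   # ... submitted through the normal processor (mcpu) / client machinery
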
def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
3060
  """Create a tree of block devices on the primary node.
3061

3062
  This always creates all devices.
3063

3064
  """
3065
  if device.children:
3066
    for child in device.children:
3067
      if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
3068
        return False
3069

    
3070
  lu.cfg.SetDiskID(device, node)
3071
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3072
                                       instance.name, True, info)
3073
  if not new_id:
3074
    return False
3075
  if device.physical_id is None:
3076
    device.physical_id = new_id
3077
  return True
3078

    
3079

    
3080
def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
3081
  """Create a tree of block devices on a secondary node.
3082

3083
  If this device type has to be created on secondaries, create it and
3084
  all its children.
3085

3086
  If not, just recurse to children keeping the same 'force' value.
3087

3088
  """
3089
  if device.CreateOnSecondary():
3090
    force = True
3091
  if device.children:
3092
    for child in device.children:
3093
      if not _CreateBlockDevOnSecondary(lu, node, instance,
3094
                                        child, force, info):
3095
        return False
3096

    
3097
  if not force:
3098
    return True
3099
  lu.cfg.SetDiskID(device, node)
3100
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3101
                                       instance.name, False, info)
3102
  if not new_id:
3103
    return False
3104
  if device.physical_id is None:
3105
    device.physical_id = new_id
3106
  return True
3107

    
3108

    
3109
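# Taken together, the two _CreateBlockDev* helpers above walk a disk tree
# bottom-up: children are created before their parent, so e.g. a DRBD8 device
# is only assembled once its data and metadata LVs exist.  On the secondary,
# creation is skipped until a device whose CreateOnSecondary() returns True is
# reached, from which point 'force' stays on for the whole subtree.  A sketch
# of how _CreateDisks below drives them (error handling omitted):
#
#   for device in instance.disks:
#     for snode in instance.secondary_nodes:
#       _CreateBlockDevOnSecondary(lu, snode, instance, device, False, info)
#     _CreateBlockDevOnPrimary(lu, instance.primary_node, instance,
#                              device, info)
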
def _GenerateUniqueNames(lu, exts):
  """Generate suitable LV names.

  This will generate one logical volume name per requested suffix for
  the given instance.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results
3120

    
3121

    
3122
def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3123
                         p_minor, s_minor):
3124
  """Generate a drbd8 device complete with its children.
3125

3126
  """
3127
  port = lu.cfg.AllocatePort()
3128
  vgname = lu.cfg.GetVGName()
3129
  shared_secret = lu.cfg.GenerateDRBDSecret()
3130
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3131
                          logical_id=(vgname, names[0]))
3132
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3133
                          logical_id=(vgname, names[1]))
3134
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3135
                          logical_id=(primary, secondary, port,
3136
                                      p_minor, s_minor,
3137
                                      shared_secret),
3138
                          children=[dev_data, dev_meta],
3139
                          iv_name=iv_name)
3140
  return drbd_dev
3141
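# The objects.Disk returned by _GenerateDRBD8Branch above describes one DRBD8
# minor pair: its logical_id is the six-tuple
#   (primary, secondary, port, p_minor, s_minor, shared_secret)
# and its children are the data LV of 'size' MB plus a 128 MB metadata LV.
# A 10 GB disk therefore needs 10240 + 128 MB of free VG space on each of the
# two nodes, which matches the accounting done in _ComputeDiskSize below.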

    
3142

    
3143
def _GenerateDiskTemplate(lu, template_name,
3144
                          instance_name, primary_node,
3145
                          secondary_nodes, disk_info,
3146
                          file_storage_dir, file_driver,
3147
                          base_index):
3148
  """Generate the entire disk layout for a given template type.
3149

3150
  """
3151
  #TODO: compute space requirements
3152

    
3153
  vgname = lu.cfg.GetVGName()
3154
  disk_count = len(disk_info)
3155
  disks = []
3156
  if template_name == constants.DT_DISKLESS:
3157
    pass
3158
  elif template_name == constants.DT_PLAIN:
3159
    if len(secondary_nodes) != 0:
3160
      raise errors.ProgrammerError("Wrong template configuration")
3161

    
3162
    names = _GenerateUniqueNames(lu, [".disk%d" % i
3163
                                      for i in range(disk_count)])
3164
    for idx, disk in enumerate(disk_info):
3165
      disk_index = idx + base_index
3166
      disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
3167
                              logical_id=(vgname, names[idx]),
3168
                              iv_name="disk/%d" % disk_index)
3169
      disks.append(disk_dev)
3170
  elif template_name == constants.DT_DRBD8:
3171
    if len(secondary_nodes) != 1:
3172
      raise errors.ProgrammerError("Wrong template configuration")
3173
    remote_node = secondary_nodes[0]
3174
    minors = lu.cfg.AllocateDRBDMinor(
3175
      [primary_node, remote_node] * len(disk_info), instance_name)
3176

    
3177
    names = _GenerateUniqueNames(lu,
3178
                                 [".disk%d_%s" % (i, s)
3179
                                  for i in range(disk_count)
3180
                                  for s in ("data", "meta")
3181
                                  ])
3182
    for idx, disk in enumerate(disk_info):
3183
      disk_index = idx + base_index
3184
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3185
                                      disk["size"], names[idx*2:idx*2+2],
3186
                                      "disk/%d" % disk_index,
3187
                                      minors[idx*2], minors[idx*2+1])
3188
      disks.append(disk_dev)
3189
  elif template_name == constants.DT_FILE:
3190
    if len(secondary_nodes) != 0:
3191
      raise errors.ProgrammerError("Wrong template configuration")
3192

    
3193
    for idx, disk in enumerate(disk_info):
3194
      disk_index = idx + base_index
3195
      disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
3196
                              iv_name="disk/%d" % disk_index,
3197
                              logical_id=(file_driver,
3198
                                          "%s/disk%d" % (file_storage_dir,
3199
                                                         idx)))
3200
      disks.append(disk_dev)
3201
  else:
3202
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3203
  return disks
3204
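# Worked example for _GenerateDiskTemplate above (values illustrative): for a
# DT_DRBD8 instance with two disks and base_index 0 it requests the LV name
# suffixes ".disk0_data", ".disk0_meta", ".disk1_data", ".disk1_meta",
# allocates four DRBD minors as [p_minor0, s_minor0, p_minor1, s_minor1] via
# AllocateDRBDMinor([primary_node, remote_node] * 2, instance_name), and
# returns two DRBD8 disks with iv_names "disk/0" and "disk/1".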

    
3205

    
3206
def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name
3211

    
3212

    
3213
def _CreateDisks(lu, instance):
3214
  """Create all disks for an instance.
3215

3216
  This abstracts away some work from AddInstance.
3217

3218
  @type lu: L{LogicalUnit}
3219
  @param lu: the logical unit on whose behalf we execute
3220
  @type instance: L{objects.Instance}
3221
  @param instance: the instance whose disks we should create
3222
  @rtype: boolean
3223
  @return: the success of the creation
3224

3225
  """
3226
  info = _GetInstanceInfoText(instance)
3227

    
3228
  if instance.disk_template == constants.DT_FILE:
3229
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3230
    result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3231
                                                 file_storage_dir)
3232

    
3233
    if not result:
3234
      logging.error("Could not connect to node '%s'", instance.primary_node)
3235
      return False
3236

    
3237
    if not result[0]:
3238
      logging.error("Failed to create directory '%s'", file_storage_dir)
3239
      return False
3240

    
3241
  # Note: this needs to be kept in sync with adding of disks in
3242
  # LUSetInstanceParams
3243
  for device in instance.disks:
3244
    logging.info("Creating volume %s for instance %s",
3245
                 device.iv_name, instance.name)
3246
    #HARDCODE
3247
    for secondary_node in instance.secondary_nodes:
3248
      if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3249
                                        device, False, info):
3250
        logging.error("Failed to create volume %s (%s) on secondary node %s!",
3251
                      device.iv_name, device, secondary_node)
3252
        return False
3253
    #HARDCODE
3254
    if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3255
                                    instance, device, info):
3256
      logging.error("Failed to create volume %s on primary!", device.iv_name)
3257
      return False
3258

    
3259
  return True
3260

    
3261

    
3262
def _RemoveDisks(lu, instance):
3263
  """Remove all disks for an instance.
3264

3265
  This abstracts away some work from `AddInstance()` and
3266
  `RemoveInstance()`. Note that in case some of the devices couldn't
3267
  be removed, the removal will continue with the other ones (compare
3268
  with `_CreateDisks()`).
3269

3270
  @type lu: L{LogicalUnit}
3271
  @param lu: the logical unit on whose behalf we execute
3272
  @type instance: L{objects.Instance}
3273
  @param instance: the instance whose disks we should remove
3274
  @rtype: boolean
3275
  @return: the success of the removal
3276

3277
  """
3278
  logging.info("Removing block devices for instance %s", instance.name)
3279

    
3280
  result = True
3281
  for device in instance.disks:
3282
    for node, disk in device.ComputeNodeTree(instance.primary_node):
3283
      lu.cfg.SetDiskID(disk, node)
3284
      if not lu.rpc.call_blockdev_remove(node, disk):
3285
        lu.proc.LogWarning("Could not remove block device %s on node %s,"
3286
                           " continuing anyway", device.iv_name, node)
3287
        result = False
3288

    
3289
  if instance.disk_template == constants.DT_FILE:
3290
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3291
    if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3292
                                               file_storage_dir):
3293
      logging.error("Could not remove directory '%s'", file_storage_dir)
3294
      result = False
3295

    
3296
  return result
3297

    
3298

    
3299
def _ComputeDiskSize(disk_template, disks):
  """Compute disk size requirements in the volume group

  """
  # Required free disk space as a function of the disk template and disk sizes
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: sum(d["size"] for d in disks),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]
3317
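# Worked example for _ComputeDiskSize above: for disks of 10240 MB and
# 5120 MB, DT_PLAIN needs 15360 MB of free VG space, DT_DRBD8 needs
# 15360 + 2 * 128 = 15616 MB because of the per-disk metadata volume, and
# DT_DISKLESS / DT_FILE need no VG space at all (None).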

    
3318

    
3319
def _CheckHVParams(lu, nodenames, hvname, hvparams):
3320
  """Hypervisor parameter validation.
3321

3322
  This function abstracts the hypervisor parameter validation to be
3323
  used in both instance create and instance modify.
3324

3325
  @type lu: L{LogicalUnit}
3326
  @param lu: the logical unit for which we check
3327
  @type nodenames: list
3328
  @param nodenames: the list of nodes on which we should check
3329
  @type hvname: string
3330
  @param hvname: the name of the hypervisor we should use
3331
  @type hvparams: dict
3332
  @param hvparams: the parameters which we need to check
3333
  @raise errors.OpPrereqError: if the parameters are not valid
3334

3335
  """
3336
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3337
                                                  hvname,
3338
                                                  hvparams)
3339
  for node in nodenames:
3340
    info = hvinfo.get(node, None)
3341
    if not info or not isinstance(info, (tuple, list)):
3342
      raise errors.OpPrereqError("Cannot get current information"
3343
                                 " from node '%s' (%s)" % (node, info))
3344
    if not info[0]:
3345
      raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3346
                                 " %s" % info[1])
3347
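# The per-node result checked by _CheckHVParams above is expected to be a
# (success, message) pair; a sketch only, the exact messages come from the
# hypervisor abstraction on each node:
#   hvinfo = {"node1.example.com": (True, ""),
#             "node2.example.com": (False, "invalid VNC bind address")}
# Any missing, malformed or unsuccessful entry turns into an OpPrereqError.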

    
3348

    
3349
class LUCreateInstance(LogicalUnit):
3350
  """Create an instance.
3351

3352
  """
3353
  HPATH = "instance-add"
3354
  HTYPE = constants.HTYPE_INSTANCE
3355
  _OP_REQP = ["instance_name", "disks", "disk_template",
3356
              "mode", "start",
3357
              "wait_for_sync", "ip_check", "nics",
3358
              "hvparams", "beparams"]
3359
  REQ_BGL = False
3360

    
3361
  def _ExpandNode(self, node):
3362
    """Expands and checks one node name.
3363

3364
    """
3365
    node_full = self.cfg.ExpandNodeName(node)
3366
    if node_full is None:
3367
      raise errors.OpPrereqError("Unknown node %s" % node)
3368
    return node_full
3369

    
3370
  def ExpandNames(self):
3371
    """ExpandNames for CreateInstance.
3372

3373
    Figure out the right locks for instance creation.
3374

3375
    """
3376
    self.needed_locks = {}
3377

    
3378
    # set optional parameters to none if they don't exist
3379
    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3380
      if not hasattr(self.op, attr):
3381
        setattr(self.op, attr, None)
3382

    
3383
    # cheap checks, mostly valid constants given
3384

    
3385
    # verify creation mode
3386
    if self.op.mode not in (constants.INSTANCE_CREATE,
3387
                            constants.INSTANCE_IMPORT):
3388
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3389
                                 self.op.mode)
3390

    
3391
    # disk template and mirror node verification
3392
    if self.op.disk_template not in constants.DISK_TEMPLATES:
3393
      raise errors.OpPrereqError("Invalid disk template name")
3394

    
3395
    if self.op.hypervisor is None:
3396
      self.op.hypervisor = self.cfg.GetHypervisorType()
3397

    
3398
    cluster = self.cfg.GetClusterInfo()
3399
    enabled_hvs = cluster.enabled_hypervisors
3400
    if self.op.hypervisor not in enabled_hvs:
3401
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3402
                                 " cluster (%s)" % (self.op.hypervisor,
3403
                                  ",".join(enabled_hvs)))
3404

    
3405
    # check hypervisor parameter syntax (locally)
3406

    
3407
    filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
3408
                                  self.op.hvparams)
3409
    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3410
    hv_type.CheckParameterSyntax(filled_hvp)
3411

    
3412
    # fill and remember the beparams dict
3413
    self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
3414
                                    self.op.beparams)
3415

    
3416
    #### instance parameters check
3417

    
3418
    # instance name verification
3419
    hostname1 = utils.HostInfo(self.op.instance_name)
3420
    self.op.instance_name = instance_name = hostname1.name
3421

    
3422
    # this is just a preventive check, but someone might still add this
3423
    # instance in the meantime, and creation will fail at lock-add time
3424
    if instance_name in self.cfg.GetInstanceList():
3425
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3426
                                 instance_name)
3427

    
3428
    self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3429

    
3430
    # NIC buildup
3431
    self.nics = []
3432
    for nic in self.op.nics:
3433
      # ip validity checks
3434
      ip = nic.get("ip", None)
3435
      if ip is None or ip.lower() == "none":
3436
        nic_ip = None
3437
      elif ip.lower() == constants.VALUE_AUTO:
3438
        nic_ip = hostname1.ip
3439
      else:
3440
        if not utils.IsValidIP(ip):
3441
          raise errors.OpPrereqError("Given IP address '%s' doesn't look"
3442
                                     " like a valid IP" % ip)
3443
        nic_ip = ip
3444

    
3445
      # MAC address verification
3446
      mac = nic.get("mac", constants.VALUE_AUTO)
3447
      if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3448
        if not utils.IsValidMac(mac.lower()):
3449
          raise errors.OpPrereqError("Invalid MAC address specified: %s" %
3450
                                     mac)
3451
      # bridge verification
3452
      bridge = nic.get("bridge", self.cfg.GetDefBridge())
3453
      self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
3454

    
3455
    # disk checks/pre-build
3456
    self.disks = []
3457
    for disk in self.op.disks:
3458
      mode = disk.get("mode", constants.DISK_RDWR)
3459
      if mode not in constants.DISK_ACCESS_SET:
3460
        raise errors.OpPrereqError("Invalid disk access mode '%s'" %
3461
                                   mode)
3462
      size = disk.get("size", None)
3463
      if size is None:
3464
        raise errors.OpPrereqError("Missing disk size")
3465
      try:
3466
        size = int(size)
3467
      except ValueError:
3468
        raise errors.OpPrereqError("Invalid disk size '%s'" % size)
3469
      self.disks.append({"size": size, "mode": mode})
3470

    
3471
    # used in CheckPrereq for ip ping check
3472
    self.check_ip = hostname1.ip
3473

    
3474
    # file storage checks
3475
    if (self.op.file_driver and
3476
        not self.op.file_driver in constants.FILE_DRIVER):
3477
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
3478
                                 self.op.file_driver)
3479

    
3480
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3481
      raise errors.OpPrereqError("File storage directory path not absolute")
3482

    
3483
    ### Node/iallocator related checks
3484
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
3485
      raise errors.OpPrereqError("One and only one of iallocator and primary"
3486
                                 " node must be given")
3487

    
3488
    if self.op.iallocator:
3489
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3490
    else:
3491
      self.op.pnode = self._ExpandNode(self.op.pnode)
3492
      nodelist = [self.op.pnode]
3493
      if self.op.snode is not None:
3494
        self.op.snode = self._ExpandNode(self.op.snode)
3495
        nodelist.append(self.op.snode)
3496
      self.needed_locks[locking.LEVEL_NODE] = nodelist
3497

    
3498
    # in case of import lock the source node too
3499
    if self.op.mode == constants.INSTANCE_IMPORT:
3500
      src_node = getattr(self.op, "src_node", None)
3501
      src_path = getattr(self.op, "src_path", None)
3502

    
3503
      if src_node is None or src_path is None:
3504
        raise errors.OpPrereqError("Importing an instance requires source"
3505
                                   " node and path options")
3506

    
3507
      if not os.path.isabs(src_path):
3508
        raise errors.OpPrereqError("The source path must be absolute")
3509

    
3510
      self.op.src_node = src_node = self._ExpandNode(src_node)
3511
      if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3512
        self.needed_locks[locking.LEVEL_NODE].append(src_node)
3513

    
3514
    else: # INSTANCE_CREATE
3515
      if getattr(self.op, "os_type", None) is None:
3516
        raise errors.OpPrereqError("No guest OS specified")
3517

    
3518
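  # The nics/disks opcode parameters validated in ExpandNames above are lists
  # of dicts; a minimal sketch of their expected shape (keys as used above,
  # values illustrative; omitted keys fall back to their defaults, e.g. the
  # bridge defaults to the cluster's default bridge):
  #   nics  = [{"ip": constants.VALUE_AUTO, "mac": constants.VALUE_AUTO}]
  #   disks = [{"size": 10240, "mode": constants.DISK_RDWR}]
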
  def _RunAllocator(self):
3519
    """Run the allocator based on input opcode.
3520

3521
    """
3522
    nics = [n.ToDict() for n in self.nics]
3523
    ial = IAllocator(self,
3524
                     mode=constants.IALLOCATOR_MODE_ALLOC,
3525
                     name=self.op.instance_name,
3526
                     disk_template=self.op.disk_template,
3527
                     tags=[],
3528
                     os=self.op.os_type,
3529
                     vcpus=self.be_full[constants.BE_VCPUS],
3530
                     mem_size=self.be_full[constants.BE_MEMORY],
3531
                     disks=self.disks,
3532
                     nics=nics,
3533
                     hypervisor=self.op.hypervisor,
3534
                     )
3535

    
3536
    ial.Run(self.op.iallocator)
3537

    
3538
    if not ial.success:
3539
      raise errors.OpPrereqError("Can't compute nodes using"
3540
                                 " iallocator '%s': %s" % (self.op.iallocator,
3541
                                                           ial.info))
3542
    if len(ial.nodes) != ial.required_nodes:
3543
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3544
                                 " of nodes (%s), required %s" %
3545
                                 (self.op.iallocator, len(ial.nodes),
3546
                                  ial.required_nodes))
3547
    self.op.pnode = ial.nodes[0]
3548
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
3549
                 self.op.instance_name, self.op.iallocator,
3550
                 ", ".join(ial.nodes))
3551
    if ial.required_nodes == 2:
3552
      self.op.snode = ial.nodes[1]
3553

    
3554
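  # Contract with the allocator used by _RunAllocator above: on success it
  # must return exactly ial.required_nodes node names (one for non-mirrored
  # templates, two for DT_DRBD8).  Illustrative outcome:
  #   ial.success == True, ial.required_nodes == 2,
  #   ial.nodes == ["node1.example.com", "node2.example.com"]
  #   -> self.op.pnode = "node1...", self.op.snode = "node2..."
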
  def BuildHooksEnv(self):
3555
    """Build hooks env.
3556

3557
    This runs on master, primary and secondary nodes of the instance.
3558

3559
    """
3560
    env = {
3561
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3562
      "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
3563
      "INSTANCE_ADD_MODE": self.op.mode,
3564
      }
3565
    if self.op.mode == constants.INSTANCE_IMPORT:
3566
      env["INSTANCE_SRC_NODE"] = self.op.src_node
3567
      env["INSTANCE_SRC_PATH"] = self.op.src_path
3568
      env["INSTANCE_SRC_IMAGES"] = self.src_images
3569

    
3570
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3571
      primary_node=self.op.pnode,
3572
      secondary_nodes=self.secondaries,
3573
      status=self.instance_status,
3574
      os_type=self.op.os_type,
3575
      memory=self.be_full[constants.BE_MEMORY],
3576
      vcpus=self.be_full[constants.BE_VCPUS],
3577
      nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
3578
    ))
3579

    
3580
    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3581
          self.secondaries)
3582
    return env, nl, nl
3583

    
3584

    
3585
  def CheckPrereq(self):
3586
    """Check prerequisites.
3587

3588
    """
3589
    if (not self.cfg.GetVGName() and
3590
        self.op.disk_template not in constants.DTS_NOT_LVM):
3591
      raise errors.OpPrereqError("Cluster does not support lvm-based"
3592
                                 " instances")
3593

    
3594

    
3595
    if self.op.mode == constants.INSTANCE_IMPORT:
3596
      src_node = self.op.src_node
3597
      src_path = self.op.src_path
3598

    
3599
      export_info = self.rpc.call_export_info(src_node, src_path)
3600

    
3601
      if not export_info:
3602
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
3603

    
3604
      if not export_info.has_section(constants.INISECT_EXP):
3605
        raise errors.ProgrammerError("Corrupted export config")
3606

    
3607
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
3608
      if (int(ei_version) != constants.EXPORT_VERSION):
3609
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3610
                                   (ei_version, constants.EXPORT_VERSION))
3611

    
3612
      # Check that the new instance doesn't have less disks than the export
3613
      instance_disks = len(self.disks)
3614
      export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
3615
      if instance_disks < export_disks:
3616
        raise errors.OpPrereqError("Not enough disks to import."
3617
                                   " (instance: %d, export: %d)" %
3618
                                   (instance_disks, export_disks))
3619

    
3620
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3621
      disk_images = []
3622
      for idx in range(export_disks):
3623
        option = 'disk%d_dump' % idx
3624
        if export_info.has_option(constants.INISECT_INS, option):
3625
          # FIXME: are the old os-es, disk sizes, etc. useful?
3626
          export_name = export_info.get(constants.INISECT_INS, option)
3627
          image = os.path.join(src_path, export_name)
3628
          disk_images.append(image)
3629
        else:
3630
          disk_images.append(False)
3631

    
3632
      self.src_images = disk_images
3633

    
3634
      old_name = export_info.get(constants.INISECT_INS, 'name')
3635
      # FIXME: int() here could throw a ValueError on broken exports
3636
      exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
3637
      if self.op.instance_name == old_name:
3638
        for idx, nic in enumerate(self.nics):
3639
          if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
3640
            nic_mac_ini = 'nic%d_mac' % idx
3641
            nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
3642

    
3643
    # ip ping checks (we use the same ip that was resolved in ExpandNames)
3644
    if self.op.start and not self.op.ip_check:
3645
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3646
                                 " adding an instance in start mode")
3647

    
3648
    if self.op.ip_check:
3649
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3650
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3651
                                   (self.check_ip, self.op.instance_name))
3652

    
3653
    #### allocator run
3654

    
3655
    if self.op.iallocator is not None:
3656
      self._RunAllocator()
3657

    
3658
    #### node related checks
3659

    
3660
    # check primary node
3661
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3662
    assert self.pnode is not None, \
3663
      "Cannot retrieve locked node %s" % self.op.pnode
3664
    self.secondaries = []
3665

    
3666
    # mirror node verification
3667
    if self.op.disk_template in constants.DTS_NET_MIRROR:
3668
      if self.op.snode is None:
3669
        raise errors.OpPrereqError("The networked disk templates need"
3670
                                   " a mirror node")
3671
      if self.op.snode == pnode.name:
3672
        raise errors.OpPrereqError("The secondary node cannot be"
3673
                                   " the primary node.")
3674
      self.secondaries.append(self.op.snode)
3675

    
3676
    nodenames = [pnode.name] + self.secondaries
3677

    
3678
    req_size = _ComputeDiskSize(self.op.disk_template,
3679
                                self.disks)
3680

    
3681
    # Check lv size requirements
3682
    if req_size is not None:
3683
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3684
                                         self.op.hypervisor)
3685
      for node in nodenames:
3686
        info = nodeinfo.get(node, None)
3687
        if not info:
3688
          raise errors.OpPrereqError("Cannot get current information"
3689
                                     " from node '%s'" % node)
3690
        vg_free = info.get('vg_free', None)
3691
        if not isinstance(vg_free, int):
3692
          raise errors.OpPrereqError("Can't compute free disk space on"
3693
                                     " node %s" % node)
3694
        if req_size > info['vg_free']:
3695
          raise errors.OpPrereqError("Not enough disk space on target node %s."
3696
                                     " %d MB available, %d MB required" %
3697
                                     (node, info['vg_free'], req_size))
3698

    
3699
    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
3700

    
3701
    # os verification
3702
    os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
3703
    if not os_obj:
3704
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
3705
                                 " primary node"  % self.op.os_type)
3706

    
3707
    # bridge check on primary node
3708
    bridges = [n.bridge for n in self.nics]
3709
    if not self.rpc.call_bridges_exist(self.pnode.name, bridges):
3710
      raise errors.OpPrereqError("one of the target bridges '%s' does not"
3711
                                 " exist on"
3712
                                 " destination node '%s'" %
3713
                                 (",".join(bridges), pnode.name))
3714

    
3715
    # memory check on primary node
3716
    if self.op.start:
3717
      _CheckNodeFreeMemory(self, self.pnode.name,
3718
                           "creating instance %s" % self.op.instance_name,
3719
                           self.be_full[constants.BE_MEMORY],
3720
                           self.op.hypervisor)
3721

    
3722
    if self.op.start:
3723
      self.instance_status = 'up'
3724
    else:
3725
      self.instance_status = 'down'
3726

    
3727
  def Exec(self, feedback_fn):
3728
    """Create and add the instance to the cluster.
3729

3730
    """
3731
    instance = self.op.instance_name
3732
    pnode_name = self.pnode.name
3733

    
3734
    for nic in self.nics:
3735
      if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3736
        nic.mac = self.cfg.GenerateMAC()
3737

    
3738
    ht_kind = self.op.hypervisor
3739
    if ht_kind in constants.HTS_REQ_PORT:
3740
      network_port = self.cfg.AllocatePort()
3741
    else:
3742
      network_port = None
3743

    
3744
    ##if self.op.vnc_bind_address is None:
3745
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3746

    
3747
    # this is needed because os.path.join does not accept None arguments
3748
    if self.op.file_storage_dir is None:
3749
      string_file_storage_dir = ""
3750
    else:
3751
      string_file_storage_dir = self.op.file_storage_dir
3752

    
3753
    # build the full file storage dir path
3754
    file_storage_dir = os.path.normpath(os.path.join(
3755
                                        self.cfg.GetFileStorageDir(),
3756
                                        string_file_storage_dir, instance))
3757

    
3758

    
3759
    disks = _GenerateDiskTemplate(self,
3760
                                  self.op.disk_template,
3761
                                  instance, pnode_name,
3762
                                  self.secondaries,
3763
                                  self.disks,
3764
                                  file_storage_dir,
3765
                                  self.op.file_driver,
3766
                                  0)
3767

    
3768
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3769
                            primary_node=pnode_name,
3770
                            nics=self.nics, disks=disks,
3771
                            disk_template=self.op.disk_template,
3772
                            status=self.instance_status,
3773
                            network_port=network_port,
3774
                            beparams=self.op.beparams,
3775
                            hvparams=self.op.hvparams,
3776
                            hypervisor=self.op.hypervisor,
3777
                            )
3778

    
3779
    feedback_fn("* creating instance disks...")
3780
    if not _CreateDisks(self, iobj):
3781
      _RemoveDisks(self, iobj)
3782
      self.cfg.ReleaseDRBDMinors(instance)
3783
      raise errors.OpExecError("Device creation failed, reverting...")
3784

    
3785
    feedback_fn("adding instance %s to cluster config" % instance)
3786

    
3787
    self.cfg.AddInstance(iobj)
3788
    # Declare that we don't want to remove the instance lock anymore, as we've
3789
    # added the instance to the config
3790
    del self.remove_locks[locking.LEVEL_INSTANCE]
3791
    # Remove the temporary assignments for the instance's drbds
3792
    self.cfg.ReleaseDRBDMinors(instance)
3793
    # Unlock all the nodes
3794
    self.context.glm.release(locking.LEVEL_NODE)
3795
    del self.acquired_locks[locking.LEVEL_NODE]
3796

    
3797
    if self.op.wait_for_sync:
3798
      disk_abort = not _WaitForSync(self, iobj)
3799
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3800
      # make sure the disks are not degraded (still sync-ing is ok)
3801
      time.sleep(15)
3802
      feedback_fn("* checking mirrors status")
3803
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
3804
    else:
3805
      disk_abort = False
3806

    
3807
    if disk_abort:
3808
      _RemoveDisks(self, iobj)
3809
      self.cfg.RemoveInstance(iobj.name)
3810
      # Make sure the instance lock gets removed
3811
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
3812
      raise errors.OpExecError("There are some degraded disks for"
3813
                               " this instance")
3814

    
3815
    feedback_fn("creating os for instance %s on node %s" %
3816
                (instance, pnode_name))
3817

    
3818
    if iobj.disk_template != constants.DT_DISKLESS:
3819
      if self.op.mode == constants.INSTANCE_CREATE:
3820
        feedback_fn("* running the instance OS create scripts...")
3821
        if not self.rpc.call_instance_os_add(pnode_name, iobj):
3822
          raise errors.OpExecError("could not add os for instance %s"
3823
                                   " on node %s" %
3824
                                   (instance, pnode_name))
3825

    
3826
      elif self.op.mode == constants.INSTANCE_IMPORT:
3827
        feedback_fn("* running the instance OS import scripts...")
3828
        src_node = self.op.src_node
3829
        src_images = self.src_images
3830
        cluster_name = self.cfg.GetClusterName()
3831
        import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
3832
                                                         src_node, src_images,
3833
                                                         cluster_name)
3834
        for idx, result in enumerate(import_result):
3835
          if not result:
3836
            self.LogWarning("Could not import the image %s for instance"
3837
                            " %s, disk %d, on node %s" %
3838
                            (src_images[idx], instance, idx, pnode_name))
3839
      else:
3840
        # also checked in the prereq part
3841
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3842
                                     % self.op.mode)
3843

    
3844
    if self.op.start:
3845
      logging.info("Starting instance %s on node %s", instance, pnode_name)
3846
      feedback_fn("* starting instance...")
3847
      if not self.rpc.call_instance_start(pnode_name, iobj, None):
3848
        raise errors.OpExecError("Could not start instance")
3849

    
3850

    
3851
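# A minimal sketch of creating an instance through the opcode layer, assuming
# the usual mapping of the LU above to opcodes.OpCreateInstance; every field
# below mirrors a parameter consumed in ExpandNames/CheckPrereq, and all the
# values are purely illustrative:
#
#   op = opcodes.OpCreateInstance(instance_name="instance1.example.com",
#                                 mode=constants.INSTANCE_CREATE,
#                                 os_type="debian-etch",
#                                 pnode="node1.example.com",
#                                 snode="node2.example.com",
#                                 disk_template=constants.DT_DRBD8,
#                                 disks=[{"size": 10240}],
#                                 nics=[{}],
#                                 hvparams={}, beparams={},
#                                 start=True, ip_check=True,
#                                 wait_for_sync=True)
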
class LUConnectConsole(NoHooksLU):
3852
  """Connect to an instance's console.
3853

3854
  This is somewhat special in that it returns the command line that
3855
  you need to run on the master node in order to connect to the
3856
  console.
3857

3858
  """
3859
  _OP_REQP = ["instance_name"]
3860
  REQ_BGL = False
3861

    
3862
  def ExpandNames(self):
3863
    self._ExpandAndLockInstance()
3864

    
3865
  def CheckPrereq(self):
3866
    """Check prerequisites.
3867

3868
    This checks that the instance is in the cluster.
3869

3870
    """
3871
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3872
    assert self.instance is not None, \
3873
      "Cannot retrieve locked instance %s" % self.op.instance_name
3874

    
3875
  def Exec(self, feedback_fn):
3876
    """Connect to the console of an instance
3877

3878
    """
3879
    instance = self.instance
3880
    node = instance.primary_node
3881

    
3882
    node_insts = self.rpc.call_instance_list([node],
3883
                                             [instance.hypervisor])[node]
3884
    if node_insts is False:
3885
      raise errors.OpExecError("Can't connect to node %s." % node)
3886

    
3887
    if instance.name not in node_insts:
3888
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3889

    
3890
    logging.debug("Connecting to console of %s on %s", instance.name, node)
3891

    
3892
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
3893
    console_cmd = hyper.GetShellCommandForConsole(instance)
3894

    
3895
    # build ssh cmdline
3896
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3897

    
3898

    
3899
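# LUConnectConsole.Exec above does not open the console itself: it returns
# the ssh argument vector (built by SshRunner.BuildCmd with batch and tty
# enabled) that the calling client is expected to exec on the master node,
# wrapping the hypervisor-specific console command for the instance's
# primary node.  Roughly:
#   ssh -t ... root@<primary node> '<hypervisor console command>'
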
class LUReplaceDisks(LogicalUnit):
3900
  """Replace the disks of an instance.
3901

3902
  """
3903
  HPATH = "mirrors-replace"
3904
  HTYPE = constants.HTYPE_INSTANCE
3905
  _OP_REQP = ["instance_name", "mode", "disks"]
3906
  REQ_BGL = False
3907

    
3908
  def ExpandNames(self):
3909
    self._ExpandAndLockInstance()
3910

    
3911
    if not hasattr(self.op, "remote_node"):
3912
      self.op.remote_node = None
3913

    
3914
    ia_name = getattr(self.op, "iallocator", None)
3915
    if ia_name is not None:
3916
      if self.op.remote_node is not None:
3917
        raise errors.OpPrereqError("Give either the iallocator or the new"
3918
                                   " secondary, not both")
3919
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3920
    elif self.op.remote_node is not None:
3921
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3922
      if remote_node is None:
3923
        raise errors.OpPrereqError("Node '%s' not known" %
3924
                                   self.op.remote_node)
3925
      self.op.remote_node = remote_node
3926
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
3927
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3928
    else:
3929
      self.needed_locks[locking.LEVEL_NODE] = []
3930
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3931

    
3932
  def DeclareLocks(self, level):
3933
    # If we're not already locking all nodes in the set we have to declare the
3934
    # instance's primary/secondary nodes.
3935
    if (level == locking.LEVEL_NODE and
3936
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
3937
      self._LockInstancesNodes()
3938

    
3939
  def _RunAllocator(self):
3940
    """Compute a new secondary node using an IAllocator.
3941

3942
    """
3943
    ial = IAllocator(self,
3944
                     mode=constants.IALLOCATOR_MODE_RELOC,
3945
                     name=self.op.instance_name,
3946
                     relocate_from=[self.sec_node])
3947

    
3948
    ial.Run(self.op.iallocator)
3949

    
3950
    if not ial.success:
3951
      raise errors.OpPrereqError("Can't compute nodes using"
3952
                                 " iallocator '%s': %s" % (self.op.iallocator,
3953
                                                           ial.info))
3954
    if len(ial.nodes) != ial.required_nodes:
3955
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3956
                                 " of nodes (%s), required %s" %
3957
                                 (len(ial.nodes), ial.required_nodes))
3958
    self.op.remote_node = ial.nodes[0]
3959
    self.LogInfo("Selected new secondary for the instance: %s",
3960
                 self.op.remote_node)
3961

    
3962
  def BuildHooksEnv(self):
3963
    """Build hooks env.
3964

3965
    This runs on the master, the primary and all the secondaries.
3966

3967
    """
3968
    env = {
3969
      "MODE": self.op.mode,
3970
      "NEW_SECONDARY": self.op.remote_node,
3971
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3972
      }
3973
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3974
    nl = [
3975
      self.cfg.GetMasterNode(),
3976
      self.instance.primary_node,
3977
      ]
3978
    if self.op.remote_node is not None:
3979
      nl.append(self.op.remote_node)
3980
    return env, nl, nl
3981

    
3982
  def CheckPrereq(self):
3983
    """Check prerequisites.
3984

3985
    This checks that the instance is in the cluster.
3986

3987
    """
3988
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3989