1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33
import logging
34
import copy
35

    
36
from ganeti import ssh
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import locking
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import serializer
45

    
46

    
47
class LogicalUnit(object):
48
  """Logical Unit base class.
49

50
  Subclasses must follow these rules:
51
    - implement ExpandNames
52
    - implement CheckPrereq
53
    - implement Exec
54
    - implement BuildHooksEnv
55
    - redefine HPATH and HTYPE
56
    - optionally redefine their run requirements:
57
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
58

59
  Note that all commands require root permissions.
60

61
  """
62
  HPATH = None
63
  HTYPE = None
64
  _OP_REQP = []
65
  REQ_BGL = True
66

    
67
  def __init__(self, processor, op, context, rpc):
68
    """Constructor for LogicalUnit.
69

70
    This needs to be overridden in derived classes in order to check op
71
    validity.
72

73
    """
74
    self.proc = processor
75
    self.op = op
76
    self.cfg = context.cfg
77
    self.context = context
78
    self.rpc = rpc
79
    # Dicts used to declare locking needs to mcpu
80
    self.needed_locks = None
81
    self.acquired_locks = {}
82
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
83
    self.add_locks = {}
84
    self.remove_locks = {}
85
    # Used to force good behavior when calling helper functions
86
    self.recalculate_locks = {}
87
    self.__ssh = None
88
    # logging
89
    self.LogWarning = processor.LogWarning
90
    self.LogInfo = processor.LogInfo
91

    
92
    for attr_name in self._OP_REQP:
93
      attr_val = getattr(op, attr_name, None)
94
      if attr_val is None:
95
        raise errors.OpPrereqError("Required parameter '%s' missing" %
96
                                   attr_name)
97
    self.CheckArguments()
98

    
99
  def __GetSSH(self):
100
    """Returns the SshRunner object
101

102
    """
103
    if not self.__ssh:
104
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
105
    return self.__ssh
106

    
107
  ssh = property(fget=__GetSSH)
108

    
109
  def CheckArguments(self):
110
    """Check syntactic validity for the opcode arguments.
111

112
    This method is for doing a simple syntactic check and ensuring
113
    validity of opcode parameters, without any cluster-related
114
    checks. While the same can be accomplished in ExpandNames and/or
115
    CheckPrereq, doing these separately is better because:
116

117
      - ExpandNames is left as purely a lock-related function
118
      - CheckPrereq is run after we have acquired locks (and possibly
119
        waited for them)
120

121
    The function is allowed to change the self.op attribute so that
122
    later methods no longer need to worry about missing parameters.
123

124
    """
125
    pass
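
  # Illustrative sketch (the "force" attribute is hypothetical, used only for
  # the example): a typical CheckArguments override canonicalizes optional
  # opcode attributes so that later methods can rely on them being present:
  #
  #   def CheckArguments(self):
  #     if getattr(self.op, "force", None) is None:
  #       self.op.force = False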
126

    
127
  def ExpandNames(self):
128
    """Expand names for this LU.
129

130
    This method is called before starting to execute the opcode, and it should
131
    update all the parameters of the opcode to their canonical form (e.g. a
132
    short node name must be fully expanded after this method has successfully
133
    completed). This way locking, hooks, logging, etc. can work correctly.
134

135
    LUs which implement this method must also populate the self.needed_locks
136
    member, as a dict with lock levels as keys, and a list of needed lock names
137
    as values. Rules:
138

139
      - use an empty dict if you don't need any lock
140
      - if you don't need any lock at a particular level omit that level
141
      - don't put anything for the BGL level
142
      - if you want all locks at a level use locking.ALL_SET as a value
143

144
    If you need to share locks (rather than acquire them exclusively) at one
145
    level you can modify self.share_locks, setting a true value (usually 1) for
146
    that level. By default locks are not shared.
147

148
    Examples::
149

150
      # Acquire all nodes and one instance
151
      self.needed_locks = {
152
        locking.LEVEL_NODE: locking.ALL_SET,
153
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
154
      }
155
      # Acquire just two nodes
156
      self.needed_locks = {
157
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
158
      }
159
      # Acquire no locks
160
      self.needed_locks = {} # No, you can't leave it to the default value None
161

162
    """
163
    # The implementation of this method is mandatory only if the new LU is
164
    # concurrent, so that old LUs don't need to be changed all at the same
165
    # time.
166
    if self.REQ_BGL:
167
      self.needed_locks = {} # Exclusive LUs don't need locks.
168
    else:
169
      raise NotImplementedError
170

    
171
  def DeclareLocks(self, level):
172
    """Declare LU locking needs for a level
173

174
    While most LUs can just declare their locking needs at ExpandNames time,
175
    sometimes there's the need to calculate some locks after having acquired
176
    the ones before. This function is called just before acquiring locks at a
177
    particular level, but after acquiring the ones at lower levels, and permits
178
    such calculations. It can be used to modify self.needed_locks, and by
179
    default it does nothing.
180

181
    This function is only called if you have something already set in
182
    self.needed_locks for the level.
183

184
    @param level: Locking level which is going to be locked
185
    @type level: member of ganeti.locking.LEVELS
186

187
    """
188

    
189
  def CheckPrereq(self):
190
    """Check prerequisites for this LU.
191

192
    This method should check that the prerequisites for the execution
193
    of this LU are fulfilled. It can do internode communication, but
194
    it should be idempotent - no cluster or system changes are
195
    allowed.
196

197
    The method should raise errors.OpPrereqError in case something is
198
    not fulfilled. Its return value is ignored.
199

200
    This method should also update all the parameters of the opcode to
201
    their canonical form if it hasn't been done by ExpandNames before.
202

203
    """
204
    raise NotImplementedError
205

    
206
  def Exec(self, feedback_fn):
207
    """Execute the LU.
208

209
    This method should implement the actual work. It should raise
210
    errors.OpExecError for failures that are somewhat dealt with in
211
    code, or expected.
212

213
    """
214
    raise NotImplementedError
215

    
216
  def BuildHooksEnv(self):
217
    """Build hooks environment for this LU.
218

219
    This method should return a three-element tuple consisting of: a dict
220
    containing the environment that will be used for running the
221
    specific hook for this LU, a list of node names on which the hook
222
    should run before the execution, and a list of node names on which
223
    the hook should run after the execution.
224

225
    The keys of the dict must not be prefixed with 'GANETI_' as this will
226
    be handled in the hooks runner. Also note additional keys will be
227
    added by the hooks runner. If the LU doesn't define any
228
    environment, an empty dict (and not None) should be returned.
229

230
    No nodes should be returned as an empty list (and not None).
231

232
    Note that if the HPATH for a LU class is None, this function will
233
    not be called.
234

235
    """
236
    raise NotImplementedError
237

    
238
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
239
    """Notify the LU about the results of its hooks.
240

241
    This method is called every time a hooks phase is executed, and notifies
242
    the Logical Unit about the hooks' result. The LU can then use it to alter
243
    its result based on the hooks.  By default the method does nothing and the
244
    previous result is passed back unchanged but any LU can override it if it
245
    wants to use the local cluster hook-scripts somehow.
246

247
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
248
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
249
    @param hook_results: the results of the multi-node hooks rpc call
250
    @param feedback_fn: function used to send feedback back to the caller
251
    @param lu_result: the previous Exec result this LU had, or None
252
        in the PRE phase
253
    @return: the new Exec result, based on the previous result
254
        and hook results
255

256
    """
257
    return lu_result
258

    
259
  def _ExpandAndLockInstance(self):
260
    """Helper function to expand and lock an instance.
261

262
    Many LUs that work on an instance take its name in self.op.instance_name
263
    and need to expand it and then declare the expanded name for locking. This
264
    function does it, and then updates self.op.instance_name to the expanded
265
    name. It also initializes needed_locks as a dict, if this hasn't been done
266
    before.
267

268
    """
269
    if self.needed_locks is None:
270
      self.needed_locks = {}
271
    else:
272
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
273
        "_ExpandAndLockInstance called with instance-level locks set"
274
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
275
    if expanded_name is None:
276
      raise errors.OpPrereqError("Instance '%s' not known" %
277
                                  self.op.instance_name)
278
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
279
    self.op.instance_name = expanded_name
280

    
281
  def _LockInstancesNodes(self, primary_only=False):
282
    """Helper function to declare instances' nodes for locking.
283

284
    This function should be called after locking one or more instances to lock
285
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
286
    with all primary or secondary nodes for instances already locked and
287
    present in self.needed_locks[locking.LEVEL_INSTANCE].
288

289
    It should be called from DeclareLocks, and for safety only works if
290
    self.recalculate_locks[locking.LEVEL_NODE] is set.
291

292
    In the future it may grow parameters to just lock some instances' nodes, or
293
    to just lock primary or secondary nodes, if needed.
294

295
    It should be called from DeclareLocks in a way similar to::
296

297
      if level == locking.LEVEL_NODE:
298
        self._LockInstancesNodes()
299

300
    @type primary_only: boolean
301
    @param primary_only: only lock primary nodes of locked instances
302

303
    """
304
    assert locking.LEVEL_NODE in self.recalculate_locks, \
305
      "_LockInstancesNodes helper function called with no nodes to recalculate"
306

    
307
    # TODO: check if we've really been called with the instance locks held
308

    
309
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
310
    # future we might want to have different behaviors depending on the value
311
    # of self.recalculate_locks[locking.LEVEL_NODE]
312
    wanted_nodes = []
313
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
314
      instance = self.context.cfg.GetInstanceInfo(instance_name)
315
      wanted_nodes.append(instance.primary_node)
316
      if not primary_only:
317
        wanted_nodes.extend(instance.secondary_nodes)
318

    
319
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
320
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
321
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
322
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
323

    
324
    del self.recalculate_locks[locking.LEVEL_NODE]
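
  # Illustrative sketch (hypothetical LU name): how a concurrent LU would tie
  # together _ExpandAndLockInstance, DeclareLocks and _LockInstancesNodes when
  # it needs exactly one instance plus that instance's nodes:
  #
  #   class LUDoSomethingOnInstance(NoHooksLU):
  #     _OP_REQP = ["instance_name"]
  #     REQ_BGL = False
  #
  #     def ExpandNames(self):
  #       self._ExpandAndLockInstance()
  #       self.needed_locks[locking.LEVEL_NODE] = []
  #       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
  #
  #     def DeclareLocks(self, level):
  #       if level == locking.LEVEL_NODE:
  #         self._LockInstancesNodes()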
325

    
326

    
327
class NoHooksLU(LogicalUnit):
328
  """Simple LU which runs no hooks.
329

330
  This LU is intended as a parent for other LogicalUnits which will
331
  run no hooks, in order to reduce duplicate code.
332

333
  """
334
  HPATH = None
335
  HTYPE = None
336

    
337

    
338
def _GetWantedNodes(lu, nodes):
339
  """Returns list of checked and expanded node names.
340

341
  @type lu: L{LogicalUnit}
342
  @param lu: the logical unit on whose behalf we execute
343
  @type nodes: list
344
  @param nodes: list of node names or None for all nodes
345
  @rtype: list
346
  @return: the list of nodes, sorted
347
  @raise errors.OpPrereqError: if the nodes parameter is of a wrong type
348

349
  """
350
  if not isinstance(nodes, list):
351
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
352

    
353
  if not nodes:
354
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
355
      " non-empty list of nodes whose name is to be expanded.")
356

    
357
  wanted = []
358
  for name in nodes:
359
    node = lu.cfg.ExpandNodeName(name)
360
    if node is None:
361
      raise errors.OpPrereqError("No such node name '%s'" % name)
362
    wanted.append(node)
363

    
364
  return utils.NiceSort(wanted)
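
# Illustrative usage sketch (node names are made up): expand and validate a
# caller-supplied node list from within an LU:
#
#   self.op.nodes = _GetWantedNodes(self, ["node1", "node2"])
#   # => e.g. ["node1.example.com", "node2.example.com"], NiceSort-ed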
365

    
366

    
367
def _GetWantedInstances(lu, instances):
368
  """Returns list of checked and expanded instance names.
369

370
  @type lu: L{LogicalUnit}
371
  @param lu: the logical unit on whose behalf we execute
372
  @type instances: list
373
  @param instances: list of instance names or None for all instances
374
  @rtype: list
375
  @return: the list of instances, sorted
376
  @raise errors.OpPrereqError: if the instances parameter is of a wrong type
377
  @raise errors.OpPrereqError: if any of the passed instances is not found
378

379
  """
380
  if not isinstance(instances, list):
381
    raise errors.OpPrereqError("Invalid argument type 'instances'")
382

    
383
  if instances:
384
    wanted = []
385

    
386
    for name in instances:
387
      instance = lu.cfg.ExpandInstanceName(name)
388
      if instance is None:
389
        raise errors.OpPrereqError("No such instance name '%s'" % name)
390
      wanted.append(instance)
391

    
392
  else:
393
    wanted = lu.cfg.GetInstanceList()
394
  return utils.NiceSort(wanted)
395

    
396

    
397
def _CheckOutputFields(static, dynamic, selected):
398
  """Checks whether all selected fields are valid.
399

400
  @type static: L{utils.FieldSet}
401
  @param static: static fields set
402
  @type dynamic: L{utils.FieldSet}
403
  @param dynamic: dynamic fields set
404

405
  """
406
  f = utils.FieldSet()
407
  f.Extend(static)
408
  f.Extend(dynamic)
409

    
410
  delta = f.NonMatching(selected)
411
  if delta:
412
    raise errors.OpPrereqError("Unknown output fields selected: %s"
413
                               % ",".join(delta))
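
# Illustrative sketch (field names are made up): a selection containing a
# field that is in neither set raises OpPrereqError:
#
#   _CheckOutputFields(static=utils.FieldSet("name"),
#                      dynamic=utils.FieldSet("mfree"),
#                      selected=["name", "mfree"])    # passes
#   _CheckOutputFields(static=utils.FieldSet("name"),
#                      dynamic=utils.FieldSet("mfree"),
#                      selected=["name", "bogus"])    # raises OpPrereqError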
414

    
415

    
416
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
417
                          memory, vcpus, nics):
418
  """Builds instance related env variables for hooks
419

420
  This builds the hook environment from individual variables.
421

422
  @type name: string
423
  @param name: the name of the instance
424
  @type primary_node: string
425
  @param primary_node: the name of the instance's primary node
426
  @type secondary_nodes: list
427
  @param secondary_nodes: list of secondary nodes as strings
428
  @type os_type: string
429
  @param os_type: the name of the instance's OS
430
  @type status: string
431
  @param status: the desired status of the instances
432
  @type memory: string
433
  @param memory: the memory size of the instance
434
  @type vcpus: string
435
  @param vcpus: the count of VCPUs the instance has
436
  @type nics: list
437
  @param nics: list of tuples (ip, bridge, mac) representing
438
      the NICs the instance has
439
  @rtype: dict
440
  @return: the hook environment for this instance
441

442
  """
443
  env = {
444
    "OP_TARGET": name,
445
    "INSTANCE_NAME": name,
446
    "INSTANCE_PRIMARY": primary_node,
447
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
448
    "INSTANCE_OS_TYPE": os_type,
449
    "INSTANCE_STATUS": status,
450
    "INSTANCE_MEMORY": memory,
451
    "INSTANCE_VCPUS": vcpus,
452
  }
453

    
454
  if nics:
455
    nic_count = len(nics)
456
    for idx, (ip, bridge, mac) in enumerate(nics):
457
      if ip is None:
458
        ip = ""
459
      env["INSTANCE_NIC%d_IP" % idx] = ip
460
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
461
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
462
  else:
463
    nic_count = 0
464

    
465
  env["INSTANCE_NIC_COUNT"] = nic_count
466

    
467
  return env
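
# Illustrative sketch of the resulting environment for a hypothetical
# single-NIC instance (all values are made up):
#
#   _BuildInstanceHookEnv("inst1.example.com", "node1.example.com", [],
#                         "debian-etch", "up", 512, 1,
#                         [("192.0.2.10", "xen-br0", "aa:00:00:11:22:33")])
#   # => {"OP_TARGET": "inst1.example.com",
#   #     "INSTANCE_NAME": "inst1.example.com",
#   #     "INSTANCE_PRIMARY": "node1.example.com",
#   #     "INSTANCE_SECONDARIES": "",
#   #     "INSTANCE_OS_TYPE": "debian-etch",
#   #     "INSTANCE_STATUS": "up",
#   #     "INSTANCE_MEMORY": 512,
#   #     "INSTANCE_VCPUS": 1,
#   #     "INSTANCE_NIC0_IP": "192.0.2.10",
#   #     "INSTANCE_NIC0_BRIDGE": "xen-br0",
#   #     "INSTANCE_NIC0_HWADDR": "aa:00:00:11:22:33",
#   #     "INSTANCE_NIC_COUNT": 1}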
468

    
469

    
470
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
471
  """Builds instance related env variables for hooks from an object.
472

473
  @type lu: L{LogicalUnit}
474
  @param lu: the logical unit on whose behalf we execute
475
  @type instance: L{objects.Instance}
476
  @param instance: the instance for which we should build the
477
      environment
478
  @type override: dict
479
  @param override: dictionary with key/values that will override
480
      our values
481
  @rtype: dict
482
  @return: the hook environment dictionary
483

484
  """
485
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
486
  args = {
487
    'name': instance.name,
488
    'primary_node': instance.primary_node,
489
    'secondary_nodes': instance.secondary_nodes,
490
    'os_type': instance.os,
491
    'status': instance.status,
492
    'memory': bep[constants.BE_MEMORY],
493
    'vcpus': bep[constants.BE_VCPUS],
494
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
495
  }
496
  if override:
497
    args.update(override)
498
  return _BuildInstanceHookEnv(**args)
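
# Illustrative sketch: override replaces entries of the args dict built above
# (keyed by parameter name, not by environment variable name), e.g.:
#
#   env = _BuildInstanceHookEnvByObject(self, instance,
#                                       override={"status": "down"})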
499

    
500

    
501
def _CheckInstanceBridgesExist(lu, instance):
  """Check that the bridges needed by an instance exist.
503

504
  """
505
  # check bridges existence
506
  brlist = [nic.bridge for nic in instance.nics]
507
  if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
508
    raise errors.OpPrereqError("one or more target bridges %s do not"
509
                               " exist on destination node '%s'" %
510
                               (brlist, instance.primary_node))
511

    
512

    
513
class LUDestroyCluster(NoHooksLU):
514
  """Logical unit for destroying the cluster.
515

516
  """
517
  _OP_REQP = []
518

    
519
  def CheckPrereq(self):
520
    """Check prerequisites.
521

522
    This checks whether the cluster is empty.
523

524
    Any errors are signalled by raising errors.OpPrereqError.
525

526
    """
527
    master = self.cfg.GetMasterNode()
528

    
529
    nodelist = self.cfg.GetNodeList()
530
    if len(nodelist) != 1 or nodelist[0] != master:
531
      raise errors.OpPrereqError("There are still %d node(s) in"
532
                                 " this cluster." % (len(nodelist) - 1))
533
    instancelist = self.cfg.GetInstanceList()
534
    if instancelist:
535
      raise errors.OpPrereqError("There are still %d instance(s) in"
536
                                 " this cluster." % len(instancelist))
537

    
538
  def Exec(self, feedback_fn):
539
    """Destroys the cluster.
540

541
    """
542
    master = self.cfg.GetMasterNode()
543
    if not self.rpc.call_node_stop_master(master, False):
544
      raise errors.OpExecError("Could not disable the master role")
545
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
546
    utils.CreateBackup(priv_key)
547
    utils.CreateBackup(pub_key)
548
    return master
549

    
550

    
551
class LUVerifyCluster(LogicalUnit):
552
  """Verifies the cluster status.
553

554
  """
555
  HPATH = "cluster-verify"
556
  HTYPE = constants.HTYPE_CLUSTER
557
  _OP_REQP = ["skip_checks"]
558
  REQ_BGL = False
559

    
560
  def ExpandNames(self):
561
    self.needed_locks = {
562
      locking.LEVEL_NODE: locking.ALL_SET,
563
      locking.LEVEL_INSTANCE: locking.ALL_SET,
564
    }
565
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
566

    
567
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
568
                  remote_version, feedback_fn):
569
    """Run multiple tests against a node.
570

571
    Test list::
572

573
      - compares ganeti version
574
      - checks vg existence and size > 20G
575
      - checks config file checksum
576
      - checks ssh to other nodes
577

578
    @type node: string
579
    @param node: the name of the node to check
580
    @param file_list: required list of files
581
    @param local_cksum: dictionary of local files and their checksums
582
    @type vglist: dict
583
    @param vglist: dictionary of volume group names and their size
584
    @param node_result: the results from the node
585
    @param remote_version: the RPC version from the remote node
586
    @param feedback_fn: function used to accumulate results
587

588
    """
589
    # compares ganeti version
590
    local_version = constants.PROTOCOL_VERSION
591
    if not remote_version:
592
      feedback_fn("  - ERROR: connection to %s failed" % (node))
593
      return True
594

    
595
    if local_version != remote_version:
596
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
597
                      (local_version, node, remote_version))
598
      return True
599

    
600
    # checks vg existence and size > 20G
601

    
602
    bad = False
603
    if not vglist:
604
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
605
                      (node,))
606
      bad = True
607
    else:
608
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
609
                                            constants.MIN_VG_SIZE)
610
      if vgstatus:
611
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
612
        bad = True
613

    
614
    if not node_result:
615
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
616
      return True
617

    
618
    # checks config file checksum
619
    # checks ssh to other nodes
620

    
621
    if 'filelist' not in node_result:
622
      bad = True
623
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
624
    else:
625
      remote_cksum = node_result['filelist']
626
      for file_name in file_list:
627
        if file_name not in remote_cksum:
628
          bad = True
629
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
630
        elif remote_cksum[file_name] != local_cksum[file_name]:
631
          bad = True
632
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
633

    
634
    if 'nodelist' not in node_result:
635
      bad = True
636
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
637
    else:
638
      if node_result['nodelist']:
639
        bad = True
640
        for node in node_result['nodelist']:
641
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
642
                          (node, node_result['nodelist'][node]))
643
    if 'node-net-test' not in node_result:
644
      bad = True
645
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
646
    else:
647
      if node_result['node-net-test']:
648
        bad = True
649
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
650
        for node in nlist:
651
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
652
                          (node, node_result['node-net-test'][node]))
653

    
654
    hyp_result = node_result.get('hypervisor', None)
655
    if isinstance(hyp_result, dict):
656
      for hv_name, hv_result in hyp_result.iteritems():
657
        if hv_result is not None:
658
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
659
                      (hv_name, hv_result))
660
    return bad
661

    
662
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
663
                      node_instance, feedback_fn):
664
    """Verify an instance.
665

666
    This function checks to see if the required block devices are
667
    available on the instance's node.
668

669
    """
670
    bad = False
671

    
672
    node_current = instanceconfig.primary_node
673

    
674
    node_vol_should = {}
675
    instanceconfig.MapLVsByNode(node_vol_should)
676

    
677
    for node in node_vol_should:
678
      for volume in node_vol_should[node]:
679
        if node not in node_vol_is or volume not in node_vol_is[node]:
680
          feedback_fn("  - ERROR: volume %s missing on node %s" %
681
                          (volume, node))
682
          bad = True
683

    
684
    if instanceconfig.status != 'down':
685
      if (node_current not in node_instance or
686
          not instance in node_instance[node_current]):
687
        feedback_fn("  - ERROR: instance %s not running on node %s" %
688
                        (instance, node_current))
689
        bad = True
690

    
691
    for node in node_instance:
692
      if node != node_current:
693
        if instance in node_instance[node]:
694
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
695
                          (instance, node))
696
          bad = True
697

    
698
    return bad
699

    
700
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
701
    """Verify if there are any unknown volumes in the cluster.
702

703
    The .os, .swap and backup volumes are ignored. All other volumes are
704
    reported as unknown.
705

706
    """
707
    bad = False
708

    
709
    for node in node_vol_is:
710
      for volume in node_vol_is[node]:
711
        if node not in node_vol_should or volume not in node_vol_should[node]:
712
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
713
                      (volume, node))
714
          bad = True
715
    return bad
716

    
717
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
718
    """Verify the list of running instances.
719

720
    This checks what instances are running but unknown to the cluster.
721

722
    """
723
    bad = False
724
    for node in node_instance:
725
      for runninginstance in node_instance[node]:
726
        if runninginstance not in instancelist:
727
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
728
                          (runninginstance, node))
729
          bad = True
730
    return bad
731

    
732
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
733
    """Verify N+1 Memory Resilience.
734

735
    Check that if one single node dies we can still start all the instances it
736
    was primary for.
737

738
    """
739
    bad = False
740

    
741
    for node, nodeinfo in node_info.iteritems():
742
      # This code checks that every node which is now listed as secondary has
743
      # enough memory to host all instances it is supposed to, should a single
744
      # other node in the cluster fail.
745
      # FIXME: not ready for failover to an arbitrary node
746
      # FIXME: does not support file-backed instances
747
      # WARNING: we currently take into account down instances as well as up
748
      # ones, considering that even if they're down someone might want to start
749
      # them even in the event of a node failure.
750
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
751
        needed_mem = 0
752
        for instance in instances:
753
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
754
          if bep[constants.BE_AUTO_BALANCE]:
755
            needed_mem += bep[constants.BE_MEMORY]
756
        if nodeinfo['mfree'] < needed_mem:
757
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
758
                      " failovers should node %s fail" % (node, prinode))
759
          bad = True
760
    return bad
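
  # Worked example (numbers are made up): if node B is secondary for two
  # auto-balanced instances whose primary is node A, with 1024 and 512 MB of
  # memory respectively, then B needs at least 1536 MB free ('mfree' >= 1536)
  # or the loop above reports an N+1 error for the (B, A) pair.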
761

    
762
  def CheckPrereq(self):
763
    """Check prerequisites.
764

765
    Transform the list of checks we're going to skip into a set and check that
766
    all its members are valid.
767

768
    """
769
    self.skip_set = frozenset(self.op.skip_checks)
770
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
771
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
772

    
773
  def BuildHooksEnv(self):
774
    """Build hooks env.
775

776
    Cluster-Verify hooks run only in the post phase and their failure causes
777
    the output to be logged in the verify output and the verification to fail.
778

779
    """
780
    all_nodes = self.cfg.GetNodeList()
781
    # TODO: populate the environment with useful information for verify hooks
782
    env = {}
783
    return env, [], all_nodes
784

    
785
  def Exec(self, feedback_fn):
786
    """Verify integrity of cluster, performing various test on nodes.
787

788
    """
789
    bad = False
790
    feedback_fn("* Verifying global settings")
791
    for msg in self.cfg.VerifyConfig():
792
      feedback_fn("  - ERROR: %s" % msg)
793

    
794
    vg_name = self.cfg.GetVGName()
795
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
796
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
797
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
798
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
799
    i_non_redundant = [] # Non redundant instances
800
    i_non_a_balanced = [] # Non auto-balanced instances
801
    node_volume = {}
802
    node_instance = {}
803
    node_info = {}
804
    instance_cfg = {}
805

    
806
    # FIXME: verify OS list
807
    # do local checksums
808
    file_names = []
809
    file_names.append(constants.SSL_CERT_FILE)
810
    file_names.append(constants.CLUSTER_CONF_FILE)
811
    local_checksums = utils.FingerprintFiles(file_names)
812

    
813
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
814
    all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
815
    all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
816
    all_vglist = self.rpc.call_vg_list(nodelist)
817
    node_verify_param = {
818
      'filelist': file_names,
819
      'nodelist': nodelist,
820
      'hypervisor': hypervisors,
821
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
822
                        for node in nodeinfo]
823
      }
824
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
825
                                           self.cfg.GetClusterName())
826
    all_rversion = self.rpc.call_version(nodelist)
827
    all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
828
                                        self.cfg.GetHypervisorType())
829

    
830
    cluster = self.cfg.GetClusterInfo()
831
    for node in nodelist:
832
      feedback_fn("* Verifying node %s" % node)
833
      result = self._VerifyNode(node, file_names, local_checksums,
834
                                all_vglist[node], all_nvinfo[node],
835
                                all_rversion[node], feedback_fn)
836
      bad = bad or result
837

    
838
      # node_volume
839
      volumeinfo = all_volumeinfo[node]
840

    
841
      if isinstance(volumeinfo, basestring):
842
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
843
                    (node, volumeinfo[-400:].encode('string_escape')))
844
        bad = True
845
        node_volume[node] = {}
846
      elif not isinstance(volumeinfo, dict):
847
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
848
        bad = True
849
        continue
850
      else:
851
        node_volume[node] = volumeinfo
852

    
853
      # node_instance
854
      nodeinstance = all_instanceinfo[node]
855
      if type(nodeinstance) != list:
856
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
857
        bad = True
858
        continue
859

    
860
      node_instance[node] = nodeinstance
861

    
862
      # node_info
863
      nodeinfo = all_ninfo[node]
864
      if not isinstance(nodeinfo, dict):
865
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
866
        bad = True
867
        continue
868

    
869
      try:
870
        node_info[node] = {
871
          "mfree": int(nodeinfo['memory_free']),
872
          "dfree": int(nodeinfo['vg_free']),
873
          "pinst": [],
874
          "sinst": [],
875
          # dictionary holding all instances this node is secondary for,
876
          # grouped by their primary node. Each key is a cluster node, and each
877
          # value is a list of instances which have the key as primary and the
878
          # current node as secondary. This is handy to calculate N+1 memory
879
          # availability if you can only failover from a primary to its
880
          # secondary.
881
          "sinst-by-pnode": {},
882
        }
883
      except ValueError:
884
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
885
        bad = True
886
        continue
887

    
888
    node_vol_should = {}
889

    
890
    for instance in instancelist:
891
      feedback_fn("* Verifying instance %s" % instance)
892
      inst_config = self.cfg.GetInstanceInfo(instance)
893
      result = self._VerifyInstance(instance, inst_config, node_volume,
894
                                     node_instance, feedback_fn)
895
      bad = bad or result
896

    
897
      inst_config.MapLVsByNode(node_vol_should)
898

    
899
      instance_cfg[instance] = inst_config
900

    
901
      pnode = inst_config.primary_node
902
      if pnode in node_info:
903
        node_info[pnode]['pinst'].append(instance)
904
      else:
905
        feedback_fn("  - ERROR: instance %s, connection to primary node"
906
                    " %s failed" % (instance, pnode))
907
        bad = True
908

    
909
      # If the instance is non-redundant we cannot survive losing its primary
910
      # node, so we are not N+1 compliant. On the other hand we have no disk
911
      # templates with more than one secondary so that situation is not well
912
      # supported either.
913
      # FIXME: does not support file-backed instances
914
      if len(inst_config.secondary_nodes) == 0:
915
        i_non_redundant.append(instance)
916
      elif len(inst_config.secondary_nodes) > 1:
917
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
918
                    % instance)
919

    
920
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
921
        i_non_a_balanced.append(instance)
922

    
923
      for snode in inst_config.secondary_nodes:
924
        if snode in node_info:
925
          node_info[snode]['sinst'].append(instance)
926
          if pnode not in node_info[snode]['sinst-by-pnode']:
927
            node_info[snode]['sinst-by-pnode'][pnode] = []
928
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
929
        else:
930
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
931
                      " %s failed" % (instance, snode))
932

    
933
    feedback_fn("* Verifying orphan volumes")
934
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
935
                                       feedback_fn)
936
    bad = bad or result
937

    
938
    feedback_fn("* Verifying remaining instances")
939
    result = self._VerifyOrphanInstances(instancelist, node_instance,
940
                                         feedback_fn)
941
    bad = bad or result
942

    
943
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
944
      feedback_fn("* Verifying N+1 Memory redundancy")
945
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
946
      bad = bad or result
947

    
948
    feedback_fn("* Other Notes")
949
    if i_non_redundant:
950
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
951
                  % len(i_non_redundant))
952

    
953
    if i_non_a_balanced:
954
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
955
                  % len(i_non_a_balanced))
956

    
957
    return not bad
958

    
959
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result
961

962
    This method analyzes the hook result, handles it, and sends some
963
    nicely-formatted feedback back to the user.
964

965
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
966
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
967
    @param hooks_results: the results of the multi-node hooks rpc call
968
    @param feedback_fn: function used to send feedback back to the caller
969
    @param lu_result: previous Exec result
970
    @return: the new Exec result, based on the previous result
971
        and hook results
972

973
    """
974
    # We only really run POST phase hooks, and are only interested in
975
    # their results
976
    if phase == constants.HOOKS_PHASE_POST:
977
      # Used to change hooks' output to proper indentation
978
      indent_re = re.compile('^', re.M)
979
      feedback_fn("* Hooks Results")
980
      if not hooks_results:
981
        feedback_fn("  - ERROR: general communication failure")
982
        lu_result = 1
983
      else:
984
        for node_name in hooks_results:
985
          show_node_header = True
986
          res = hooks_results[node_name]
987
          if res is False or not isinstance(res, list):
988
            feedback_fn("    Communication failure")
989
            lu_result = 1
990
            continue
991
          for script, hkr, output in res:
992
            if hkr == constants.HKR_FAIL:
993
              # The node header is only shown once, if there are
994
              # failing hooks on that node
995
              if show_node_header:
996
                feedback_fn("  Node %s:" % node_name)
997
                show_node_header = False
998
              feedback_fn("    ERROR: Script %s failed, output:" % script)
999
              output = indent_re.sub('      ', output)
1000
              feedback_fn("%s" % output)
1001
              lu_result = 1
1002

    
1003
      return lu_result
1004

    
1005

    
1006
class LUVerifyDisks(NoHooksLU):
1007
  """Verifies the cluster disks status.
1008

1009
  """
1010
  _OP_REQP = []
1011
  REQ_BGL = False
1012

    
1013
  def ExpandNames(self):
1014
    self.needed_locks = {
1015
      locking.LEVEL_NODE: locking.ALL_SET,
1016
      locking.LEVEL_INSTANCE: locking.ALL_SET,
1017
    }
1018
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1019

    
1020
  def CheckPrereq(self):
1021
    """Check prerequisites.
1022

1023
    This has no prerequisites.
1024

1025
    """
1026
    pass
1027

    
1028
  def Exec(self, feedback_fn):
1029
    """Verify integrity of cluster disks.
1030

1031
    """
1032
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1033

    
1034
    vg_name = self.cfg.GetVGName()
1035
    nodes = utils.NiceSort(self.cfg.GetNodeList())
1036
    instances = [self.cfg.GetInstanceInfo(name)
1037
                 for name in self.cfg.GetInstanceList()]
1038

    
1039
    nv_dict = {}
1040
    for inst in instances:
1041
      inst_lvs = {}
1042
      if (inst.status != "up" or
1043
          inst.disk_template not in constants.DTS_NET_MIRROR):
1044
        continue
1045
      inst.MapLVsByNode(inst_lvs)
1046
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1047
      for node, vol_list in inst_lvs.iteritems():
1048
        for vol in vol_list:
1049
          nv_dict[(node, vol)] = inst
1050

    
1051
    if not nv_dict:
1052
      return result
1053

    
1054
    node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1055

    
1056
    to_act = set()
1057
    for node in nodes:
1058
      # node_volume
1059
      lvs = node_lvs[node]
1060

    
1061
      if isinstance(lvs, basestring):
1062
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1063
        res_nlvm[node] = lvs
1064
      elif not isinstance(lvs, dict):
1065
        logging.warning("Connection to node %s failed or invalid data"
1066
                        " returned", node)
1067
        res_nodes.append(node)
1068
        continue
1069

    
1070
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1071
        inst = nv_dict.pop((node, lv_name), None)
1072
        if (not lv_online and inst is not None
1073
            and inst.name not in res_instances):
1074
          res_instances.append(inst.name)
1075

    
1076
    # any leftover items in nv_dict are missing LVs, let's arrange the
1077
    # data better
1078
    for key, inst in nv_dict.iteritems():
1079
      if inst.name not in res_missing:
1080
        res_missing[inst.name] = []
1081
      res_missing[inst.name].append(key)
1082

    
1083
    return result
1084

    
1085

    
1086
class LURenameCluster(LogicalUnit):
1087
  """Rename the cluster.
1088

1089
  """
1090
  HPATH = "cluster-rename"
1091
  HTYPE = constants.HTYPE_CLUSTER
1092
  _OP_REQP = ["name"]
1093

    
1094
  def BuildHooksEnv(self):
1095
    """Build hooks env.
1096

1097
    """
1098
    env = {
1099
      "OP_TARGET": self.cfg.GetClusterName(),
1100
      "NEW_NAME": self.op.name,
1101
      }
1102
    mn = self.cfg.GetMasterNode()
1103
    return env, [mn], [mn]
1104

    
1105
  def CheckPrereq(self):
1106
    """Verify that the passed name is a valid one.
1107

1108
    """
1109
    hostname = utils.HostInfo(self.op.name)
1110

    
1111
    new_name = hostname.name
1112
    self.ip = new_ip = hostname.ip
1113
    old_name = self.cfg.GetClusterName()
1114
    old_ip = self.cfg.GetMasterIP()
1115
    if new_name == old_name and new_ip == old_ip:
1116
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1117
                                 " cluster has changed")
1118
    if new_ip != old_ip:
1119
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1120
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1121
                                   " reachable on the network. Aborting." %
1122
                                   new_ip)
1123

    
1124
    self.op.name = new_name
1125

    
1126
  def Exec(self, feedback_fn):
1127
    """Rename the cluster.
1128

1129
    """
1130
    clustername = self.op.name
1131
    ip = self.ip
1132

    
1133
    # shutdown the master IP
1134
    master = self.cfg.GetMasterNode()
1135
    if not self.rpc.call_node_stop_master(master, False):
1136
      raise errors.OpExecError("Could not disable the master role")
1137

    
1138
    try:
1139
      # modify the sstore
1140
      # TODO: sstore
1141
      ss.SetKey(ss.SS_MASTER_IP, ip)
1142
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1143

    
1144
      # Distribute updated ss config to all nodes
1145
      myself = self.cfg.GetNodeInfo(master)
1146
      dist_nodes = self.cfg.GetNodeList()
1147
      if myself.name in dist_nodes:
1148
        dist_nodes.remove(myself.name)
1149

    
1150
      logging.debug("Copying updated ssconf data to all nodes")
1151
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1152
        fname = ss.KeyToFilename(keyname)
1153
        result = self.rpc.call_upload_file(dist_nodes, fname)
1154
        for to_node in dist_nodes:
1155
          if not result[to_node]:
1156
            self.LogWarning("Copy of file %s to node %s failed",
1157
                            fname, to_node)
1158
    finally:
1159
      if not self.rpc.call_node_start_master(master, False):
1160
        self.LogWarning("Could not re-enable the master role on"
1161
                        " the master, please restart manually.")
1162

    
1163

    
1164
def _RecursiveCheckIfLVMBased(disk):
1165
  """Check if the given disk or its children are lvm-based.
1166

1167
  @type disk: L{objects.Disk}
1168
  @param disk: the disk to check
1169
  @rtype: boolean
1170
  @return: boolean indicating whether an LD_LV dev_type was found or not
1171

1172
  """
1173
  if disk.children:
1174
    for chdisk in disk.children:
1175
      if _RecursiveCheckIfLVMBased(chdisk):
1176
        return True
1177
  return disk.dev_type == constants.LD_LV
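
# Illustrative sketch (hypothetical disk layout): a DRBD8 disk backed by two
# LVs counts as lvm-based because one of its children has the LD_LV dev_type:
#
#   data = objects.Disk(dev_type=constants.LD_LV, size=1024)
#   meta = objects.Disk(dev_type=constants.LD_LV, size=128)
#   drbd = objects.Disk(dev_type=constants.LD_DRBD8, children=[data, meta])
#   _RecursiveCheckIfLVMBased(drbd)  # => True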
1178

    
1179

    
1180
class LUSetClusterParams(LogicalUnit):
1181
  """Change the parameters of the cluster.
1182

1183
  """
1184
  HPATH = "cluster-modify"
1185
  HTYPE = constants.HTYPE_CLUSTER
1186
  _OP_REQP = []
1187
  REQ_BGL = False
1188

    
1189
  def ExpandNames(self):
1190
    # FIXME: in the future maybe other cluster params won't require checking on
1191
    # all nodes to be modified.
1192
    self.needed_locks = {
1193
      locking.LEVEL_NODE: locking.ALL_SET,
1194
    }
1195
    self.share_locks[locking.LEVEL_NODE] = 1
1196

    
1197
  def BuildHooksEnv(self):
1198
    """Build hooks env.
1199

1200
    """
1201
    env = {
1202
      "OP_TARGET": self.cfg.GetClusterName(),
1203
      "NEW_VG_NAME": self.op.vg_name,
1204
      }
1205
    mn = self.cfg.GetMasterNode()
1206
    return env, [mn], [mn]
1207

    
1208
  def CheckPrereq(self):
1209
    """Check prerequisites.
1210

1211
    This checks whether the given params don't conflict and
1212
    if the given volume group is valid.
1213

1214
    """
1215
    # FIXME: This only works because there is only one parameter that can be
1216
    # changed or removed.
1217
    if self.op.vg_name is not None and not self.op.vg_name:
1218
      instances = self.cfg.GetAllInstancesInfo().values()
1219
      for inst in instances:
1220
        for disk in inst.disks:
1221
          if _RecursiveCheckIfLVMBased(disk):
1222
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1223
                                       " lvm-based instances exist")
1224

    
1225
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1226

    
1227
    # if vg_name not None, checks given volume group on all nodes
1228
    if self.op.vg_name:
1229
      vglist = self.rpc.call_vg_list(node_list)
1230
      for node in node_list:
1231
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1232
                                              constants.MIN_VG_SIZE)
1233
        if vgstatus:
1234
          raise errors.OpPrereqError("Error on node '%s': %s" %
1235
                                     (node, vgstatus))
1236

    
1237
    self.cluster = cluster = self.cfg.GetClusterInfo()
1238
    # beparams changes do not need validation (we can't validate?),
1239
    # but we still process here
1240
    if self.op.beparams:
1241
      self.new_beparams = cluster.FillDict(
1242
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1243

    
1244
    # hypervisor list/parameters
1245
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1246
    if self.op.hvparams:
1247
      if not isinstance(self.op.hvparams, dict):
1248
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1249
      for hv_name, hv_dict in self.op.hvparams.items():
1250
        if hv_name not in self.new_hvparams:
1251
          self.new_hvparams[hv_name] = hv_dict
1252
        else:
1253
          self.new_hvparams[hv_name].update(hv_dict)
1254

    
1255
    if self.op.enabled_hypervisors is not None:
1256
      self.hv_list = self.op.enabled_hypervisors
1257
    else:
1258
      self.hv_list = cluster.enabled_hypervisors
1259

    
1260
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1261
      # either the enabled list has changed, or the parameters have, validate
1262
      for hv_name, hv_params in self.new_hvparams.items():
1263
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1264
            (self.op.enabled_hypervisors and
1265
             hv_name in self.op.enabled_hypervisors)):
1266
          # either this is a new hypervisor, or its parameters have changed
1267
          hv_class = hypervisor.GetHypervisor(hv_name)
1268
          hv_class.CheckParameterSyntax(hv_params)
1269
          _CheckHVParams(self, node_list, hv_name, hv_params)
1270

    
1271
  def Exec(self, feedback_fn):
1272
    """Change the parameters of the cluster.
1273

1274
    """
1275
    if self.op.vg_name is not None:
1276
      if self.op.vg_name != self.cfg.GetVGName():
1277
        self.cfg.SetVGName(self.op.vg_name)
1278
      else:
1279
        feedback_fn("Cluster LVM configuration already in desired"
1280
                    " state, not changing")
1281
    if self.op.hvparams:
1282
      self.cluster.hvparams = self.new_hvparams
1283
    if self.op.enabled_hypervisors is not None:
1284
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1285
    if self.op.beparams:
1286
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1287
    self.cfg.Update(self.cluster)
1288

    
1289

    
1290
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1291
  """Sleep and poll for an instance's disk to sync.
1292

1293
  """
1294
  if not instance.disks:
1295
    return True
1296

    
1297
  if not oneshot:
1298
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1299

    
1300
  node = instance.primary_node
1301

    
1302
  for dev in instance.disks:
1303
    lu.cfg.SetDiskID(dev, node)
1304

    
1305
  retries = 0
1306
  while True:
1307
    max_time = 0
1308
    done = True
1309
    cumul_degraded = False
1310
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1311
    if not rstats:
1312
      lu.LogWarning("Can't get any data from node %s", node)
1313
      retries += 1
1314
      if retries >= 10:
1315
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1316
                                 " aborting." % node)
1317
      time.sleep(6)
1318
      continue
1319
    retries = 0
1320
    for i in range(len(rstats)):
1321
      mstat = rstats[i]
1322
      if mstat is None:
1323
        lu.LogWarning("Can't compute data for node %s/%s",
1324
                           node, instance.disks[i].iv_name)
1325
        continue
1326
      # we ignore the ldisk parameter
1327
      perc_done, est_time, is_degraded, _ = mstat
1328
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1329
      if perc_done is not None:
1330
        done = False
1331
        if est_time is not None:
1332
          rem_time = "%d estimated seconds remaining" % est_time
1333
          max_time = est_time
1334
        else:
1335
          rem_time = "no time estimate"
1336
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1337
                        (instance.disks[i].iv_name, perc_done, rem_time))
1338
    if done or oneshot:
1339
      break
1340

    
1341
    time.sleep(min(60, max_time))
1342

    
1343
  if done:
1344
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1345
  return not cumul_degraded
1346

    
1347

    
1348
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1349
  """Check that mirrors are not degraded.
1350

1351
  The ldisk parameter, if True, will change the test from the
1352
  is_degraded attribute (which represents overall non-ok status for
1353
  the device(s)) to the ldisk (representing the local storage status).
1354

1355
  """
1356
  lu.cfg.SetDiskID(dev, node)
1357
  if ldisk:
1358
    idx = 6
1359
  else:
1360
    idx = 5
1361

    
1362
  result = True
1363
  if on_primary or dev.AssembleOnSecondary():
1364
    rstats = lu.rpc.call_blockdev_find(node, dev)
1365
    if not rstats:
1366
      logging.warning("Node %s: disk degraded, not found or node down", node)
1367
      result = False
1368
    else:
1369
      result = result and (not rstats[idx])
1370
  if dev.children:
1371
    for child in dev.children:
1372
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1373

    
1374
  return result
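
# Illustrative usage sketch (from within an LU's Exec method): verify a disk
# and, recursively, its children on the primary node and abort if degraded:
#
#   if not _CheckDiskConsistency(self, dev, instance.primary_node, True):
#     raise errors.OpExecError("Disk %s is degraded" % dev.iv_name)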
1375

    
1376

    
1377
class LUDiagnoseOS(NoHooksLU):
1378
  """Logical unit for OS diagnose/query.
1379

1380
  """
1381
  _OP_REQP = ["output_fields", "names"]
1382
  REQ_BGL = False
1383
  _FIELDS_STATIC = utils.FieldSet()
1384
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1385

    
1386
  def ExpandNames(self):
1387
    if self.op.names:
1388
      raise errors.OpPrereqError("Selective OS query not supported")
1389

    
1390
    _CheckOutputFields(static=self._FIELDS_STATIC,
1391
                       dynamic=self._FIELDS_DYNAMIC,
1392
                       selected=self.op.output_fields)
1393

    
1394
    # Lock all nodes, in shared mode
1395
    self.needed_locks = {}
1396
    self.share_locks[locking.LEVEL_NODE] = 1
1397
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1398

    
1399
  def CheckPrereq(self):
1400
    """Check prerequisites.
1401

1402
    """
1403

    
1404
  @staticmethod
1405
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary
1407

1408
    @param node_list: a list with the names of all nodes
1409
    @param rlist: a map with node names as keys and OS objects as values
1410

1411
    @rtype: dict
1412
    @return: a dictionary with osnames as keys and, as values, another map with
1413
        nodes as keys and lists of OS objects as values, e.g.::
1414

1415
          {"debian-etch": {"node1": [<object>,...],
1416
                           "node2": [<object>,]}
1417
          }
1418

1419
    """
1420
    all_os = {}
1421
    for node_name, nr in rlist.iteritems():
1422
      if not nr:
1423
        continue
1424
      for os_obj in nr:
1425
        if os_obj.name not in all_os:
1426
          # build a list of nodes for this os containing empty lists
1427
          # for each node in node_list
1428
          all_os[os_obj.name] = {}
1429
          for nname in node_list:
1430
            all_os[os_obj.name][nname] = []
1431
        all_os[os_obj.name][node_name].append(os_obj)
1432
    return all_os
1433

    
1434
  def Exec(self, feedback_fn):
1435
    """Compute the list of OSes.
1436

1437
    """
1438
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1439
    node_data = self.rpc.call_os_diagnose(node_list)
1440
    if node_data == False:
1441
      raise errors.OpExecError("Can't gather the list of OSes")
1442
    pol = self._DiagnoseByOS(node_list, node_data)
1443
    output = []
1444
    for os_name, os_data in pol.iteritems():
1445
      row = []
1446
      for field in self.op.output_fields:
1447
        if field == "name":
1448
          val = os_name
1449
        elif field == "valid":
1450
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1451
        elif field == "node_status":
1452
          val = {}
1453
          for node_name, nos_list in os_data.iteritems():
1454
            val[node_name] = [(v.status, v.path) for v in nos_list]
1455
        else:
1456
          raise errors.ParameterError(field)
1457
        row.append(val)
1458
      output.append(row)
1459

    
1460
    return output
1461

    
1462

    
1463
class LURemoveNode(LogicalUnit):
1464
  """Logical unit for removing a node.
1465

1466
  """
1467
  HPATH = "node-remove"
1468
  HTYPE = constants.HTYPE_NODE
1469
  _OP_REQP = ["node_name"]
1470

    
1471
  def BuildHooksEnv(self):
1472
    """Build hooks env.
1473

1474
    This doesn't run on the target node in the pre phase as a failed
1475
    node would then be impossible to remove.
1476

1477
    """
1478
    env = {
1479
      "OP_TARGET": self.op.node_name,
1480
      "NODE_NAME": self.op.node_name,
1481
      }
1482
    all_nodes = self.cfg.GetNodeList()
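    # the node being removed is excluded from both the pre and post hook
    # node lists (see the docstring above)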
1483
    all_nodes.remove(self.op.node_name)
1484
    return env, all_nodes, all_nodes
1485

    
1486
  def CheckPrereq(self):
1487
    """Check prerequisites.
1488

1489
    This checks:
1490
     - the node exists in the configuration
1491
     - it does not have primary or secondary instances
1492
     - it's not the master
1493

1494
    Any errors are signalled by raising errors.OpPrereqError.
1495

1496
    """
1497
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1498
    if node is None:
1499
      raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1500

    
1501
    instance_list = self.cfg.GetInstanceList()
1502

    
1503
    masternode = self.cfg.GetMasterNode()
1504
    if node.name == masternode:
1505
      raise errors.OpPrereqError("Node is the master node,"
1506
                                 " you need to failover first.")
1507

    
1508
    for instance_name in instance_list:
1509
      instance = self.cfg.GetInstanceInfo(instance_name)
1510
      if node.name == instance.primary_node:
1511
        raise errors.OpPrereqError("Instance %s still running on the node,"
1512
                                   " please remove first." % instance_name)
1513
      if node.name in instance.secondary_nodes:
1514
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1515
                                   " please remove first." % instance_name)
1516
    self.op.node_name = node.name
1517
    self.node = node
1518

    
1519
  def Exec(self, feedback_fn):
1520
    """Removes the node from the cluster.
1521

1522
    """
1523
    node = self.node
1524
    logging.info("Stopping the node daemon and removing configs from node %s",
1525
                 node.name)
1526

    
1527
    self.context.RemoveNode(node.name)
1528

    
1529
    self.rpc.call_node_leave_cluster(node.name)
1530

    
1531

    
1532
class LUQueryNodes(NoHooksLU):
1533
  """Logical unit for querying nodes.
1534

1535
  """
1536
  _OP_REQP = ["output_fields", "names"]
1537
  REQ_BGL = False
1538
  _FIELDS_DYNAMIC = utils.FieldSet(
1539
    "dtotal", "dfree",
1540
    "mtotal", "mnode", "mfree",
1541
    "bootid",
1542
    "ctotal",
1543
    )
1544

    
1545
  _FIELDS_STATIC = utils.FieldSet(
1546
    "name", "pinst_cnt", "sinst_cnt",
1547
    "pinst_list", "sinst_list",
1548
    "pip", "sip", "tags",
1549
    "serial_no",
1550
    )
1551

    
1552
  def ExpandNames(self):
1553
    _CheckOutputFields(static=self._FIELDS_STATIC,
1554
                       dynamic=self._FIELDS_DYNAMIC,
1555
                       selected=self.op.output_fields)
1556

    
1557
    self.needed_locks = {}
1558
    self.share_locks[locking.LEVEL_NODE] = 1
1559

    
1560
    if self.op.names:
1561
      self.wanted = _GetWantedNodes(self, self.op.names)
1562
    else:
1563
      self.wanted = locking.ALL_SET
1564

    
1565
    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1566
    if self.do_locking:
1567
      # if we don't request only static fields, we need to lock the nodes
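      # (e.g. a query for only "name" and "pip" is served from the config
      # without node locks, while "mfree" or "dtotal" require live data)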
1568
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
1569

    
1570

    
1571
  def CheckPrereq(self):
1572
    """Check prerequisites.
1573

1574
    """
1575
    # The validation of the node list is done in _GetWantedNodes, if
1576
    # non-empty; if empty, there's no validation to do
1577
    pass
1578

    
1579
  def Exec(self, feedback_fn):
1580
    """Computes the list of nodes and their attributes.
1581

1582
    """
1583
    all_info = self.cfg.GetAllNodesInfo()
1584
    if self.do_locking:
1585
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1586
    elif self.wanted != locking.ALL_SET:
1587
      nodenames = self.wanted
1588
      missing = set(nodenames).difference(all_info.keys())
1589
      if missing:
1590
        raise errors.OpExecError(
1591
          "Some nodes were removed before retrieving their data: %s" % missing)
1592
    else:
1593
      nodenames = all_info.keys()
1594

    
1595
    nodenames = utils.NiceSort(nodenames)
1596
    nodelist = [all_info[name] for name in nodenames]
1597

    
1598
    # begin data gathering
1599

    
1600
    if self.do_locking:
1601
      live_data = {}
1602
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1603
                                          self.cfg.GetHypervisorType())
1604
      for name in nodenames:
1605
        nodeinfo = node_data.get(name, None)
1606
        if nodeinfo:
1607
          live_data[name] = {
1608
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1609
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1610
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1611
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1612
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1613
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1614
            "bootid": nodeinfo['bootid'],
1615
            }
1616
        else:
1617
          live_data[name] = {}
1618
    else:
1619
      live_data = dict.fromkeys(nodenames, {})
1620

    
1621
    node_to_primary = dict([(name, set()) for name in nodenames])
1622
    node_to_secondary = dict([(name, set()) for name in nodenames])
1623

    
1624
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1625
                             "sinst_cnt", "sinst_list"))
1626
    if inst_fields & frozenset(self.op.output_fields):
1627
      instancelist = self.cfg.GetInstanceList()
1628

    
1629
      for instance_name in instancelist:
1630
        inst = self.cfg.GetInstanceInfo(instance_name)
1631
        if inst.primary_node in node_to_primary:
1632
          node_to_primary[inst.primary_node].add(inst.name)
1633
        for secnode in inst.secondary_nodes:
1634
          if secnode in node_to_secondary:
1635
            node_to_secondary[secnode].add(inst.name)
1636

    
1637
    # end data gathering
1638

    
1639
    output = []
1640
    for node in nodelist:
1641
      node_output = []
1642
      for field in self.op.output_fields:
1643
        if field == "name":
1644
          val = node.name
1645
        elif field == "pinst_list":
1646
          val = list(node_to_primary[node.name])
1647
        elif field == "sinst_list":
1648
          val = list(node_to_secondary[node.name])
1649
        elif field == "pinst_cnt":
1650
          val = len(node_to_primary[node.name])
1651
        elif field == "sinst_cnt":
1652
          val = len(node_to_secondary[node.name])
1653
        elif field == "pip":
1654
          val = node.primary_ip
1655
        elif field == "sip":
1656
          val = node.secondary_ip
1657
        elif field == "tags":
1658
          val = list(node.GetTags())
1659
        elif field == "serial_no":
1660
          val = node.serial_no
1661
        elif self._FIELDS_DYNAMIC.Matches(field):
1662
          val = live_data[node.name].get(field, None)
1663
        else:
1664
          raise errors.ParameterError(field)
1665
        node_output.append(val)
1666
      output.append(node_output)
1667

    
1668
    return output
1669

    
1670

    
1671
class LUQueryNodeVolumes(NoHooksLU):
1672
  """Logical unit for getting volumes on node(s).
1673

1674
  """
1675
  _OP_REQP = ["nodes", "output_fields"]
1676
  REQ_BGL = False
1677
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1678
  _FIELDS_STATIC = utils.FieldSet("node")
1679

    
1680
  def ExpandNames(self):
1681
    _CheckOutputFields(static=self._FIELDS_STATIC,
1682
                       dynamic=self._FIELDS_DYNAMIC,
1683
                       selected=self.op.output_fields)
1684

    
1685
    self.needed_locks = {}
1686
    self.share_locks[locking.LEVEL_NODE] = 1
1687
    if not self.op.nodes:
1688
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1689
    else:
1690
      self.needed_locks[locking.LEVEL_NODE] = \
1691
        _GetWantedNodes(self, self.op.nodes)
1692

    
1693
  def CheckPrereq(self):
1694
    """Check prerequisites.
1695

1696
    This checks that the fields required are valid output fields.
1697

1698
    """
1699
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1700

    
1701
  def Exec(self, feedback_fn):
1702
    """Computes the list of nodes and their attributes.
1703

1704
    """
1705
    nodenames = self.nodes
1706
    volumes = self.rpc.call_node_volumes(nodenames)
1707

    
1708
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1709
             in self.cfg.GetInstanceList()]
1710

    
1711
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1712

    
1713
    output = []
1714
    for node in nodenames:
1715
      if node not in volumes or not volumes[node]:
1716
        continue
1717

    
1718
      node_vols = volumes[node][:]
1719
      node_vols.sort(key=lambda vol: vol['dev'])
1720

    
1721
      for vol in node_vols:
1722
        node_output = []
1723
        for field in self.op.output_fields:
1724
          if field == "node":
1725
            val = node
1726
          elif field == "phys":
1727
            val = vol['dev']
1728
          elif field == "vg":
1729
            val = vol['vg']
1730
          elif field == "name":
1731
            val = vol['name']
1732
          elif field == "size":
1733
            val = int(float(vol['size']))
1734
          elif field == "instance":
1735
            for inst in ilist:
1736
              if node not in lv_by_node[inst]:
1737
                continue
1738
              if vol['name'] in lv_by_node[inst][node]:
1739
                val = inst.name
1740
                break
1741
            else:
1742
              val = '-'
1743
          else:
1744
            raise errors.ParameterError(field)
1745
          node_output.append(str(val))
1746

    
1747
        output.append(node_output)
1748

    
1749
    return output
1750

    
1751

    
1752
class LUAddNode(LogicalUnit):
1753
  """Logical unit for adding node to the cluster.
1754

1755
  """
1756
  HPATH = "node-add"
1757
  HTYPE = constants.HTYPE_NODE
1758
  _OP_REQP = ["node_name"]
1759

    
1760
  def BuildHooksEnv(self):
1761
    """Build hooks env.
1762

1763
    This will run on all nodes before, and on all nodes + the new node after.
1764

1765
    """
1766
    env = {
1767
      "OP_TARGET": self.op.node_name,
1768
      "NODE_NAME": self.op.node_name,
1769
      "NODE_PIP": self.op.primary_ip,
1770
      "NODE_SIP": self.op.secondary_ip,
1771
      }
1772
    nodes_0 = self.cfg.GetNodeList()
1773
    nodes_1 = nodes_0 + [self.op.node_name, ]
1774
    return env, nodes_0, nodes_1
1775

    
1776
  def CheckPrereq(self):
1777
    """Check prerequisites.
1778

1779
    This checks:
1780
     - the new node is not already in the config
1781
     - it is resolvable
1782
     - its parameters (single/dual homed) match the cluster
1783

1784
    Any errors are signalled by raising errors.OpPrereqError.
1785

1786
    """
1787
    node_name = self.op.node_name
1788
    cfg = self.cfg
1789

    
1790
    dns_data = utils.HostInfo(node_name)
1791

    
1792
    node = dns_data.name
1793
    primary_ip = self.op.primary_ip = dns_data.ip
1794
    secondary_ip = getattr(self.op, "secondary_ip", None)
1795
    if secondary_ip is None:
1796
      secondary_ip = primary_ip
1797
    if not utils.IsValidIP(secondary_ip):
1798
      raise errors.OpPrereqError("Invalid secondary IP given")
1799
    self.op.secondary_ip = secondary_ip
1800

    
1801
    node_list = cfg.GetNodeList()
1802
    if not self.op.readd and node in node_list:
1803
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1804
                                 node)
1805
    elif self.op.readd and node not in node_list:
1806
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1807

    
1808
    for existing_node_name in node_list:
1809
      existing_node = cfg.GetNodeInfo(existing_node_name)
1810

    
1811
      if self.op.readd and node == existing_node_name:
1812
        if (existing_node.primary_ip != primary_ip or
1813
            existing_node.secondary_ip != secondary_ip):
1814
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1815
                                     " address configuration as before")
1816
        continue
1817

    
1818
      if (existing_node.primary_ip == primary_ip or
1819
          existing_node.secondary_ip == primary_ip or
1820
          existing_node.primary_ip == secondary_ip or
1821
          existing_node.secondary_ip == secondary_ip):
1822
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1823
                                   " existing node %s" % existing_node.name)
1824

    
1825
    # check that the type of the node (single versus dual homed) is the
1826
    # same as for the master
1827
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1828
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1829
    newbie_singlehomed = secondary_ip == primary_ip
1830
    if master_singlehomed != newbie_singlehomed:
1831
      if master_singlehomed:
1832
        raise errors.OpPrereqError("The master has no private ip but the"
1833
                                   " new node has one")
1834
      else:
1835
        raise errors.OpPrereqError("The master has a private ip but the"
1836
                                   " new node doesn't have one")
1837

    
1838
    # check reachability
1839
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1840
      raise errors.OpPrereqError("Node not reachable by ping")
1841

    
1842
    if not newbie_singlehomed:
1843
      # check reachability from my secondary ip to newbie's secondary ip
1844
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1845
                           source=myself.secondary_ip):
1846
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1847
                                   " based ping to noded port")
1848

    
1849
    self.new_node = objects.Node(name=node,
1850
                                 primary_ip=primary_ip,
1851
                                 secondary_ip=secondary_ip)
1852

    
1853
  def Exec(self, feedback_fn):
1854
    """Adds the new node to the cluster.
1855

1856
    """
1857
    new_node = self.new_node
1858
    node = new_node.name
1859

    
1860
    # check connectivity
1861
    result = self.rpc.call_version([node])[node]
1862
    if result:
1863
      if constants.PROTOCOL_VERSION == result:
1864
        logging.info("Communication to node %s fine, sw version %s match",
1865
                     node, result)
1866
      else:
1867
        raise errors.OpExecError("Version mismatch master version %s,"
1868
                                 " node version %s" %
1869
                                 (constants.PROTOCOL_VERSION, result))
1870
    else:
1871
      raise errors.OpExecError("Cannot get version from the new node")
1872

    
1873
    # setup ssh on node
1874
    logging.info("Copy ssh key to node %s", node)
1875
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1876
    keyarray = []
1877
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1878
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1879
                priv_key, pub_key]
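    # the host DSA and RSA key pairs plus the SSH key pair of the Ganeti
    # run-as user are read locally and pushed to the new node below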
1880

    
1881
    for i in keyfiles:
1882
      f = open(i, 'r')
1883
      try:
1884
        keyarray.append(f.read())
1885
      finally:
1886
        f.close()
1887

    
1888
    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
1889
                                    keyarray[2],
1890
                                    keyarray[3], keyarray[4], keyarray[5])
1891

    
1892
    if not result:
1893
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1894

    
1895
    # Add node to our /etc/hosts, and add key to known_hosts
1896
    utils.AddHostToEtcHosts(new_node.name)
1897

    
1898
    if new_node.secondary_ip != new_node.primary_ip:
1899
      if not self.rpc.call_node_has_ip_address(new_node.name,
1900
                                               new_node.secondary_ip):
1901
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1902
                                 " you gave (%s). Please fix and re-run this"
1903
                                 " command." % new_node.secondary_ip)
1904

    
1905
    node_verify_list = [self.cfg.GetMasterNode()]
1906
    node_verify_param = {
1907
      'nodelist': [node],
1908
      # TODO: do a node-net-test as well?
1909
    }
1910

    
1911
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
1912
                                       self.cfg.GetClusterName())
1913
    for verifier in node_verify_list:
1914
      if not result[verifier]:
1915
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
1916
                                 " for remote verification" % verifier)
1917
      if result[verifier]['nodelist']:
1918
        for failed in result[verifier]['nodelist']:
1919
          feedback_fn("ssh/hostname verification failed %s -> %s" %
1920
                      (verifier, result[verifier]['nodelist'][failed]))
1921
        raise errors.OpExecError("ssh/hostname verification failed.")
1922

    
1923
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1924
    # including the node just added
1925
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
1926
    dist_nodes = self.cfg.GetNodeList()
1927
    if not self.op.readd:
1928
      dist_nodes.append(node)
1929
    if myself.name in dist_nodes:
1930
      dist_nodes.remove(myself.name)
1931

    
1932
    logging.debug("Copying hosts and known_hosts to all nodes")
1933
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1934
      result = self.rpc.call_upload_file(dist_nodes, fname)
1935
      for to_node in dist_nodes:
1936
        if not result[to_node]:
1937
          logging.error("Copy of file %s to node %s failed", fname, to_node)
1938

    
1939
    to_copy = []
1940
    if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
1941
      to_copy.append(constants.VNC_PASSWORD_FILE)
1942
    for fname in to_copy:
1943
      result = self.rpc.call_upload_file([node], fname)
1944
      if not result[node]:
1945
        logging.error("Could not copy file %s to node %s", fname, node)
1946

    
1947
    if self.op.readd:
1948
      self.context.ReaddNode(new_node)
1949
    else:
1950
      self.context.AddNode(new_node)
1951

    
1952

    
1953
class LUQueryClusterInfo(NoHooksLU):
1954
  """Query cluster configuration.
1955

1956
  """
1957
  _OP_REQP = []
1958
  REQ_BGL = False
1959

    
1960
  def ExpandNames(self):
1961
    self.needed_locks = {}
1962

    
1963
  def CheckPrereq(self):
1964
    """No prerequsites needed for this LU.
1965

1966
    """
1967
    pass
1968

    
1969
  def Exec(self, feedback_fn):
1970
    """Return cluster config.
1971

1972
    """
1973
    cluster = self.cfg.GetClusterInfo()
1974
    result = {
1975
      "software_version": constants.RELEASE_VERSION,
1976
      "protocol_version": constants.PROTOCOL_VERSION,
1977
      "config_version": constants.CONFIG_VERSION,
1978
      "os_api_version": constants.OS_API_VERSION,
1979
      "export_version": constants.EXPORT_VERSION,
1980
      "architecture": (platform.architecture()[0], platform.machine()),
1981
      "name": cluster.cluster_name,
1982
      "master": cluster.master_node,
1983
      "default_hypervisor": cluster.default_hypervisor,
1984
      "enabled_hypervisors": cluster.enabled_hypervisors,
1985
      "hvparams": cluster.hvparams,
1986
      "beparams": cluster.beparams,
1987
      }
1988

    
1989
    return result
1990

    
1991

    
1992
class LUQueryConfigValues(NoHooksLU):
1993
  """Return configuration values.
1994

1995
  """
1996
  _OP_REQP = []
1997
  REQ_BGL = False
1998
  _FIELDS_DYNAMIC = utils.FieldSet()
1999
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2000

    
2001
  def ExpandNames(self):
2002
    self.needed_locks = {}
2003

    
2004
    _CheckOutputFields(static=self._FIELDS_STATIC,
2005
                       dynamic=self._FIELDS_DYNAMIC,
2006
                       selected=self.op.output_fields)
2007

    
2008
  def CheckPrereq(self):
2009
    """No prerequisites.
2010

2011
    """
2012
    pass
2013

    
2014
  def Exec(self, feedback_fn):
2015
    """Dump a representation of the cluster config to the standard output.
2016

2017
    """
2018
    values = []
2019
    for field in self.op.output_fields:
2020
      if field == "cluster_name":
2021
        entry = self.cfg.GetClusterName()
2022
      elif field == "master_node":
2023
        entry = self.cfg.GetMasterNode()
2024
      elif field == "drain_flag":
2025
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2026
      else:
2027
        raise errors.ParameterError(field)
2028
      values.append(entry)
2029
    return values
2030

    
2031

    
2032
class LUActivateInstanceDisks(NoHooksLU):
2033
  """Bring up an instance's disks.
2034

2035
  """
2036
  _OP_REQP = ["instance_name"]
2037
  REQ_BGL = False
2038

    
2039
  def ExpandNames(self):
2040
    self._ExpandAndLockInstance()
2041
    self.needed_locks[locking.LEVEL_NODE] = []
2042
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2043

    
2044
  def DeclareLocks(self, level):
2045
    if level == locking.LEVEL_NODE:
2046
      self._LockInstancesNodes()
2047

    
2048
  def CheckPrereq(self):
2049
    """Check prerequisites.
2050

2051
    This checks that the instance is in the cluster.
2052

2053
    """
2054
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2055
    assert self.instance is not None, \
2056
      "Cannot retrieve locked instance %s" % self.op.instance_name
2057

    
2058
  def Exec(self, feedback_fn):
2059
    """Activate the disks.
2060

2061
    """
2062
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2063
    if not disks_ok:
2064
      raise errors.OpExecError("Cannot activate block devices")
2065

    
2066
    return disks_info
2067

    
2068

    
2069
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2070
  """Prepare the block devices for an instance.
2071

2072
  This sets up the block devices on all nodes.
2073

2074
  @type lu: L{LogicalUnit}
2075
  @param lu: the logical unit on whose behalf we execute
2076
  @type instance: L{objects.Instance}
2077
  @param instance: the instance for whose disks we assemble
2078
  @type ignore_secondaries: boolean
2079
  @param ignore_secondaries: if true, errors on secondary nodes
2080
      won't result in an error return from the function
2081
  @return: a tuple of (disks_ok, device_info), where device_info is a list
2082
      of (host, instance_visible_name, node_visible_name) tuples with the
2083
      mapping from node devices to instance devices
2084

2085
  """
2086
  device_info = []
2087
  disks_ok = True
2088
  iname = instance.name
2089
  # With the two-pass mechanism we try to reduce the window of
2090
  # opportunity for the race condition of switching DRBD to primary
2091
  # before handshaking has occurred, but we do not eliminate it
2092

    
2093
  # The proper fix would be to wait (with some limits) until the
2094
  # connection has been made and drbd transitions from WFConnection
2095
  # into any other network-connected state (Connected, SyncTarget,
2096
  # SyncSource, etc.)
2097

    
2098
  # 1st pass, assemble on all nodes in secondary mode
2099
  for inst_disk in instance.disks:
2100
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2101
      lu.cfg.SetDiskID(node_disk, node)
2102
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2103
      if not result:
2104
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2105
                           " (is_primary=False, pass=1)",
2106
                           inst_disk.iv_name, node)
2107
        if not ignore_secondaries:
2108
          disks_ok = False
2109

    
2110
  # FIXME: race condition on drbd migration to primary
2111

    
2112
  # 2nd pass, do only the primary node
2113
  for inst_disk in instance.disks:
2114
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2115
      if node != instance.primary_node:
2116
        continue
2117
      lu.cfg.SetDiskID(node_disk, node)
2118
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2119
      if not result:
2120
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2121
                           " (is_primary=True, pass=2)",
2122
                           inst_disk.iv_name, node)
2123
        disks_ok = False
2124
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
2125

    
2126
  # leave the disks configured for the primary node
2127
  # this is a workaround that would be fixed better by
2128
  # improving the logical/physical id handling
2129
  for disk in instance.disks:
2130
    lu.cfg.SetDiskID(disk, instance.primary_node)
2131

    
2132
  return disks_ok, device_info
2133

    
2134

    
2135
def _StartInstanceDisks(lu, instance, force):
2136
  """Start the disks of an instance.
2137

2138
  """
2139
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2140
                                           ignore_secondaries=force)
2141
  if not disks_ok:
2142
    _ShutdownInstanceDisks(lu, instance)
2143
    if force is not None and not force:
2144
      lu.proc.LogWarning("", hint="If the message above refers to a"
2145
                         " secondary node,"
2146
                         " you can retry the operation using '--force'.")
2147
    raise errors.OpExecError("Disk consistency error")
2148

    
2149

    
2150
class LUDeactivateInstanceDisks(NoHooksLU):
2151
  """Shutdown an instance's disks.
2152

2153
  """
2154
  _OP_REQP = ["instance_name"]
2155
  REQ_BGL = False
2156

    
2157
  def ExpandNames(self):
2158
    self._ExpandAndLockInstance()
2159
    self.needed_locks[locking.LEVEL_NODE] = []
2160
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2161

    
2162
  def DeclareLocks(self, level):
2163
    if level == locking.LEVEL_NODE:
2164
      self._LockInstancesNodes()
2165

    
2166
  def CheckPrereq(self):
2167
    """Check prerequisites.
2168

2169
    This checks that the instance is in the cluster.
2170

2171
    """
2172
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2173
    assert self.instance is not None, \
2174
      "Cannot retrieve locked instance %s" % self.op.instance_name
2175

    
2176
  def Exec(self, feedback_fn):
2177
    """Deactivate the disks
2178

2179
    """
2180
    instance = self.instance
2181
    _SafeShutdownInstanceDisks(self, instance)
2182

    
2183

    
2184
def _SafeShutdownInstanceDisks(lu, instance):
2185
  """Shutdown block devices of an instance.
2186

2187
  This function checks if an instance is running, before calling
2188
  _ShutdownInstanceDisks.
2189

2190
  """
2191
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
2192
                                      [instance.hypervisor])
2193
  ins_l = ins_l[instance.primary_node]
2194
  if not isinstance(ins_l, list):
2195
    raise errors.OpExecError("Can't contact node '%s'" %
2196
                             instance.primary_node)
2197

    
2198
  if instance.name in ins_l:
2199
    raise errors.OpExecError("Instance is running, can't shutdown"
2200
                             " block devices.")
2201

    
2202
  _ShutdownInstanceDisks(lu, instance)
2203

    
2204

    
2205
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2206
  """Shutdown block devices of an instance.
2207

2208
  This does the shutdown on all nodes of the instance.
2209

2210
  If ignore_primary is false, errors on the primary node are not
2211
  ignored (they make the function return False).
2212

2213
  """
2214
  result = True
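  # shut down the top-level device of each disk on every node that holds
  # a component of it, as returned by ComputeNodeTree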
2215
  for disk in instance.disks:
2216
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2217
      lu.cfg.SetDiskID(top_disk, node)
2218
      if not lu.rpc.call_blockdev_shutdown(node, top_disk):
2219
        logging.error("Could not shutdown block device %s on node %s",
2220
                      disk.iv_name, node)
2221
        if not ignore_primary or node != instance.primary_node:
2222
          result = False
2223
  return result
2224

    
2225

    
2226
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2227
  """Checks if a node has enough free memory.
2228

2229
  This function checks if a given node has the needed amount of free
2230
  memory. In case the node has less memory or we cannot get the
2231
  information from the node, this function raises an OpPrereqError
2232
  exception.
2233

2234
  @type lu: C{LogicalUnit}
2235
  @param lu: a logical unit from which we get configuration data
2236
  @type node: C{str}
2237
  @param node: the node to check
2238
  @type reason: C{str}
2239
  @param reason: string to use in the error message
2240
  @type requested: C{int}
2241
  @param requested: the amount of memory in MiB to check for
2242
  @type hypervisor: C{str}
2243
  @param hypervisor: the hypervisor to ask for memory stats
2244
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2245
      we cannot check the node
2246

2247
  """
2248
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2249
  if not nodeinfo or not isinstance(nodeinfo, dict):
2250
    raise errors.OpPrereqError("Could not contact node %s for resource"
2251
                             " information" % (node,))
2252

    
2253
  free_mem = nodeinfo[node].get('memory_free')
2254
  if not isinstance(free_mem, int):
2255
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2256
                             " was '%s'" % (node, free_mem))
2257
  if requested > free_mem:
2258
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2259
                             " needed %s MiB, available %s MiB" %
2260
                             (node, reason, requested, free_mem))
2261

    
2262

    
2263
class LUStartupInstance(LogicalUnit):
2264
  """Starts an instance.
2265

2266
  """
2267
  HPATH = "instance-start"
2268
  HTYPE = constants.HTYPE_INSTANCE
2269
  _OP_REQP = ["instance_name", "force"]
2270
  REQ_BGL = False
2271

    
2272
  def ExpandNames(self):
2273
    self._ExpandAndLockInstance()
2274

    
2275
  def BuildHooksEnv(self):
2276
    """Build hooks env.
2277

2278
    This runs on master, primary and secondary nodes of the instance.
2279

2280
    """
2281
    env = {
2282
      "FORCE": self.op.force,
2283
      }
2284
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2285
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2286
          list(self.instance.secondary_nodes))
2287
    return env, nl, nl
2288

    
2289
  def CheckPrereq(self):
2290
    """Check prerequisites.
2291

2292
    This checks that the instance is in the cluster.
2293

2294
    """
2295
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2296
    assert self.instance is not None, \
2297
      "Cannot retrieve locked instance %s" % self.op.instance_name
2298

    
2299
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2300
    # check bridge existence
2301
    _CheckInstanceBridgesExist(self, instance)
2302

    
2303
    _CheckNodeFreeMemory(self, instance.primary_node,
2304
                         "starting instance %s" % instance.name,
2305
                         bep[constants.BE_MEMORY], instance.hypervisor)
2306

    
2307
  def Exec(self, feedback_fn):
2308
    """Start the instance.
2309

2310
    """
2311
    instance = self.instance
2312
    force = self.op.force
2313
    extra_args = getattr(self.op, "extra_args", "")
2314

    
2315
    self.cfg.MarkInstanceUp(instance.name)
2316

    
2317
    node_current = instance.primary_node
2318

    
2319
    _StartInstanceDisks(self, instance, force)
2320

    
2321
    if not self.rpc.call_instance_start(node_current, instance, extra_args):
2322
      _ShutdownInstanceDisks(self, instance)
2323
      raise errors.OpExecError("Could not start instance")
2324

    
2325

    
2326
class LURebootInstance(LogicalUnit):
2327
  """Reboot an instance.
2328

2329
  """
2330
  HPATH = "instance-reboot"
2331
  HTYPE = constants.HTYPE_INSTANCE
2332
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2333
  REQ_BGL = False
2334

    
2335
  def ExpandNames(self):
2336
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2337
                                   constants.INSTANCE_REBOOT_HARD,
2338
                                   constants.INSTANCE_REBOOT_FULL]:
2339
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2340
                                  (constants.INSTANCE_REBOOT_SOFT,
2341
                                   constants.INSTANCE_REBOOT_HARD,
2342
                                   constants.INSTANCE_REBOOT_FULL))
2343
    self._ExpandAndLockInstance()
2344

    
2345
  def BuildHooksEnv(self):
2346
    """Build hooks env.
2347

2348
    This runs on master, primary and secondary nodes of the instance.
2349

2350
    """
2351
    env = {
2352
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2353
      }
2354
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2355
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2356
          list(self.instance.secondary_nodes))
2357
    return env, nl, nl
2358

    
2359
  def CheckPrereq(self):
2360
    """Check prerequisites.
2361

2362
    This checks that the instance is in the cluster.
2363

2364
    """
2365
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2366
    assert self.instance is not None, \
2367
      "Cannot retrieve locked instance %s" % self.op.instance_name
2368

    
2369
    # check bridge existence
2370
    _CheckInstanceBridgesExist(self, instance)
2371

    
2372
  def Exec(self, feedback_fn):
2373
    """Reboot the instance.
2374

2375
    """
2376
    instance = self.instance
2377
    ignore_secondaries = self.op.ignore_secondaries
2378
    reboot_type = self.op.reboot_type
2379
    extra_args = getattr(self.op, "extra_args", "")
2380

    
2381
    node_current = instance.primary_node
2382

    
2383
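    # soft and hard reboots are delegated to the hypervisor on the node;
    # a full reboot is emulated by shutting the instance down, cycling its
    # disks and starting it again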
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2384
                       constants.INSTANCE_REBOOT_HARD]:
2385
      if not self.rpc.call_instance_reboot(node_current, instance,
2386
                                           reboot_type, extra_args):
2387
        raise errors.OpExecError("Could not reboot instance")
2388
    else:
2389
      if not self.rpc.call_instance_shutdown(node_current, instance):
2390
        raise errors.OpExecError("could not shutdown instance for full reboot")
2391
      _ShutdownInstanceDisks(self, instance)
2392
      _StartInstanceDisks(self, instance, ignore_secondaries)
2393
      if not self.rpc.call_instance_start(node_current, instance, extra_args):
2394
        _ShutdownInstanceDisks(self, instance)
2395
        raise errors.OpExecError("Could not start instance for full reboot")
2396

    
2397
    self.cfg.MarkInstanceUp(instance.name)
2398

    
2399

    
2400
class LUShutdownInstance(LogicalUnit):
2401
  """Shutdown an instance.
2402

2403
  """
2404
  HPATH = "instance-stop"
2405
  HTYPE = constants.HTYPE_INSTANCE
2406
  _OP_REQP = ["instance_name"]
2407
  REQ_BGL = False
2408

    
2409
  def ExpandNames(self):
2410
    self._ExpandAndLockInstance()
2411

    
2412
  def BuildHooksEnv(self):
2413
    """Build hooks env.
2414

2415
    This runs on master, primary and secondary nodes of the instance.
2416

2417
    """
2418
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2419
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2420
          list(self.instance.secondary_nodes))
2421
    return env, nl, nl
2422

    
2423
  def CheckPrereq(self):
2424
    """Check prerequisites.
2425

2426
    This checks that the instance is in the cluster.
2427

2428
    """
2429
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2430
    assert self.instance is not None, \
2431
      "Cannot retrieve locked instance %s" % self.op.instance_name
2432

    
2433
  def Exec(self, feedback_fn):
2434
    """Shutdown the instance.
2435

2436
    """
2437
    instance = self.instance
2438
    node_current = instance.primary_node
2439
    self.cfg.MarkInstanceDown(instance.name)
2440
    if not self.rpc.call_instance_shutdown(node_current, instance):
2441
      self.proc.LogWarning("Could not shutdown instance")
2442

    
2443
    _ShutdownInstanceDisks(self, instance)
2444

    
2445

    
2446
class LUReinstallInstance(LogicalUnit):
2447
  """Reinstall an instance.
2448

2449
  """
2450
  HPATH = "instance-reinstall"
2451
  HTYPE = constants.HTYPE_INSTANCE
2452
  _OP_REQP = ["instance_name"]
2453
  REQ_BGL = False
2454

    
2455
  def ExpandNames(self):
2456
    self._ExpandAndLockInstance()
2457

    
2458
  def BuildHooksEnv(self):
2459
    """Build hooks env.
2460

2461
    This runs on master, primary and secondary nodes of the instance.
2462

2463
    """
2464
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2465
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2466
          list(self.instance.secondary_nodes))
2467
    return env, nl, nl
2468

    
2469
  def CheckPrereq(self):
2470
    """Check prerequisites.
2471

2472
    This checks that the instance is in the cluster and is not running.
2473

2474
    """
2475
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2476
    assert instance is not None, \
2477
      "Cannot retrieve locked instance %s" % self.op.instance_name
2478

    
2479
    if instance.disk_template == constants.DT_DISKLESS:
2480
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2481
                                 self.op.instance_name)
2482
    if instance.status != "down":
2483
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2484
                                 self.op.instance_name)
2485
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2486
                                              instance.name,
2487
                                              instance.hypervisor)
2488
    if remote_info:
2489
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2490
                                 (self.op.instance_name,
2491
                                  instance.primary_node))
2492

    
2493
    self.op.os_type = getattr(self.op, "os_type", None)
2494
    if self.op.os_type is not None:
2495
      # OS verification
2496
      pnode = self.cfg.GetNodeInfo(
2497
        self.cfg.ExpandNodeName(instance.primary_node))
2498
      if pnode is None:
2499
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2500
                                   self.op.pnode)
2501
      os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
2502
      if not os_obj:
2503
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2504
                                   " primary node"  % self.op.os_type)
2505

    
2506
    self.instance = instance
2507

    
2508
  def Exec(self, feedback_fn):
2509
    """Reinstall the instance.
2510

2511
    """
2512
    inst = self.instance
2513

    
2514
    if self.op.os_type is not None:
2515
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2516
      inst.os = self.op.os_type
2517
      self.cfg.Update(inst)
2518

    
2519
    _StartInstanceDisks(self, inst, None)
2520
    try:
2521
      feedback_fn("Running the instance OS create scripts...")
2522
      if not self.rpc.call_instance_os_add(inst.primary_node, inst):
2523
        raise errors.OpExecError("Could not install OS for instance %s"
2524
                                 " on node %s" %
2525
                                 (inst.name, inst.primary_node))
2526
    finally:
2527
      _ShutdownInstanceDisks(self, inst)
2528

    
2529

    
2530
class LURenameInstance(LogicalUnit):
2531
  """Rename an instance.
2532

2533
  """
2534
  HPATH = "instance-rename"
2535
  HTYPE = constants.HTYPE_INSTANCE
2536
  _OP_REQP = ["instance_name", "new_name"]
2537

    
2538
  def BuildHooksEnv(self):
2539
    """Build hooks env.
2540

2541
    This runs on master, primary and secondary nodes of the instance.
2542

2543
    """
2544
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2545
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2546
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2547
          list(self.instance.secondary_nodes))
2548
    return env, nl, nl
2549

    
2550
  def CheckPrereq(self):
2551
    """Check prerequisites.
2552

2553
    This checks that the instance is in the cluster and is not running.
2554

2555
    """
2556
    instance = self.cfg.GetInstanceInfo(
2557
      self.cfg.ExpandInstanceName(self.op.instance_name))
2558
    if instance is None:
2559
      raise errors.OpPrereqError("Instance '%s' not known" %
2560
                                 self.op.instance_name)
2561
    if instance.status != "down":
2562
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2563
                                 self.op.instance_name)
2564
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2565
                                              instance.name,
2566
                                              instance.hypervisor)
2567
    if remote_info:
2568
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2569
                                 (self.op.instance_name,
2570
                                  instance.primary_node))
2571
    self.instance = instance
2572

    
2573
    # new name verification
2574
    name_info = utils.HostInfo(self.op.new_name)
2575

    
2576
    self.op.new_name = new_name = name_info.name
2577
    instance_list = self.cfg.GetInstanceList()
2578
    if new_name in instance_list:
2579
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2580
                                 new_name)
2581

    
2582
    if not getattr(self.op, "ignore_ip", False):
2583
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2584
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2585
                                   (name_info.ip, new_name))
2586

    
2587

    
2588
  def Exec(self, feedback_fn):
2589
    """Reinstall the instance.
2590

2591
    """
2592
    inst = self.instance
2593
    old_name = inst.name
2594

    
2595
    if inst.disk_template == constants.DT_FILE:
2596
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2597

    
2598
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2599
    # Change the instance lock. This is definitely safe while we hold the BGL
2600
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2601
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2602

    
2603
    # re-read the instance from the configuration after rename
2604
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2605

    
2606
    if inst.disk_template == constants.DT_FILE:
2607
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2608
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2609
                                                     old_file_storage_dir,
2610
                                                     new_file_storage_dir)
2611

    
2612
      if not result:
2613
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2614
                                 " directory '%s' to '%s' (but the instance"
2615
                                 " has been renamed in Ganeti)" % (
2616
                                 inst.primary_node, old_file_storage_dir,
2617
                                 new_file_storage_dir))
2618

    
2619
      if not result[0]:
2620
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2621
                                 " (but the instance has been renamed in"
2622
                                 " Ganeti)" % (old_file_storage_dir,
2623
                                               new_file_storage_dir))
2624

    
2625
    _StartInstanceDisks(self, inst, None)
2626
    try:
2627
      if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
2628
                                               old_name):
2629
        msg = ("Could not run OS rename script for instance %s on node %s"
2630
               " (but the instance has been renamed in Ganeti)" %
2631
               (inst.name, inst.primary_node))
2632
        self.proc.LogWarning(msg)
2633
    finally:
2634
      _ShutdownInstanceDisks(self, inst)
2635

    
2636

    
2637
class LURemoveInstance(LogicalUnit):
2638
  """Remove an instance.
2639

2640
  """
2641
  HPATH = "instance-remove"
2642
  HTYPE = constants.HTYPE_INSTANCE
2643
  _OP_REQP = ["instance_name", "ignore_failures"]
2644
  REQ_BGL = False
2645

    
2646
  def ExpandNames(self):
2647
    self._ExpandAndLockInstance()
2648
    self.needed_locks[locking.LEVEL_NODE] = []
2649
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2650

    
2651
  def DeclareLocks(self, level):
2652
    if level == locking.LEVEL_NODE:
2653
      self._LockInstancesNodes()
2654

    
2655
  def BuildHooksEnv(self):
2656
    """Build hooks env.
2657

2658
    This runs on master, primary and secondary nodes of the instance.
2659

2660
    """
2661
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2662
    nl = [self.cfg.GetMasterNode()]
2663
    return env, nl, nl
2664

    
2665
  def CheckPrereq(self):
2666
    """Check prerequisites.
2667

2668
    This checks that the instance is in the cluster.
2669

2670
    """
2671
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2672
    assert self.instance is not None, \
2673
      "Cannot retrieve locked instance %s" % self.op.instance_name
2674

    
2675
  def Exec(self, feedback_fn):
2676
    """Remove the instance.
2677

2678
    """
2679
    instance = self.instance
2680
    logging.info("Shutting down instance %s on node %s",
2681
                 instance.name, instance.primary_node)
2682

    
2683
    if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
2684
      if self.op.ignore_failures:
2685
        feedback_fn("Warning: can't shutdown instance")
2686
      else:
2687
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2688
                                 (instance.name, instance.primary_node))
2689

    
2690
    logging.info("Removing block devices for instance %s", instance.name)
2691

    
2692
    if not _RemoveDisks(self, instance):
2693
      if self.op.ignore_failures:
2694
        feedback_fn("Warning: can't remove instance's disks")
2695
      else:
2696
        raise errors.OpExecError("Can't remove instance's disks")
2697

    
2698
    logging.info("Removing instance %s out of cluster config", instance.name)
2699

    
2700
    self.cfg.RemoveInstance(instance.name)
2701
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2702

    
2703

    
2704
class LUQueryInstances(NoHooksLU):
2705
  """Logical unit for querying instances.
2706

2707
  """
2708
  _OP_REQP = ["output_fields", "names"]
2709
  REQ_BGL = False
2710
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
2711
                                    "admin_state", "admin_ram",
2712
                                    "disk_template", "ip", "mac", "bridge",
2713
                                    "sda_size", "sdb_size", "vcpus", "tags",
2714
                                    "network_port", "beparams",
2715
                                    "(disk).(size)/([0-9]+)",
2716
                                    "(disk).(sizes)",
2717
                                    "(nic).(mac|ip|bridge)/([0-9]+)",
2718
                                    "(nic).(macs|ips|bridges)",
2719
                                    "(disk|nic).(count)",
2720
                                    "serial_no", "hypervisor", "hvparams",] +
2721
                                  ["hv/%s" % name
2722
                                   for name in constants.HVS_PARAMETERS] +
2723
                                  ["be/%s" % name
2724
                                   for name in constants.BES_PARAMETERS])
2725
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
2726

    
2727

    
2728
  def ExpandNames(self):
2729
    _CheckOutputFields(static=self._FIELDS_STATIC,
2730
                       dynamic=self._FIELDS_DYNAMIC,
2731
                       selected=self.op.output_fields)
2732

    
2733
    self.needed_locks = {}
2734
    self.share_locks[locking.LEVEL_INSTANCE] = 1
2735
    self.share_locks[locking.LEVEL_NODE] = 1
2736

    
2737
    if self.op.names:
2738
      self.wanted = _GetWantedInstances(self, self.op.names)
2739
    else:
2740
      self.wanted = locking.ALL_SET
2741

    
2742
    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2743
    if self.do_locking:
2744
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2745
      self.needed_locks[locking.LEVEL_NODE] = []
2746
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2747

    
2748
  def DeclareLocks(self, level):
2749
    if level == locking.LEVEL_NODE and self.do_locking:
2750
      self._LockInstancesNodes()
2751

    
2752
  def CheckPrereq(self):
2753
    """Check prerequisites.
2754

2755
    """
2756
    pass
2757

    
2758
  def Exec(self, feedback_fn):
2759
    """Computes the list of nodes and their attributes.
2760

2761
    """
2762
    all_info = self.cfg.GetAllInstancesInfo()
2763
    if self.do_locking:
2764
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2765
    elif self.wanted != locking.ALL_SET:
2766
      instance_names = self.wanted
2767
      missing = set(instance_names).difference(all_info.keys())
2768
      if missing:
2769
        raise errors.OpExecError(
2770
          "Some instances were removed before retrieving their data: %s"
2771
          % missing)
2772
    else:
2773
      instance_names = all_info.keys()
2774

    
2775
    instance_names = utils.NiceSort(instance_names)
2776
    instance_list = [all_info[iname] for iname in instance_names]
2777

    
2778
    # begin data gathering
2779

    
2780
    nodes = frozenset([inst.primary_node for inst in instance_list])
2781
    hv_list = list(set([inst.hypervisor for inst in instance_list]))
2782

    
2783
    bad_nodes = []
2784
    if self.do_locking:
2785
      live_data = {}
2786
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
2787
      for name in nodes:
2788
        result = node_data[name]
2789
        if result:
2790
          live_data.update(result)
2791
        elif result == False:
2792
          bad_nodes.append(name)
2793
        # else no instance is alive
2794
    else:
2795
      live_data = dict([(name, {}) for name in instance_names])
2796

    
2797
    # end data gathering
2798

    
2799
    HVPREFIX = "hv/"
2800
    BEPREFIX = "be/"
2801
    output = []
2802
    for instance in instance_list:
2803
      iout = []
2804
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
2805
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
2806
      for field in self.op.output_fields:
2807
        st_match = self._FIELDS_STATIC.Matches(field)
2808
        if field == "name":
2809
          val = instance.name
2810
        elif field == "os":
2811
          val = instance.os
2812
        elif field == "pnode":
2813
          val = instance.primary_node
2814
        elif field == "snodes":
2815
          val = list(instance.secondary_nodes)
2816
        elif field == "admin_state":
2817
          val = (instance.status != "down")
2818
        elif field == "oper_state":
2819
          if instance.primary_node in bad_nodes:
2820
            val = None
2821
          else:
2822
            val = bool(live_data.get(instance.name))
2823
        elif field == "status":
2824
          if instance.primary_node in bad_nodes:
2825
            val = "ERROR_nodedown"
2826
          else:
2827
            running = bool(live_data.get(instance.name))
2828
            if running:
2829
              if instance.status != "down":
2830
                val = "running"
2831
              else:
2832
                val = "ERROR_up"
2833
            else:
2834
              if instance.status != "down":
2835
                val = "ERROR_down"
2836
              else:
2837
                val = "ADMIN_down"
2838
        elif field == "oper_ram":
2839
          if instance.primary_node in bad_nodes:
2840
            val = None
2841
          elif instance.name in live_data:
2842
            val = live_data[instance.name].get("memory", "?")
2843
          else:
2844
            val = "-"
2845
        elif field == "disk_template":
2846
          val = instance.disk_template
2847
        elif field == "ip":
2848
          val = instance.nics[0].ip
2849
        elif field == "bridge":
2850
          val = instance.nics[0].bridge
2851
        elif field == "mac":
2852
          val = instance.nics[0].mac
2853
        elif field == "sda_size" or field == "sdb_size":
2854
          idx = ord(field[2]) - ord('a')
2855
          try:
2856
            val = instance.FindDisk(idx).size
2857
          except errors.OpPrereqError:
2858
            val = None
2859
        elif field == "tags":
2860
          val = list(instance.GetTags())
2861
        elif field == "serial_no":
2862
          val = instance.serial_no
2863
        elif field == "network_port":
2864
          val = instance.network_port
2865
        elif field == "hypervisor":
2866
          val = instance.hypervisor
2867
        elif field == "hvparams":
2868
          val = i_hv
2869
        elif (field.startswith(HVPREFIX) and
2870
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
2871
          val = i_hv.get(field[len(HVPREFIX):], None)
2872
        elif field == "beparams":
2873
          val = i_be
2874
        elif (field.startswith(BEPREFIX) and
2875
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
2876
          val = i_be.get(field[len(BEPREFIX):], None)
2877
        elif st_match and st_match.groups():
2878
          # matches a variable list
2879
          st_groups = st_match.groups()
2880
          if st_groups and st_groups[0] == "disk":
2881
            if st_groups[1] == "count":
2882
              val = len(instance.disks)
2883
            elif st_groups[1] == "sizes":
2884
              val = [disk.size for disk in instance.disks]
2885
            elif st_groups[1] == "size":
2886
              try:
2887
                val = instance.FindDisk(st_groups[2]).size
2888
              except errors.OpPrereqError:
2889
                val = None
2890
            else:
2891
              assert False, "Unhandled disk parameter"
2892
          elif st_groups[0] == "nic":
2893
            if st_groups[1] == "count":
2894
              val = len(instance.nics)
2895
            elif st_groups[1] == "macs":
2896
              val = [nic.mac for nic in instance.nics]
2897
            elif st_groups[1] == "ips":
2898
              val = [nic.ip for nic in instance.nics]
2899
            elif st_groups[1] == "bridges":
2900
              val = [nic.bridge for nic in instance.nics]
2901
            else:
2902
              # index-based item
2903
              nic_idx = int(st_groups[2])
2904
              if nic_idx >= len(instance.nics):
2905
                val = None
2906
              else:
2907
                if st_groups[1] == "mac":
2908
                  val = instance.nics[nic_idx].mac
2909
                elif st_groups[1] == "ip":
2910
                  val = instance.nics[nic_idx].ip
2911
                elif st_groups[1] == "bridge":
2912
                  val = instance.nics[nic_idx].bridge
2913
                else:
2914
                  assert False, "Unhandled NIC parameter"
2915
          else:
2916
            assert False, "Unhandled variable parameter"
2917
        else:
2918
          raise errors.ParameterError(field)
2919
        iout.append(val)
2920
      output.append(iout)
2921

    
2922
    return output
2923
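    # Example (illustrative only; field names taken from the handlers above):
    # a query asking for ["oper_ram", "disk.count", "disk.size/0",
    # "nic.macs", "nic.ip/1"] yields, per instance, the live memory figure
    # (None when the primary node could not be contacted, "-" when no live
    # data exists), the number of disks, the size of the first disk, the
    # list of NIC MAC addresses and the IP of the second NIC; an
    # out-of-range index such as "nic.ip/5" on a one-NIC instance produces
    # None, while an unknown field name raises errors.ParameterError.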

    
2924

    
2925
class LUFailoverInstance(LogicalUnit):
2926
  """Failover an instance.
2927

2928
  """
2929
  HPATH = "instance-failover"
2930
  HTYPE = constants.HTYPE_INSTANCE
2931
  _OP_REQP = ["instance_name", "ignore_consistency"]
2932
  REQ_BGL = False
2933

    
2934
  def ExpandNames(self):
2935
    self._ExpandAndLockInstance()
2936
    self.needed_locks[locking.LEVEL_NODE] = []
2937
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2938

    
2939
  def DeclareLocks(self, level):
2940
    if level == locking.LEVEL_NODE:
2941
      self._LockInstancesNodes()
2942

    
2943
  def BuildHooksEnv(self):
2944
    """Build hooks env.
2945

2946
    This runs on master, primary and secondary nodes of the instance.
2947

2948
    """
2949
    env = {
2950
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2951
      }
2952
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2953
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
2954
    return env, nl, nl
2955

    
2956
  def CheckPrereq(self):
2957
    """Check prerequisites.
2958

2959
    This checks that the instance is in the cluster.
2960

2961
    """
2962
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2963
    assert self.instance is not None, \
2964
      "Cannot retrieve locked instance %s" % self.op.instance_name
2965

    
2966
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2967
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2968
      raise errors.OpPrereqError("Instance's disk layout is not"
2969
                                 " network mirrored, cannot failover.")
2970

    
2971
    secondary_nodes = instance.secondary_nodes
2972
    if not secondary_nodes:
2973
      raise errors.ProgrammerError("no secondary node but using "
2974
                                   "a mirrored disk template")
2975

    
2976
    target_node = secondary_nodes[0]
2977
    # check memory requirements on the secondary node
2978
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
2979
                         instance.name, bep[constants.BE_MEMORY],
2980
                         instance.hypervisor)
2981

    
2982
    # check bridge existance
2983
    brlist = [nic.bridge for nic in instance.nics]
2984
    if not self.rpc.call_bridges_exist(target_node, brlist):
2985
      raise errors.OpPrereqError("One or more target bridges %s does not"
2986
                                 " exist on destination node '%s'" %
2987
                                 (brlist, target_node))
2988

    
2989
  def Exec(self, feedback_fn):
2990
    """Failover an instance.
2991

2992
    The failover is done by shutting it down on its present node and
2993
    starting it on the secondary.
2994

2995
    """
2996
    instance = self.instance
2997

    
2998
    source_node = instance.primary_node
2999
    target_node = instance.secondary_nodes[0]
3000

    
3001
    feedback_fn("* checking disk consistency between source and target")
3002
    for dev in instance.disks:
3003
      # for drbd, these are drbd over lvm
3004
      if not _CheckDiskConsistency(self, dev, target_node, False):
3005
        if instance.status == "up" and not self.op.ignore_consistency:
3006
          raise errors.OpExecError("Disk %s is degraded on target node,"
3007
                                   " aborting failover." % dev.iv_name)
3008

    
3009
    feedback_fn("* shutting down instance on source node")
3010
    logging.info("Shutting down instance %s on node %s",
3011
                 instance.name, source_node)
3012

    
3013
    if not self.rpc.call_instance_shutdown(source_node, instance):
3014
      if self.op.ignore_consistency:
3015
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
3016
                             " Proceeding"
3017
                             " anyway. Please make sure node %s is down",
3018
                             instance.name, source_node, source_node)
3019
      else:
3020
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3021
                                 (instance.name, source_node))
3022

    
3023
    feedback_fn("* deactivating the instance's disks on source node")
3024
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3025
      raise errors.OpExecError("Can't shut down the instance's disks.")
3026

    
3027
    instance.primary_node = target_node
3028
    # distribute new instance config to the other nodes
3029
    self.cfg.Update(instance)
3030

    
3031
    # Only start the instance if it's marked as up
3032
    if instance.status == "up":
3033
      feedback_fn("* activating the instance's disks on target node")
3034
      logging.info("Starting instance %s on node %s",
3035
                   instance.name, target_node)
3036

    
3037
      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3038
                                               ignore_secondaries=True)
3039
      if not disks_ok:
3040
        _ShutdownInstanceDisks(self, instance)
3041
        raise errors.OpExecError("Can't activate the instance's disks")
3042

    
3043
      feedback_fn("* starting the instance on the target node")
3044
      if not self.rpc.call_instance_start(target_node, instance, None):
3045
        _ShutdownInstanceDisks(self, instance)
3046
        raise errors.OpExecError("Could not start instance %s on node %s." %
3047
                                 (instance.name, target_node))
3048
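    # Recap of the sequence implemented above: check disk consistency on the
    # target (aborting only if the instance is up and ignore_consistency is
    # not set), shut the instance down on the source node, deactivate its
    # disks there, flip primary_node in the configuration, then reassemble
    # the disks and restart the instance on the former secondary if it was
    # marked "up".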

    
3049

    
3050
def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
3051
  """Create a tree of block devices on the primary node.
3052

3053
  This always creates all devices.
3054

3055
  """
3056
  if device.children:
3057
    for child in device.children:
3058
      if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
3059
        return False
3060

    
3061
  lu.cfg.SetDiskID(device, node)
3062
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3063
                                       instance.name, True, info)
3064
  if not new_id:
3065
    return False
3066
  if device.physical_id is None:
3067
    device.physical_id = new_id
3068
  return True
3069

    
3070

    
3071
def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
3072
  """Create a tree of block devices on a secondary node.
3073

3074
  If this device type has to be created on secondaries, create it and
3075
  all its children.
3076

3077
  If not, just recurse to children keeping the same 'force' value.
3078

3079
  """
3080
  if device.CreateOnSecondary():
3081
    force = True
3082
  if device.children:
3083
    for child in device.children:
3084
      if not _CreateBlockDevOnSecondary(lu, node, instance,
3085
                                        child, force, info):
3086
        return False
3087

    
3088
  if not force:
3089
    return True
3090
  lu.cfg.SetDiskID(device, node)
3091
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3092
                                       instance.name, False, info)
3093
  if not new_id:
3094
    return False
3095
  if device.physical_id is None:
3096
    device.physical_id = new_id
3097
  return True
3098

    
3099

    
3100
def _GenerateUniqueNames(lu, exts):
3101
  """Generate a suitable LV name.
3102

3103
  This will generate a logical volume name for the given instance.
3104

3105
  """
3106
  results = []
3107
  for val in exts:
3108
    new_id = lu.cfg.GenerateUniqueID()
3109
    results.append("%s%s" % (new_id, val))
3110
  return results
3111

    
3112

    
3113
def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3114
                         p_minor, s_minor):
3115
  """Generate a drbd8 device complete with its children.
3116

3117
  """
3118
  port = lu.cfg.AllocatePort()
3119
  vgname = lu.cfg.GetVGName()
3120
  shared_secret = lu.cfg.GenerateDRBDSecret()
3121
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3122
                          logical_id=(vgname, names[0]))
3123
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3124
                          logical_id=(vgname, names[1]))
3125
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3126
                          logical_id=(primary, secondary, port,
3127
                                      p_minor, s_minor,
3128
                                      shared_secret),
3129
                          children=[dev_data, dev_meta],
3130
                          iv_name=iv_name)
3131
  return drbd_dev
3132
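# Illustrative sketch of the object built above (node names and LV names are
# made up):
#   _GenerateDRBD8Branch(lu, "node1", "node2", 1024,
#                        ["uuid.disk0_data", "uuid.disk0_meta"],
#                        "disk/0", 12, 13)
# returns an LD_DRBD8 Disk whose logical_id carries both node names, a newly
# allocated port, the two DRBD minors and a generated shared secret, and
# whose children are two LD_LV volumes: the 1024 MB data LV and a fixed
# 128 MB metadata LV.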

    
3133

    
3134
def _GenerateDiskTemplate(lu, template_name,
3135
                          instance_name, primary_node,
3136
                          secondary_nodes, disk_info,
3137
                          file_storage_dir, file_driver,
3138
                          base_index):
3139
  """Generate the entire disk layout for a given template type.
3140

3141
  """
3142
  #TODO: compute space requirements
3143

    
3144
  vgname = lu.cfg.GetVGName()
3145
  disk_count = len(disk_info)
3146
  disks = []
3147
  if template_name == constants.DT_DISKLESS:
3148
    pass
3149
  elif template_name == constants.DT_PLAIN:
3150
    if len(secondary_nodes) != 0:
3151
      raise errors.ProgrammerError("Wrong template configuration")
3152

    
3153
    names = _GenerateUniqueNames(lu, [".disk%d" % i
3154
                                      for i in range(disk_count)])
3155
    for idx, disk in enumerate(disk_info):
3156
      disk_index = idx + base_index
3157
      disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
3158
                              logical_id=(vgname, names[idx]),
3159
                              iv_name="disk/%d" % disk_index)
3160
      disks.append(disk_dev)
3161
  elif template_name == constants.DT_DRBD8:
3162
    if len(secondary_nodes) != 1:
3163
      raise errors.ProgrammerError("Wrong template configuration")
3164
    remote_node = secondary_nodes[0]
3165
    minors = lu.cfg.AllocateDRBDMinor(
3166
      [primary_node, remote_node] * len(disk_info), instance_name)
3167

    
3168
    names = _GenerateUniqueNames(lu,
3169
                                 [".disk%d_%s" % (i, s)
3170
                                  for i in range(disk_count)
3171
                                  for s in ("data", "meta")
3172
                                  ])
3173
    for idx, disk in enumerate(disk_info):
3174
      disk_index = idx + base_index
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3175
                                      disk["size"], names[idx*2:idx*2+2],
3176
                                      "disk/%d" % disk_index,
3177
                                      minors[idx*2], minors[idx*2+1])
3178
      disks.append(disk_dev)
3179
  elif template_name == constants.DT_FILE:
3180
    if len(secondary_nodes) != 0:
3181
      raise errors.ProgrammerError("Wrong template configuration")
3182

    
3183
    for idx, disk in enumerate(disk_info):
3184

    
3185
      disk_index = idx + base_index
      disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
3186
                              iv_name="disk/%d" % disk_index,
3187
                              logical_id=(file_driver,
3188
                                          "%s/disk%d" % (file_storage_dir,
3189
                                                         idx)))
3190
      disks.append(disk_dev)
3191
  else:
3192
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3193
  return disks
3194
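# Example with hypothetical parameters: for template_name == constants.DT_DRBD8,
# one secondary node and disk_info == [{"size": 1024}, {"size": 2048}], the
# code above allocates four DRBD minors (primary and secondary interleaved,
# two per disk), generates LV name suffixes ".disk0_data", ".disk0_meta",
# ".disk1_data" and ".disk1_meta", and returns two DRBD8 disks with iv_names
# "disk/0" and "disk/1" (shifted by base_index if it is non-zero).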

    
3195

    
3196
def _GetInstanceInfoText(instance):
3197
  """Compute that text that should be added to the disk's metadata.
3198

3199
  """
3200
  return "originstname+%s" % instance.name
3201

    
3202

    
3203
def _CreateDisks(lu, instance):
3204
  """Create all disks for an instance.
3205

3206
  This abstracts away some work from AddInstance.
3207

3208
  @type lu: L{LogicalUnit}
3209
  @param lu: the logical unit on whose behalf we execute
3210
  @type instance: L{objects.Instance}
3211
  @param instance: the instance whose disks we should create
3212
  @rtype: boolean
3213
  @return: the success of the creation
3214

3215
  """
3216
  info = _GetInstanceInfoText(instance)
3217

    
3218
  if instance.disk_template == constants.DT_FILE:
3219
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3220
    result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3221
                                                 file_storage_dir)
3222

    
3223
    if not result:
3224
      logging.error("Could not connect to node '%s'", instance.primary_node)
3225
      return False
3226

    
3227
    if not result[0]:
3228
      logging.error("Failed to create directory '%s'", file_storage_dir)
3229
      return False
3230

    
3231
  # Note: this needs to be kept in sync with adding of disks in
3232
  # LUSetInstanceParams
3233
  for device in instance.disks:
3234
    logging.info("Creating volume %s for instance %s",
3235
                 device.iv_name, instance.name)
3236
    #HARDCODE
3237
    for secondary_node in instance.secondary_nodes:
3238
      if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3239
                                        device, False, info):
3240
        logging.error("Failed to create volume %s (%s) on secondary node %s!",
3241
                      device.iv_name, device, secondary_node)
3242
        return False
3243
    #HARDCODE
3244
    if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3245
                                    instance, device, info):
3246
      logging.error("Failed to create volume %s on primary!", device.iv_name)
3247
      return False
3248

    
3249
  return True
3250
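# Note on the creation order above: for every disk the device tree is first
# created on each secondary node (forced only below devices for which
# CreateOnSecondary() is true), and only then is the full tree created
# unconditionally on the primary node; the first failure returns False so
# that the caller can roll back via _RemoveDisks.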

    
3251

    
3252
def _RemoveDisks(lu, instance):
3253
  """Remove all disks for an instance.
3254

3255
  This abstracts away some work from `AddInstance()` and
3256
  `RemoveInstance()`. Note that in case some of the devices couldn't
3257
  be removed, the removal will continue with the other ones (compare
3258
  with `_CreateDisks()`).
3259

3260
  @type lu: L{LogicalUnit}
3261
  @param lu: the logical unit on whose behalf we execute
3262
  @type instance: L{objects.Instance}
3263
  @param instance: the instance whose disks we should remove
3264
  @rtype: boolean
3265
  @return: the success of the removal
3266

3267
  """
3268
  logging.info("Removing block devices for instance %s", instance.name)
3269

    
3270
  result = True
3271
  for device in instance.disks:
3272
    for node, disk in device.ComputeNodeTree(instance.primary_node):
3273
      lu.cfg.SetDiskID(disk, node)
3274
      if not lu.rpc.call_blockdev_remove(node, disk):
3275
        lu.proc.LogWarning("Could not remove block device %s on node %s,"
3276
                           " continuing anyway", device.iv_name, node)
3277
        result = False
3278

    
3279
  if instance.disk_template == constants.DT_FILE:
3280
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3281
    if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3282
                                               file_storage_dir):
3283
      logging.error("Could not remove directory '%s'", file_storage_dir)
3284
      result = False
3285

    
3286
  return result
3287

    
3288

    
3289
def _ComputeDiskSize(disk_template, disks):
3290
  """Compute disk size requirements in the volume group
3291

3292
  """
3293
  # Required free disk space as a function of the disk template and disk sizes
3294
  req_size_dict = {
3295
    constants.DT_DISKLESS: None,
3296
    constants.DT_PLAIN: sum(d["size"] for d in disks),
3297
    # 128 MB are added for drbd metadata for each disk
3298
    constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
3299
    constants.DT_FILE: None,
3300
  }
3301

    
3302
  if disk_template not in req_size_dict:
3303
    raise errors.ProgrammerError("Disk template '%s' size requirement"
3304
                                 " is unknown" %  disk_template)
3305

    
3306
  return req_size_dict[disk_template]
3307
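# Worked example for the table above: with two disks of 1024 MB and 2048 MB,
#   DT_PLAIN needs 1024 + 2048 = 3072 MB of free VG space,
#   DT_DRBD8 needs (1024 + 128) + (2048 + 128) = 3328 MB (DRBD metadata),
# while DT_DISKLESS and DT_FILE return None, i.e. no volume group space.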

    
3308

    
3309
def _CheckHVParams(lu, nodenames, hvname, hvparams):
3310
  """Hypervisor parameter validation.
3311

3312
  This function abstracts the hypervisor parameter validation to be
3313
  used in both instance create and instance modify.
3314

3315
  @type lu: L{LogicalUnit}
3316
  @param lu: the logical unit for which we check
3317
  @type nodenames: list
3318
  @param nodenames: the list of nodes on which we should check
3319
  @type hvname: string
3320
  @param hvname: the name of the hypervisor we should use
3321
  @type hvparams: dict
3322
  @param hvparams: the parameters which we need to check
3323
  @raise errors.OpPrereqError: if the parameters are not valid
3324

3325
  """
3326
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3327
                                                  hvname,
3328
                                                  hvparams)
3329
  for node in nodenames:
3330
    info = hvinfo.get(node, None)
3331
    if not info or not isinstance(info, (tuple, list)):
3332
      raise errors.OpPrereqError("Cannot get current information"
3333
                                 " from node '%s' (%s)" % (node, info))
3334
    if not info[0]:
3335
      raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3336
                                 " %s" % info[1])
3337
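# Shape of the RPC result consumed above (values are illustrative only):
# hvinfo maps each node name to a (success, message) pair, e.g.
#   {"node1.example.com": (True, ""),
#    "node2.example.com": (False, "missing kernel image")}
# A missing or malformed entry raises OpPrereqError naming the node, and a
# False first element raises OpPrereqError carrying the validation message.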

    
3338

    
3339
class LUCreateInstance(LogicalUnit):
3340
  """Create an instance.
3341

3342
  """
3343
  HPATH = "instance-add"
3344
  HTYPE = constants.HTYPE_INSTANCE
3345
  _OP_REQP = ["instance_name", "disks", "disk_template",
3346
              "mode", "start",
3347
              "wait_for_sync", "ip_check", "nics",
3348
              "hvparams", "beparams"]
3349
  REQ_BGL = False
3350

    
3351
  def _ExpandNode(self, node):
3352
    """Expands and checks one node name.
3353

3354
    """
3355
    node_full = self.cfg.ExpandNodeName(node)
3356
    if node_full is None:
3357
      raise errors.OpPrereqError("Unknown node %s" % node)
3358
    return node_full
3359

    
3360
  def ExpandNames(self):
3361
    """ExpandNames for CreateInstance.
3362

3363
    Figure out the right locks for instance creation.
3364

3365
    """
3366
    self.needed_locks = {}
3367

    
3368
    # set optional parameters to none if they don't exist
3369
    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3370
      if not hasattr(self.op, attr):
3371
        setattr(self.op, attr, None)
3372

    
3373
    # cheap checks, mostly valid constants given
3374

    
3375
    # verify creation mode
3376
    if self.op.mode not in (constants.INSTANCE_CREATE,
3377
                            constants.INSTANCE_IMPORT):
3378
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3379
                                 self.op.mode)
3380

    
3381
    # disk template and mirror node verification
3382
    if self.op.disk_template not in constants.DISK_TEMPLATES:
3383
      raise errors.OpPrereqError("Invalid disk template name")
3384

    
3385
    if self.op.hypervisor is None:
3386
      self.op.hypervisor = self.cfg.GetHypervisorType()
3387

    
3388
    cluster = self.cfg.GetClusterInfo()
3389
    enabled_hvs = cluster.enabled_hypervisors
3390
    if self.op.hypervisor not in enabled_hvs:
3391
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3392
                                 " cluster (%s)" % (self.op.hypervisor,
3393
                                  ",".join(enabled_hvs)))
3394

    
3395
    # check hypervisor parameter syntax (locally)
3396

    
3397
    filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
3398
                                  self.op.hvparams)
3399
    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3400
    hv_type.CheckParameterSyntax(filled_hvp)
3401

    
3402
    # fill and remember the beparams dict
3403
    self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
3404
                                    self.op.beparams)
3405

    
3406
    #### instance parameters check
3407

    
3408
    # instance name verification
3409
    hostname1 = utils.HostInfo(self.op.instance_name)
3410
    self.op.instance_name = instance_name = hostname1.name
3411

    
3412
    # this is just a preventive check, but someone might still add this
3413
    # instance in the meantime, and creation will fail at lock-add time
3414
    if instance_name in self.cfg.GetInstanceList():
3415
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3416
                                 instance_name)
3417

    
3418
    self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3419

    
3420
    # NIC buildup
3421
    self.nics = []
3422
    for nic in self.op.nics:
3423
      # ip validity checks
3424
      ip = nic.get("ip", None)
3425
      if ip is None or ip.lower() == "none":
3426
        nic_ip = None
3427
      elif ip.lower() == constants.VALUE_AUTO:
3428
        nic_ip = hostname1.ip
3429
      else:
3430
        if not utils.IsValidIP(ip):
3431
          raise errors.OpPrereqError("Given IP address '%s' doesn't look"
3432
                                     " like a valid IP" % ip)
3433
        nic_ip = ip
3434

    
3435
      # MAC address verification
3436
      mac = nic.get("mac", constants.VALUE_AUTO)
3437
      if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3438
        if not utils.IsValidMac(mac.lower()):
3439
          raise errors.OpPrereqError("Invalid MAC address specified: %s" %
3440
                                     mac)
3441
      # bridge verification
3442
      bridge = nic.get("bridge", self.cfg.GetDefBridge())
3443
      self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
3444

    
3445
    # disk checks/pre-build
3446
    self.disks = []
3447
    for disk in self.op.disks:
3448
      mode = disk.get("mode", constants.DISK_RDWR)
3449
      if mode not in constants.DISK_ACCESS_SET:
3450
        raise errors.OpPrereqError("Invalid disk access mode '%s'" %
3451
                                   mode)
3452
      size = disk.get("size", None)
3453
      if size is None:
3454
        raise errors.OpPrereqError("Missing disk size")
3455
      try:
3456
        size = int(size)
3457
      except ValueError:
3458
        raise errors.OpPrereqError("Invalid disk size '%s'" % size)
3459
      self.disks.append({"size": size, "mode": mode})
3460

    
3461
    # used in CheckPrereq for ip ping check
3462
    self.check_ip = hostname1.ip
3463

    
3464
    # file storage checks
3465
    if (self.op.file_driver and
3466
        not self.op.file_driver in constants.FILE_DRIVER):
3467
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
3468
                                 self.op.file_driver)
3469

    
3470
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3471
      raise errors.OpPrereqError("File storage directory path not absolute")
3472

    
3473
    ### Node/iallocator related checks
3474
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
3475
      raise errors.OpPrereqError("One and only one of iallocator and primary"
3476
                                 " node must be given")
3477

    
3478
    if self.op.iallocator:
3479
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3480
    else:
3481
      self.op.pnode = self._ExpandNode(self.op.pnode)
3482
      nodelist = [self.op.pnode]
3483
      if self.op.snode is not None:
3484
        self.op.snode = self._ExpandNode(self.op.snode)
3485
        nodelist.append(self.op.snode)
3486
      self.needed_locks[locking.LEVEL_NODE] = nodelist
3487

    
3488
    # in case of import lock the source node too
3489
    if self.op.mode == constants.INSTANCE_IMPORT:
3490
      src_node = getattr(self.op, "src_node", None)
3491
      src_path = getattr(self.op, "src_path", None)
3492

    
3493
      if src_node is None or src_path is None:
3494
        raise errors.OpPrereqError("Importing an instance requires source"
3495
                                   " node and path options")
3496

    
3497
      if not os.path.isabs(src_path):
3498
        raise errors.OpPrereqError("The source path must be absolute")
3499

    
3500
      self.op.src_node = src_node = self._ExpandNode(src_node)
3501
      if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3502
        self.needed_locks[locking.LEVEL_NODE].append(src_node)
3503

    
3504
    else: # INSTANCE_CREATE
3505
      if getattr(self.op, "os_type", None) is None:
3506
        raise errors.OpPrereqError("No guest OS specified")
3507

    
3508
  def _RunAllocator(self):
3509
    """Run the allocator based on input opcode.
3510

3511
    """
3512
    nics = [n.ToDict() for n in self.nics]
3513
    ial = IAllocator(self,
3514
                     mode=constants.IALLOCATOR_MODE_ALLOC,
3515
                     name=self.op.instance_name,
3516
                     disk_template=self.op.disk_template,
3517
                     tags=[],
3518
                     os=self.op.os_type,
3519
                     vcpus=self.be_full[constants.BE_VCPUS],
3520
                     mem_size=self.be_full[constants.BE_MEMORY],
3521
                     disks=self.disks,
3522
                     nics=nics,
3523
                     hypervisor=self.op.hypervisor,
3524
                     )
3525

    
3526
    ial.Run(self.op.iallocator)
3527

    
3528
    if not ial.success:
3529
      raise errors.OpPrereqError("Can't compute nodes using"
3530
                                 " iallocator '%s': %s" % (self.op.iallocator,
3531
                                                           ial.info))
3532
    if len(ial.nodes) != ial.required_nodes:
3533
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3534
                                 " of nodes (%s), required %s" %
3535
                                 (self.op.iallocator, len(ial.nodes),
3536
                                  ial.required_nodes))
3537
    self.op.pnode = ial.nodes[0]
3538
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
3539
                 self.op.instance_name, self.op.iallocator,
3540
                 ", ".join(ial.nodes))
3541
    if ial.required_nodes == 2:
3542
      self.op.snode = ial.nodes[1]
3543

    
3544
  def BuildHooksEnv(self):
3545
    """Build hooks env.
3546

3547
    This runs on master, primary and secondary nodes of the instance.
3548

3549
    """
3550
    env = {
3551
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3552
      "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
3553
      "INSTANCE_ADD_MODE": self.op.mode,
3554
      }
3555
    if self.op.mode == constants.INSTANCE_IMPORT:
3556
      env["INSTANCE_SRC_NODE"] = self.op.src_node
3557
      env["INSTANCE_SRC_PATH"] = self.op.src_path
3558
      env["INSTANCE_SRC_IMAGES"] = self.src_images
3559

    
3560
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3561
      primary_node=self.op.pnode,
3562
      secondary_nodes=self.secondaries,
3563
      status=self.instance_status,
3564
      os_type=self.op.os_type,
3565
      memory=self.be_full[constants.BE_MEMORY],
3566
      vcpus=self.be_full[constants.BE_VCPUS],
3567
      nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
3568
    ))
3569

    
3570
    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3571
          self.secondaries)
3572
    return env, nl, nl
3573

    
3574

    
3575
  def CheckPrereq(self):
3576
    """Check prerequisites.
3577

3578
    """
3579
    if (not self.cfg.GetVGName() and
3580
        self.op.disk_template not in constants.DTS_NOT_LVM):
3581
      raise errors.OpPrereqError("Cluster does not support lvm-based"
3582
                                 " instances")
3583

    
3584

    
3585
    if self.op.mode == constants.INSTANCE_IMPORT:
3586
      src_node = self.op.src_node
3587
      src_path = self.op.src_path
3588

    
3589
      export_info = self.rpc.call_export_info(src_node, src_path)
3590

    
3591
      if not export_info:
3592
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
3593

    
3594
      if not export_info.has_section(constants.INISECT_EXP):
3595
        raise errors.ProgrammerError("Corrupted export config")
3596

    
3597
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
3598
      if (int(ei_version) != constants.EXPORT_VERSION):
3599
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3600
                                   (ei_version, constants.EXPORT_VERSION))
3601

    
3602
      # Check that the new instance doesn't have less disks than the export
3603
      instance_disks = len(self.disks)
3604
      export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
3605
      if instance_disks < export_disks:
3606
        raise errors.OpPrereqError("Not enough disks to import."
3607
                                   " (instance: %d, export: %d)" %
3608
                                   (instance_disks, export_disks))
3609

    
3610
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3611
      disk_images = []
3612
      for idx in range(export_disks):
3613
        option = 'disk%d_dump' % idx
3614
        if export_info.has_option(constants.INISECT_INS, option):
3615
          # FIXME: are the old os-es, disk sizes, etc. useful?
3616
          export_name = export_info.get(constants.INISECT_INS, option)
3617
          image = os.path.join(src_path, export_name)
3618
          disk_images.append(image)
3619
        else:
3620
          disk_images.append(False)
3621

    
3622
      self.src_images = disk_images
3623

    
3624
      old_name = export_info.get(constants.INISECT_INS, 'name')
3625
      # FIXME: int() here could throw a ValueError on broken exports
3626
      exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
3627
      if self.op.instance_name == old_name:
3628
        for idx, nic in enumerate(self.nics):
3629
          if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
3630
            nic_mac_ini = 'nic%d_mac' % idx
3631
            nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
3632

    
3633
    # ip ping checks (we use the same ip that was resolved in ExpandNames)
3634
    if self.op.start and not self.op.ip_check:
3635
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3636
                                 " adding an instance in start mode")
3637

    
3638
    if self.op.ip_check:
3639
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3640
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3641
                                   (self.check_ip, self.op.instance_name))
3642

    
3643
    #### allocator run
3644

    
3645
    if self.op.iallocator is not None:
3646
      self._RunAllocator()
3647

    
3648
    #### node related checks
3649

    
3650
    # check primary node
3651
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3652
    assert self.pnode is not None, \
3653
      "Cannot retrieve locked node %s" % self.op.pnode
3654
    self.secondaries = []
3655

    
3656
    # mirror node verification
3657
    if self.op.disk_template in constants.DTS_NET_MIRROR:
3658
      if self.op.snode is None:
3659
        raise errors.OpPrereqError("The networked disk templates need"
3660
                                   " a mirror node")
3661
      if self.op.snode == pnode.name:
3662
        raise errors.OpPrereqError("The secondary node cannot be"
3663
                                   " the primary node.")
3664
      self.secondaries.append(self.op.snode)
3665

    
3666
    nodenames = [pnode.name] + self.secondaries
3667

    
3668
    req_size = _ComputeDiskSize(self.op.disk_template,
3669
                                self.disks)
3670

    
3671
    # Check lv size requirements
3672
    if req_size is not None:
3673
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3674
                                         self.op.hypervisor)
3675
      for node in nodenames:
3676
        info = nodeinfo.get(node, None)
3677
        if not info:
3678
          raise errors.OpPrereqError("Cannot get current information"
3679
                                     " from node '%s'" % node)
3680
        vg_free = info.get('vg_free', None)
3681
        if not isinstance(vg_free, int):
3682
          raise errors.OpPrereqError("Can't compute free disk space on"
3683
                                     " node %s" % node)
3684
        if req_size > info['vg_free']:
3685
          raise errors.OpPrereqError("Not enough disk space on target node %s."
3686
                                     " %d MB available, %d MB required" %
3687
                                     (node, info['vg_free'], req_size))
3688

    
3689
    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
3690

    
3691
    # os verification
3692
    os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
3693
    if not os_obj:
3694
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
3695
                                 " primary node"  % self.op.os_type)
3696

    
3697
    # bridge check on primary node
3698
    bridges = [n.bridge for n in self.nics]
3699
    if not self.rpc.call_bridges_exist(self.pnode.name, bridges):
3700
      raise errors.OpPrereqError("one of the target bridges '%s' does not"
3701
                                 " exist on"
3702
                                 " destination node '%s'" %
3703
                                 (",".join(bridges), pnode.name))
3704

    
3705
    # memory check on primary node
3706
    if self.op.start:
3707
      _CheckNodeFreeMemory(self, self.pnode.name,
3708
                           "creating instance %s" % self.op.instance_name,
3709
                           self.be_full[constants.BE_MEMORY],
3710
                           self.op.hypervisor)
3711

    
3712
    if self.op.start:
3713
      self.instance_status = 'up'
3714
    else:
3715
      self.instance_status = 'down'
3716

    
3717
  def Exec(self, feedback_fn):
3718
    """Create and add the instance to the cluster.
3719

3720
    """
3721
    instance = self.op.instance_name
3722
    pnode_name = self.pnode.name
3723

    
3724
    for nic in self.nics:
3725
      if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3726
        nic.mac = self.cfg.GenerateMAC()
3727

    
3728
    ht_kind = self.op.hypervisor
3729
    if ht_kind in constants.HTS_REQ_PORT:
3730
      network_port = self.cfg.AllocatePort()
3731
    else:
3732
      network_port = None
3733

    
3734
    ##if self.op.vnc_bind_address is None:
3735
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3736

    
3737
    # this is needed because os.path.join does not accept None arguments
3738
    if self.op.file_storage_dir is None:
3739
      string_file_storage_dir = ""
3740
    else:
3741
      string_file_storage_dir = self.op.file_storage_dir
3742

    
3743
    # build the full file storage dir path
3744
    file_storage_dir = os.path.normpath(os.path.join(
3745
                                        self.cfg.GetFileStorageDir(),
3746
                                        string_file_storage_dir, instance))
3747

    
3748

    
3749
    disks = _GenerateDiskTemplate(self,
3750
                                  self.op.disk_template,
3751
                                  instance, pnode_name,
3752
                                  self.secondaries,
3753
                                  self.disks,
3754
                                  file_storage_dir,
3755
                                  self.op.file_driver,
3756
                                  0)
3757

    
3758
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3759
                            primary_node=pnode_name,
3760
                            nics=self.nics, disks=disks,
3761
                            disk_template=self.op.disk_template,
3762
                            status=self.instance_status,
3763
                            network_port=network_port,
3764
                            beparams=self.op.beparams,
3765
                            hvparams=self.op.hvparams,
3766
                            hypervisor=self.op.hypervisor,
3767
                            )
3768

    
3769
    feedback_fn("* creating instance disks...")
3770
    if not _CreateDisks(self, iobj):
3771
      _RemoveDisks(self, iobj)
3772
      self.cfg.ReleaseDRBDMinors(instance)
3773
      raise errors.OpExecError("Device creation failed, reverting...")
3774

    
3775
    feedback_fn("adding instance %s to cluster config" % instance)
3776

    
3777
    self.cfg.AddInstance(iobj)
3778
    # Declare that we don't want to remove the instance lock anymore, as we've
3779
    # added the instance to the config
3780
    del self.remove_locks[locking.LEVEL_INSTANCE]
3781
    # Remove the temp. assignements for the instance's drbds
3782
    self.cfg.ReleaseDRBDMinors(instance)
3783
    # Unlock all the nodes
3784
    self.context.glm.release(locking.LEVEL_NODE)
3785
    del self.acquired_locks[locking.LEVEL_NODE]
3786

    
3787
    if self.op.wait_for_sync:
3788
      disk_abort = not _WaitForSync(self, iobj)
3789
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3790
      # make sure the disks are not degraded (still sync-ing is ok)
3791
      time.sleep(15)
3792
      feedback_fn("* checking mirrors status")
3793
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
3794
    else:
3795
      disk_abort = False
3796

    
3797
    if disk_abort:
3798
      _RemoveDisks(self, iobj)
3799
      self.cfg.RemoveInstance(iobj.name)
3800
      # Make sure the instance lock gets removed
3801
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
3802
      raise errors.OpExecError("There are some degraded disks for"
3803
                               " this instance")
3804

    
3805
    feedback_fn("creating os for instance %s on node %s" %
3806
                (instance, pnode_name))
3807

    
3808
    if iobj.disk_template != constants.DT_DISKLESS:
3809
      if self.op.mode == constants.INSTANCE_CREATE:
3810
        feedback_fn("* running the instance OS create scripts...")
3811
        if not self.rpc.call_instance_os_add(pnode_name, iobj):
3812
          raise errors.OpExecError("could not add os for instance %s"
3813
                                   " on node %s" %
3814
                                   (instance, pnode_name))
3815

    
3816
      elif self.op.mode == constants.INSTANCE_IMPORT:
3817
        feedback_fn("* running the instance OS import scripts...")
3818
        src_node = self.op.src_node
3819
        src_images = self.src_images
3820
        cluster_name = self.cfg.GetClusterName()
3821
        import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
3822
                                                         src_node, src_images,
3823
                                                         cluster_name)
3824
        for idx, result in enumerate(import_result):
3825
          if not result:
3826
            self.LogWarning("Could not image %s for on instance %s, disk %d,"
3827
                            " on node %s" % (src_images[idx], instance, idx,
3828
                                             pnode_name))
3829
      else:
3830
        # also checked in the prereq part
3831
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3832
                                     % self.op.mode)
3833

    
3834
    if self.op.start:
3835
      logging.info("Starting instance %s on node %s", instance, pnode_name)
3836
      feedback_fn("* starting instance...")
3837
      if not self.rpc.call_instance_start(pnode_name, iobj, None):
3838
        raise errors.OpExecError("Could not start instance")
3839

    
3840

    
3841
class LUConnectConsole(NoHooksLU):
3842
  """Connect to an instance's console.
3843

3844
  This is somewhat special in that it returns the command line that
3845
  you need to run on the master node in order to connect to the
3846
  console.
3847

3848
  """
3849
  _OP_REQP = ["instance_name"]
3850
  REQ_BGL = False
3851

    
3852
  def ExpandNames(self):
3853
    self._ExpandAndLockInstance()
3854

    
3855
  def CheckPrereq(self):
3856
    """Check prerequisites.
3857

3858
    This checks that the instance is in the cluster.
3859

3860
    """
3861
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3862
    assert self.instance is not None, \
3863
      "Cannot retrieve locked instance %s" % self.op.instance_name
3864

    
3865
  def Exec(self, feedback_fn):
3866
    """Connect to the console of an instance
3867

3868
    """
3869
    instance = self.instance
3870
    node = instance.primary_node
3871

    
3872
    node_insts = self.rpc.call_instance_list([node],
3873
                                             [instance.hypervisor])[node]
3874
    if node_insts is False:
3875
      raise errors.OpExecError("Can't connect to node %s." % node)
3876

    
3877
    if instance.name not in node_insts:
3878
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3879

    
3880
    logging.debug("Connecting to console of %s on %s", instance.name, node)
3881

    
3882
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
3883
    console_cmd = hyper.GetShellCommandForConsole(instance)
3884

    
3885
    # build ssh cmdline
3886
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3887

    
3888

    
3889
class LUReplaceDisks(LogicalUnit):
3890
  """Replace the disks of an instance.
3891

3892
  """
3893
  HPATH = "mirrors-replace"
3894
  HTYPE = constants.HTYPE_INSTANCE
3895
  _OP_REQP = ["instance_name", "mode", "disks"]
3896
  REQ_BGL = False
3897

    
3898
  def ExpandNames(self):
3899
    self._ExpandAndLockInstance()
3900

    
3901
    if not hasattr(self.op, "remote_node"):
3902
      self.op.remote_node = None
3903

    
3904
    ia_name = getattr(self.op, "iallocator", None)
3905
    if ia_name is not None:
3906
      if self.op.remote_node is not None:
3907
        raise errors.OpPrereqError("Give either the iallocator or the new"
3908
                                   " secondary, not both")
3909
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3910
    elif self.op.remote_node is not None:
3911
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3912
      if remote_node is None:
3913
        raise errors.OpPrereqError("Node '%s' not known" %
3914
                                   self.op.remote_node)
3915
      self.op.remote_node = remote_node
3916
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
3917
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3918
    else:
3919
      self.needed_locks[locking.LEVEL_NODE] = []
3920
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3921

    
3922
  def DeclareLocks(self, level):
3923
    # If we're not already locking all nodes in the set we have to declare the
3924
    # instance's primary/secondary nodes.
3925
    if (level == locking.LEVEL_NODE and
3926
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
3927
      self._LockInstancesNodes()
3928

    
3929
  def _RunAllocator(self):
3930
    """Compute a new secondary node using an IAllocator.
3931

3932
    """
3933
    ial = IAllocator(self,
3934
                     mode=constants.IALLOCATOR_MODE_RELOC,
3935
                     name=self.op.instance_name,
3936
                     relocate_from=[self.sec_node])
3937

    
3938
    ial.Run(self.op.iallocator)
3939

    
3940
    if not ial.success:
3941
      raise errors.OpPrereqError("Can't compute nodes using"
3942
                                 " iallocator '%s': %s" % (self.op.iallocator,
3943
                                                           ial.info))
3944
    if len(ial.nodes) != ial.required_nodes:
3945
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3946
                                 " of nodes (%s), required %s" %
3947
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
3948
    self.op.remote_node = ial.nodes[0]
3949
    self.LogInfo("Selected new secondary for the instance: %s",
3950
                 self.op.remote_node)
3951

    
3952
  def BuildHooksEnv(self):
3953
    """Build hooks env.
3954

3955
    This runs on the master, the primary and all the secondaries.
3956

3957
    """
3958
    env = {
3959
      "MODE": self.op.mode,
3960
      "NEW_SECONDARY": self.op.remote_node,
3961
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3962
      }
3963
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3964
    nl = [
3965
      self.cfg.GetMasterNode(),
3966
      self.instance.primary_node,
3967
      ]
3968
    if self.op.remote_node is not None:
3969
      nl.append(self.op.remote_node)
3970
    return env, nl, nl
3971

    
3972
  def CheckPrereq(self):
3973
    """Check prerequisites.
3974

3975
    This checks that the instance is in the cluster.
3976

3977
    """
3978
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3979
    assert instance is not None, \
3980
      "Cannot retrieve locked instance %s" % self.op.instance_name
3981
    self.instance = instance
3982

    
3983
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3984
      raise errors.OpPrereqError("Instance's disk layout is not"
3985
                                 " network mirrored.")
3986

    
3987
    if len(instance.secondary_nodes) != 1:
3988
      raise errors.OpPrereqError("The instance has a strange layout,