Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib.py @ e2fe6369

History | View | Annotate | Download (155.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33

    
34
from ganeti import rpc
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import config
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import ssconf
45

    
46

    
47
class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements (REQ_CLUSTER,
      REQ_MASTER); note that all commands require root permissions

  """
  # HPATH/HTYPE select the hooks directory and target type; None in the
  # base class means "no hooks" unless a subclass overrides them
  HPATH = None
  HTYPE = None
  # opcode attributes that must be present (non-None) for this LU
  _OP_REQP = []
  # whether the LU needs an initialized cluster / must run on the master
  REQ_CLUSTER = True
  REQ_MASTER = True

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = cfg
    self.sstore = sstore
    # lazily-built SshRunner; accessed through the 'ssh' property below
    self.__ssh = None

    # verify that every required opcode parameter was supplied
    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    if self.REQ_CLUSTER:
      if not cfg.IsCluster():
        raise errors.OpPrereqError("Cluster not initialized yet,"
                                   " use 'gnt-cluster init' first.")
      if self.REQ_MASTER:
        # master-only LUs may not run on any other node
        master = sstore.GetMasterNode()
        if master != utils.HostInfo().name:
          raise errors.OpPrereqError("Commands must be run on the master"
                                     " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object

    The runner is created on first access and cached for the lifetime
    of the LU.

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.sstore)
    return self.__ssh

  # read-only property so subclasses can simply use self.ssh
  ssh = property(fget=__GetSSH)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-node tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    As for the node lists, the master should not be included in the
    them, as it will be added by the hooks runner in case this LU
    requires a cluster to run on (otherwise we don't have a node
    list). No nodes should be returned as an empty list (and not
    None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError
158

    
159

    
160
class NoHooksLU(LogicalUnit):
  """Convenience base class for LogicalUnits that run no hooks.

  Deriving from this class instead of LogicalUnit avoids duplicating
  the trivial hooks definitions in every hook-less LU.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Build hooks env.

    Returns an empty environment and empty pre/post node lists, since
    this LU never runs hooks.

    """
    env = {}
    pre_nodes = []
    post_nodes = []
    return env, pre_nodes, post_nodes
177

    
178

    
179
def _AddHostToEtcHosts(hostname):
  """Wrapper around utils.SetEtcHostsEntry.

  Resolves the given host name and registers its IP, full name and
  short name in the cluster's /etc/hosts file.

  """
  host_info = utils.HostInfo(name=hostname)
  aliases = [host_info.ShortName()]
  utils.SetEtcHostsEntry(constants.ETC_HOSTS, host_info.ip,
                         host_info.name, aliases)
185

    
186

    
187
def _RemoveHostFromEtcHosts(hostname):
  """Wrapper around utils.RemoveEtcHostsEntry.

  Drops both the fully-qualified and the short form of the given host
  name from the cluster's /etc/hosts file.

  """
  host_info = utils.HostInfo(name=hostname)
  for entry in (host_info.name, host_info.ShortName()):
    utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, entry)
194

    
195

    
196
def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: list of node names (possibly short) to expand and check;
      an empty list selects every node in the cluster

  Note that the argument must always be a list: a non-list value
  (including None) raises OpPrereqError.

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    return utils.NiceSort(lu.cfg.GetNodeList())

  wanted = []
  for name in nodes:
    expanded_name = lu.cfg.ExpandNodeName(name)
    if expanded_name is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(expanded_name)
  return utils.NiceSort(wanted)
218

    
219

    
220
def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: list of instance names (possibly short) to expand and
      check; an empty list selects every instance in the cluster

  Note that the argument must always be a list: a non-list value
  (including None) raises OpPrereqError.

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if not instances:
    return utils.NiceSort(lu.cfg.GetInstanceList())

  wanted = []
  for name in instances:
    expanded_name = lu.cfg.ExpandInstanceName(name)
    if expanded_name is None:
      raise errors.OpPrereqError("No such instance name '%s'" % name)
    wanted.append(expanded_name)
  return utils.NiceSort(wanted)
242

    
243

    
244
def _CheckOutputFields(static, dynamic, selected):
245
  """Checks whether all selected fields are valid.
246

247
  Args:
248
    static: Static fields
249
    dynamic: Dynamic fields
250

251
  """
252
  static_fields = frozenset(static)
253
  dynamic_fields = frozenset(dynamic)
254

    
255
  all_fields = static_fields | dynamic_fields
256

    
257
  if not all_fields.issuperset(selected):
258
    raise errors.OpPrereqError("Unknown output fields selected: %s"
259
                               % ",".join(frozenset(selected).
260
                                          difference(all_fields)))
261

    
262

    
263
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
264
                          memory, vcpus, nics):
265
  """Builds instance related env variables for hooks from single variables.
266

267
  Args:
268
    secondary_nodes: List of secondary nodes as strings
269
  """
270
  env = {
271
    "OP_TARGET": name,
272
    "INSTANCE_NAME": name,
273
    "INSTANCE_PRIMARY": primary_node,
274
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
275
    "INSTANCE_OS_TYPE": os_type,
276
    "INSTANCE_STATUS": status,
277
    "INSTANCE_MEMORY": memory,
278
    "INSTANCE_VCPUS": vcpus,
279
  }
280

    
281
  if nics:
282
    nic_count = len(nics)
283
    for idx, (ip, bridge, mac) in enumerate(nics):
284
      if ip is None:
285
        ip = ""
286
      env["INSTANCE_NIC%d_IP" % idx] = ip
287
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
288
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
289
  else:
290
    nic_count = 0
291

    
292
  env["INSTANCE_NIC_COUNT"] = nic_count
293

    
294
  return env
295

    
296

    
297
def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override

  Returns the environment dict built by _BuildInstanceHookEnv.

  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    # FIX: this used to pass instance.os, so the INSTANCE_STATUS hook
    # variable wrongly contained the OS name instead of the run status
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)
317

    
318

    
319
def _HasValidVG(vglist, vgname):
320
  """Checks if the volume group list is valid.
321

322
  A non-None return value means there's an error, and the return value
323
  is the error message.
324

325
  """
326
  vgsize = vglist.get(vgname, None)
327
  if vgsize is None:
328
    return "volume group '%s' missing" % vgname
329
  elif vgsize < 20480:
330
    return ("volume group '%s' too small (20480MiB required, %dMib found)" %
331
            (vgname, vgsize))
332
  return None
333

    
334

    
335
def _InitSSHSetup(node):
  """Setup the SSH configuration for the cluster.

  This generates a dsa keypair for root, adds the pub key to the
  permitted hosts and adds the hostkey to its own known hosts.

  Args:
    node: the name of this host as a fqdn

  """
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

  # back up and remove any pre-existing key material before regenerating
  for key_file in (priv_key, pub_key):
    if os.path.exists(key_file):
      utils.CreateBackup(key_file)
    utils.RemoveFile(key_file)

  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
                         "-f", priv_key,
                         "-q", "-N", ""])
  if result.failed:
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
                             result.output)

  # append the fresh public key to root's authorized keys
  pub_fd = open(pub_key, 'r')
  try:
    utils.AddAuthorizedKey(auth_keys, pub_fd.read(8192))
  finally:
    pub_fd.close()
365

    
366

    
367
def _InitGanetiServerSetup(ss):
368
  """Setup the necessary configuration for the initial node daemon.
369

370
  This creates the nodepass file containing the shared password for
371
  the cluster and also generates the SSL certificate.
372

373
  """
374
  # Create pseudo random password
375
  randpass = sha.new(os.urandom(64)).hexdigest()
376
  # and write it into sstore
377
  ss.SetKey(ss.SS_NODED_PASS, randpass)
378

    
379
  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
380
                         "-days", str(365*5), "-nodes", "-x509",
381
                         "-keyout", constants.SSL_CERT_FILE,
382
                         "-out", constants.SSL_CERT_FILE, "-batch"])
383
  if result.failed:
384
    raise errors.OpExecError("could not generate server ssl cert, command"
385
                             " %s had exitcode %s and error message %s" %
386
                             (result.cmd, result.exit_code, result.output))
387

    
388
  os.chmod(constants.SSL_CERT_FILE, 0400)
389

    
390
  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
391

    
392
  if result.failed:
393
    raise errors.OpExecError("Could not start the node daemon, command %s"
394
                             " had exitcode %s and error %s" %
395
                             (result.cmd, result.exit_code, result.output))
396

    
397

    
398
def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.

  Queries the instance's primary node for all bridges referenced by
  the instance NICs and raises OpPrereqError if any is missing.

  """
  bridges = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, bridges):
    raise errors.OpPrereqError("one or more target bridges %s does not"
                               " exist on destination node '%s'" %
                               (bridges, instance.primary_node))
408

    
409

    
410
class LUInitCluster(LogicalUnit):
411
  """Initialise the cluster.
412

413
  """
414
  HPATH = "cluster-init"
415
  HTYPE = constants.HTYPE_CLUSTER
416
  _OP_REQP = ["cluster_name", "hypervisor_type", "mac_prefix",
417
              "def_bridge", "master_netdev", "file_storage_dir"]
418
  REQ_CLUSTER = False
419

    
420
  def BuildHooksEnv(self):
421
    """Build hooks env.
422

423
    Notes: Since we don't require a cluster, we must manually add
424
    ourselves in the post-run node list.
425

426
    """
427
    env = {"OP_TARGET": self.op.cluster_name}
428
    return env, [], [self.hostname.name]
429

    
430
  def CheckPrereq(self):
431
    """Verify that the passed name is a valid one.
432

433
    """
434
    if config.ConfigWriter.IsCluster():
435
      raise errors.OpPrereqError("Cluster is already initialised")
436

    
437
    if self.op.hypervisor_type == constants.HT_XEN_HVM31:
438
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
439
        raise errors.OpPrereqError("Please prepare the cluster VNC"
440
                                   "password file %s" %
441
                                   constants.VNC_PASSWORD_FILE)
442

    
443
    self.hostname = hostname = utils.HostInfo()
444

    
445
    if hostname.ip.startswith("127."):
446
      raise errors.OpPrereqError("This host's IP resolves to the private"
447
                                 " range (%s). Please fix DNS or %s." %
448
                                 (hostname.ip, constants.ETC_HOSTS))
449

    
450
    if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
451
                         source=constants.LOCALHOST_IP_ADDRESS):
452
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
453
                                 " to %s,\nbut this ip address does not"
454
                                 " belong to this host."
455
                                 " Aborting." % hostname.ip)
456

    
457
    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
458

    
459
    if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
460
                     timeout=5):
461
      raise errors.OpPrereqError("Cluster IP already active. Aborting.")
462

    
463
    secondary_ip = getattr(self.op, "secondary_ip", None)
464
    if secondary_ip and not utils.IsValidIP(secondary_ip):
465
      raise errors.OpPrereqError("Invalid secondary ip given")
466
    if (secondary_ip and
467
        secondary_ip != hostname.ip and
468
        (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
469
                           source=constants.LOCALHOST_IP_ADDRESS))):
470
      raise errors.OpPrereqError("You gave %s as secondary IP,"
471
                                 " but it does not belong to this host." %
472
                                 secondary_ip)
473
    self.secondary_ip = secondary_ip
474

    
475
    if not hasattr(self.op, "vg_name"):
476
      self.op.vg_name = None
477
    # if vg_name not None, checks if volume group is valid
478
    if self.op.vg_name:
479
      vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
480
      if vgstatus:
481
        raise errors.OpPrereqError("Error: %s\nspecify --no-lvm-storage if"
482
                                   " you are not using lvm" % vgstatus)
483

    
484
    self.op.file_storage_dir = os.path.normpath(self.op.file_storage_dir)
485

    
486
    if not os.path.isabs(self.op.file_storage_dir):
487
      raise errors.OpPrereqError("The file storage directory you have is"
488
                                 " not an absolute path.")
489

    
490
    if not os.path.exists(self.op.file_storage_dir):
491
      try:
492
        os.makedirs(self.op.file_storage_dir, 0750)
493
      except OSError, err:
494
        raise errors.OpPrereqError("Cannot create file storage directory"
495
                                   " '%s': %s" %
496
                                   (self.op.file_storage_dir, err))
497

    
498
    if not os.path.isdir(self.op.file_storage_dir):
499
      raise errors.OpPrereqError("The file storage directory '%s' is not"
500
                                 " a directory." % self.op.file_storage_dir)
501

    
502
    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
503
                    self.op.mac_prefix):
504
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
505
                                 self.op.mac_prefix)
506

    
507
    if self.op.hypervisor_type not in constants.HYPER_TYPES:
508
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
509
                                 self.op.hypervisor_type)
510

    
511
    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
512
    if result.failed:
513
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
514
                                 (self.op.master_netdev,
515
                                  result.output.strip()))
516

    
517
    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
518
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
519
      raise errors.OpPrereqError("Init.d script '%s' missing or not"
520
                                 " executable." % constants.NODE_INITD_SCRIPT)
521

    
522
  def Exec(self, feedback_fn):
523
    """Initialize the cluster.
524

525
    """
526
    clustername = self.clustername
527
    hostname = self.hostname
528

    
529
    # set up the simple store
530
    self.sstore = ss = ssconf.SimpleStore()
531
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
532
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
533
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
534
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
535
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
536
    ss.SetKey(ss.SS_FILE_STORAGE_DIR, self.op.file_storage_dir)
537

    
538
    # set up the inter-node password and certificate
539
    _InitGanetiServerSetup(ss)
540

    
541
    # start the master ip
542
    rpc.call_node_start_master(hostname.name)
543

    
544
    # set up ssh config and /etc/hosts
545
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
546
    try:
547
      sshline = f.read()
548
    finally:
549
      f.close()
550
    sshkey = sshline.split(" ")[1]
551

    
552
    _AddHostToEtcHosts(hostname.name)
553
    _InitSSHSetup(hostname.name)
554

    
555
    # init of cluster config file
556
    self.cfg = cfgw = config.ConfigWriter()
557
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
558
                    sshkey, self.op.mac_prefix,
559
                    self.op.vg_name, self.op.def_bridge)
560

    
561
    ssh.WriteKnownHostsFile(cfgw, ss, constants.SSH_KNOWN_HOSTS_FILE)
562

    
563

    
564
class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    Destroying is only permitted once the cluster holds nothing but
    the master node and no instances at all.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodes = self.cfg.GetNodeList()
    if nodes != [master]:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodes) - 1))
    instances = self.cfg.GetInstanceList()
    if instances:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instances))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    Stops the master role, preserves root's ssh keys as backups and
    tells the (former) master node to leave the cluster.

    """
    master = self.sstore.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")
    key_priv, key_pub, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    for key_file in (key_priv, key_pub):
      utils.CreateBackup(key_file)
    rpc.call_node_leave_cluster(master)
600

    
601

    
602
class LUVerifyCluster(NoHooksLU):
  """Verifies the cluster status.

  """
  _OP_REQP = ["skip_checks"]

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existance and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums
      vglist: the node's volume group data
      node_result: the node's answer to the node_verify rpc
      remote_version: the node's reported protocol version
      feedback_fn: function used to report each problem found

    Returns True if the node is "bad" (any check failed), False
    otherwise.

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existance and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        # NOTE(review): this loop shadows the 'node' parameter; harmless
        # here since the parameter is not used afterwards, but fragile
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    # NOTE(review): a hypervisor verify failure is reported but does not
    # set 'bad' - confirm whether that is intentional
    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node, that the instance runs on its
    primary node (if not down), and that it does not run anywhere else.

    Returns True if any problem was found.

    """
    bad = False

    node_current = instanceconfig.primary_node

    # map of node -> volumes the instance's config says should exist there
    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    # a non-down instance must be reported running on its primary node
    if not instanceconfig.status == 'down':
      if (node_current not in node_instance or
          not instance in node_instance[node_current]):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    # ... and must not be reported running anywhere else
    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    Returns True if any orphan volume was found.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    Returns True if any unknown running instance was found.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    Returns True if any node would not have enough free memory.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          needed_mem += instance_cfg[instance].memory
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    Returns 0 if everything checked out, 1 otherwise (suitable as an
    exit code).

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    # gather the remote data for all nodes in one rpc call per data type
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      # a string result means the node reported an LVM error message
      if isinstance(volumeinfo, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result =  self._VerifyInstance(instance, inst_config, node_volume,
                                     node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          # NOTE(review): unlike the primary-node case above, this branch
          # does not set 'bad' - confirm whether that is intentional
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    return int(bad)
945

    
946

    
947
class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    Returns a 4-element tuple:
      - res_nodes: list of nodes whose volume-list RPC failed entirely
      - res_nlvm: dict node name -> LV enumeration error string
      - res_instances: list of instance names having at least one
        non-online LV
      - res_missing: dict instance name -> list of (node, volume) pairs
        that were expected but not reported by the node

    """
    # the tuple aliases the four accumulators so they can be filled in
    # place and returned together
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    # build the expected LV map: only running, net-mirrored instances
    # are checked
    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
        # the error string is not iterable as LV data, so skip the
        # per-LV loop below (previously this fell through and crashed
        # on lvs.iteritems())
        continue
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      # pop each reported LV so leftovers in nv_dict are the missing ones
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result
1019
class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    Hooks run only on the master node, both pre and post.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    Resolves the new name, rejects a no-op rename, and rejects a new IP
    that is already live on the network.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      # a reachable new IP means some other host already uses it
      result = utils.RunCmd(["fping", "-q", new_ip])
      if not result.failed:
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    # store back the resolved (possibly canonicalized) name
    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    The master role is stopped while the ssconf keys are rewritten and
    redistributed, and restarted afterwards even on failure.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        # the master already has the new files locally
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            # distribution failures are logged but do not abort the rename
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      # always try to bring the master role back up
      if not rpc.call_node_start_master(master):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")
def _RecursiveCheckIfLVMBased(disk):
  """Check whether a disk or any of its children is LVM-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether a LD_LV dev_type was found or not

  """
  # recurse first: any LVM-based child makes the whole tree LVM-based
  for child in (disk.children or []):
    if _RecursiveCheckIfLVMBased(child):
      return True
  return disk.dev_type == constants.LD_LV
1115
class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    Hooks run only on the master node, both pre and post.

    """
    master = self.sstore.GetMasterNode()
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    return env, [master], [master]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if not self.op.vg_name:
      # disabling LVM storage: refuse while any instance still has an
      # lvm-based disk
      for name in self.cfg.GetInstanceList():
        inst = self.cfg.GetInstanceInfo(name)
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")
    else:
      # if vg_name not None, checks given volume group on all nodes
      node_list = self.cfg.GetNodeList()
      vglist = rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = _HasValidVG(vglist[node], self.op.vg_name)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    new_vg = self.op.vg_name
    if new_vg == self.cfg.GetVGName():
      feedback_fn("Cluster LVM configuration already in desired"
                  " state, not changing")
    else:
      self.cfg.SetVGName(new_vg)
1171
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  Args:
    cfgw: config object (used for SetDiskID on each disk)
    instance: the instance whose disks are polled
    proc: object providing LogInfo/LogWarning for progress output
    oneshot: if True, poll once and return instead of waiting for sync
    unlock: if True, release the 'cmd' lock while sleeping

  Returns:
    True if no disk ended up degraded, False otherwise.

  """
  if not instance.disks:
    # no disks means nothing to wait for
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  # retries counts *consecutive* RPC failures; it is reset on success
  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        # missing per-disk data is logged but does not fail the wait
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      # a degraded device with no progress percentage counts as degraded
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        # a non-None percentage means this device is still syncing
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    # sleep at most 60s, releasing the 'cmd' lock meanwhile if requested
    if unlock:
      utils.Unlock('cmd')
    try:
      time.sleep(min(60, max_time))
    finally:
      if unlock:
        utils.Lock('cmd')

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
1235
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  Returns:
    True if the device (and recursively its children) is consistent,
    False otherwise; an unreachable node also yields False.

  """
  cfgw.SetDiskID(dev, node)
  # idx selects which field of the blockdev_find result to test:
  # 6 for the ldisk status, 5 for is_degraded
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
      result = False
    else:
      # a truthy status flag at idx means the device is degraded
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      # NOTE(review): ldisk is not passed down here, so children are
      # always checked via is_degraded (idx 5) — confirm this is intended
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result
1264
class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
    _CheckOutputFields(static=[],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into an a per-os per-node dictionary

      Args:
        node_list: a list with the names of all nodes
        rlist: a map with node names as keys and OS objects as values

      Returns:
        map: a map with osnames as keys and as value another map, with
             nodes as
             keys and list of OS objects as values
             e.g. {"debian-etch": {"node1": [<object>,...],
                                   "node2": [<object>,]}
                  }

    """
    all_os = {}
    for node_name, node_result in rlist.iteritems():
      if not node_result:
        continue
      for os_obj in node_result:
        if os_obj.name not in all_os:
          # seed an empty list for every known node, so absent OSes
          # show up as [] rather than missing keys
          all_os[os_obj.name] = dict([(nname, []) for nname in node_list])
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    os_map = self._DiagnoseByOS(node_list, node_data)

    output = []
    for os_name, os_data in os_map.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          # an OS is valid when present and first entry valid on each node
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output
1344
class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would not allow itself to run.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      # use call syntax for the raise, consistent with the rest of the
      # module (the old "raise X, (...)" statement form was a leftover)
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    # store the canonical name and the node object for Exec
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    rpc.call_node_leave_cluster(node.name)

    # stop the node daemon on the departing node
    self.ssh.Run(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)

    _RemoveHostFromEtcHosts(node.name)
1419
class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    # fields that need a live RPC to the nodes, as opposed to static
    # configuration data
    self.dynamic_fields = frozenset(["dtotal", "dfree",
                                     "mtotal", "mnode", "mfree",
                                     "bootid"])

    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                               "pip", "sip"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedNodes(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    Returns a list of rows, one per node, each row holding the values
    of self.op.output_fields in order.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    # only issue the node_info RPC if a dynamic field was requested
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      # NOTE: fromkeys shares a single {} among all nodes; safe here
      # because the values are only read (via .get) below
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    # the instance scan is only needed for the pinst/sinst fields
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field in self.dynamic_fields:
          # missing live data (node down) yields None for the field
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output
1517
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    # precompute each instance's per-node LV layout so the "instance"
    # column can be resolved by reverse lookup
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      raw_vols = volumes.get(node)
      if not raw_vols:
        # skip nodes that returned nothing (down or no volumes)
        continue

      node_vols = sorted(raw_vols, key=lambda vol: vol['dev'])

      for vol in node_vols:
        row = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            # find the instance owning this LV; '-' if unowned
            owner = '-'
            for inst in ilist:
              inst_lvs = lv_by_node[inst].get(node)
              if inst_lvs and vol['name'] in inst_lvs:
                owner = inst.name
                break
            val = owner
          else:
            raise errors.ParameterError(field)
          row.append(str(val))

        output.append(row)

    return output
1587
class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) matches the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      # no secondary given: single-homed node
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip
    node_list = cfg.GetNodeList()
    if node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration"
                                 % node)

    # neither of the new node's IPs may clash with any existing node's
    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)
      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachablity
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

    # HVM clusters need the VNC password file present for distribution
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
                                   constants.VNC_PASSWORD_FILE)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    The sequence is: push password/certificate and restart the node
    daemon over ssh, verify version, transfer ssh host keys, check the
    secondary IP, then distribute cluster files and add the node to the
    configuration.

    """
    new_node = self.new_node
    node = new_node.name

    # set up inter-node password and certificate and restarts the node daemon
    gntpass = self.sstore.GetNodeDaemonPassword()
    # the password is interpolated into a shell command below, so it
    # must match this restricted charset
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
      raise errors.OpExecError("ganeti password corruption detected")
    f = open(constants.SSL_CERT_FILE)
    try:
      gntpem = f.read(8192)
    finally:
      f.close()
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
    # so we use this to detect an invalid certificate; as long as the
    # cert doesn't contain this, the here-document will be correctly
    # parsed by the shell sequence below
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
    if not gntpem.endswith("\n"):
      raise errors.OpExecError("PEM must end with newline")
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)

    # and then connect with ssh to set password and start ganeti-noded
    # note that all the below variables are sanitized at this point,
    # either by being constants or by the checks above
    ss = self.sstore
    mycommand = ("umask 077 && "
                 "echo '%s' > '%s' && "
                 "cat > '%s' << '!EOF.' && \n"
                 "%s!EOF.\n%s restart" %
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
                  constants.SSL_CERT_FILE, gntpem,
                  constants.NODE_INITD_SCRIPT))

    result = self.ssh.Run(node, 'root', mycommand, batch=False, ask_key=True)
    if result.failed:
      raise errors.OpExecError("Remote command on node %s, error: %s,"
                               " output: %s" %
                               (node, result.fail_reason, result.output))

    # check connectivity
    # give the freshly restarted daemon a moment to come up
    time.sleep(4)

    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    _AddHostToEtcHosts(new_node.name)

    # dual-homed node: verify it actually owns the secondary IP it claimed
    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    success, msg = self.ssh.VerifyNodeHostname(node)
    if not success:
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
                               " than the one the resolver gives: %s."
                               " Please fix and re-run this command." %
                               (node, msg))

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList() + [node]
    if myself.name in dist_nodes:
      # the master already has the files locally
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          # copy failures are logged but do not abort the node add
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = ss.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      if not self.ssh.CopyFileToNode(node, fname):
        logger.Error("could not copy file %s to node %s" % (fname, node))

    # finally commit the node to the cluster configuration
    logger.Info("adding node %s to cluster.conf" % node)
    self.cfg.AddNode(new_node)
    
1808
class LUMasterFailover(LogicalUnit):
  """Failover the master node to the current node.

  This is a special LU in that it must run on a non-master node.

  """
  HPATH = "master-failover"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_MASTER = False
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the new master only in the pre phase, and on all
    the nodes in the post phase.

    """
    env = {
      "OP_TARGET": self.new_master,
      "NEW_MASTER": self.new_master,
      "OLD_MASTER": self.old_master,
      }
    return env, [self.new_master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we are not already the master.

    """
    self.new_master = utils.HostInfo().name
    self.old_master = self.sstore.GetMasterNode()

    if self.old_master == self.new_master:
      # fixed grammar: "This commands" -> "This command"
      raise errors.OpPrereqError("This command must be run on the node"
                                 " where you want the new master to be."
                                 " %s is already the master" %
                                 self.old_master)

  def Exec(self, feedback_fn):
    """Failover the master node.

    This command, when run on a non-master node, will cause the current
    master to cease being master, and the non-master to become new
    master.

    """
    #TODO: do not rely on gethostname returning the FQDN
    logger.Info("setting master to %s, old master: %s" %
                (self.new_master, self.old_master))

    if not rpc.call_node_stop_master(self.old_master):
      # fixed message: was "could disable", missing the "not"
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)

    # record the new master in the simple store and push it out so that
    # all nodes agree on who the master is
    ss = self.sstore
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
      logger.Error("could not distribute the new simple store master file"
                   " to the other nodes, please check.")

    if not rpc.call_node_start_master(self.new_master):
      logger.Error("could not start the master role on the new master"
                   " %s, please check" % self.new_master)
      feedback_fn("Error in activating the master IP on the new master,"
                  " please fix manually.")
class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    # architecture is reported as (bits, machine), e.g. ("64bit", "x86_64")
    arch_info = (platform.architecture()[0], platform.machine())
    cluster_info = {}
    cluster_info["name"] = self.sstore.GetClusterName()
    cluster_info["software_version"] = constants.RELEASE_VERSION
    cluster_info["protocol_version"] = constants.PROTOCOL_VERSION
    cluster_info["config_version"] = constants.CONFIG_VERSION
    cluster_info["os_api_version"] = constants.OS_API_VERSION
    cluster_info["export_version"] = constants.EXPORT_VERSION
    cluster_info["master"] = self.sstore.GetMasterNode()
    cluster_info["architecture"] = arch_info

    return cluster_info
class LUClusterCopyFile(NoHooksLU):
  """Copy file to cluster.

  """
  _OP_REQP = ["nodes", "filename"]

  def CheckPrereq(self):
    """Check prerequisites.

    It should check that the named file exists and that the given list
    of nodes is valid.

    """
    if not os.path.exists(self.op.filename):
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)

    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Copy a file from master to some nodes.

    The file named by the opcode's 'filename' field is copied via ssh
    to every node in self.nodes except the local (master) node itself.

    """
    fname = self.op.filename
    local_name = utils.HostInfo().name

    # no point in copying the file onto ourselves
    targets = [node for node in self.nodes if node != local_name]
    for node in targets:
      if not self.ssh.CopyFileToNode(node, fname):
        logger.Error("Copy of file %s to node %s failed" % (fname, node))
class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    config_dump = self.cfg.DumpConfig()
    return config_dump
class LURunClusterCommand(NoHooksLU):
  """Run a command on some nodes.

  """
  _OP_REQP = ["command", "nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    It checks that the given list of nodes is valid.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Run a command on some nodes.

    Returns a list of (node, output, exit_code) tuples, one per node.

    """
    # run on the master last, so a command that breaks connectivity
    # (e.g. a restart of the node daemon) hits the other nodes first
    master = self.sstore.GetMasterNode()
    if master in self.nodes:
      self.nodes.remove(master)
      self.nodes.append(master)

    results = []
    for node in self.nodes:
      cmd_result = self.ssh.Run(node, "root", self.op.command)
      results.append((node, cmd_result.output, cmd_result.exit_code))

    return results
class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    full_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    instance = self.cfg.GetInstanceInfo(full_name)
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Activate the disks.

    Returns the per-device assembly information on success.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")
    return disks_info
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
         suceeded with the mapping from node devices to instance devices
  """
  device_info = []
  all_ok = True
  inst_name = instance.name

  # Two passes are used to shrink (not eliminate) the window for the
  # race condition of switching DRBD to primary before the handshake
  # has occurred.

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, inst_name, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        if not ignore_secondaries:
          all_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, inst_name, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        all_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return all_ok, device_info
def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  Assembles all of the instance's block devices; on failure shuts them
  back down and raises OpExecError.

  """
  assembled, _ = _AssembleInstanceDisks(instance, cfg,
                                        ignore_secondaries=force)
  if not assembled:
    # roll back whatever was brought up before failing
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")
class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    Refuses to shut the block devices down while the instance is
    still running on its primary node.

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    # a non-list answer means the RPC failed; use isinstance instead of
    # the non-idiomatic `not type(...) is list`
    if not isinstance(ins_l, list):
      raise errors.OpExecError("Can't contact node '%s'" %
                               instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError("Instance is running, can't shutdown"
                               " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are ignored
  (the original docstring had this condition inverted: the code sets
  the result to failure unless ignore_primary is set and the failing
  node is the primary).

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result
def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function check if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raise an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  # previously this indexed nodeinfo[node] unconditionally, which raised
  # KeyError/AttributeError (instead of OpPrereqError) when the node was
  # missing from the answer or its per-node result was a failure marker
  node_data = None
  if isinstance(nodeinfo, dict):
    node_data = nodeinfo.get(node)
  if not isinstance(node_data, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                               " information" % (node,))

  free_mem = node_data.get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem))
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    node_list = [self.sstore.GetMasterNode(), self.instance.primary_node]
    node_list.extend(self.instance.secondary_nodes)
    return env, node_list, node_list

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    expanded = self.cfg.ExpandInstanceName(self.op.instance_name)
    instance = self.cfg.GetInstanceInfo(expanded)
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existance
    _CheckInstanceBridgesExist(instance)

    # make sure the primary has room for the instance's memory
    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
                         "starting instance %s" % instance.name,
                         instance.memory)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    # mark it up first so the config reflects the intended state
    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self.cfg, instance, force)

    started = rpc.call_instance_start(node_current, instance, extra_args)
    if not started:
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")
class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    node_list = [self.sstore.GetMasterNode(), self.instance.primary_node]
    node_list.extend(self.instance.secondary_nodes)
    return env, node_list, node_list

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    expanded = self.cfg.ExpandInstanceName(self.op.instance_name)
    instance = self.cfg.GetInstanceInfo(expanded)
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existance
    _CheckInstanceBridgesExist(instance)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Reboot the instance.

    Soft/hard reboots are delegated to the node; a full reboot is a
    shutdown, disk restart and fresh start.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    valid_types = (constants.INSTANCE_REBOOT_SOFT,
                   constants.INSTANCE_REBOOT_HARD,
                   constants.INSTANCE_REBOOT_FULL)
    if reboot_type not in valid_types:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  valid_types)

    if reboot_type == constants.INSTANCE_REBOOT_FULL:
      # full reboot: stop the instance entirely and start it from scratch
      if not rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
      if not rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")
    else:
      # soft or hard reboot: done on the node in one call
      if not rpc.call_instance_reboot(node_current, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")

    self.cfg.MarkInstanceUp(instance.name)
class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    node_list = [self.sstore.GetMasterNode(), self.instance.primary_node]
    node_list.extend(self.instance.secondary_nodes)
    return env, node_list, node_list

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    expanded = self.cfg.ExpandInstanceName(self.op.instance_name)
    instance = self.cfg.GetInstanceInfo(expanded)
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    # mark it down first so the config reflects the intended state
    self.cfg.MarkInstanceDown(instance.name)
    if not rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    _ShutdownInstanceDisks(instance, self.cfg)
class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    # double-check against the live state of the primary node
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        # bug fix: this referenced self.op.pnode, which the reinstall
        # opcode does not have (AttributeError); use the instance's
        # primary node name instead
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   instance.primary_node)
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node"  % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.AddInstance(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      # always release the disks, even if the OS install failed
      _ShutdownInstanceDisks(inst, self.cfg)
class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      # bug fix: this used the undefined name `instance_name`, raising
      # NameError instead of the intended OpPrereqError
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      command = ["fping", "-q", name_info.ip]
      result = utils.RunCmd(command)
      # fping succeeding means something answers at the new IP
      if not result.failed:
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))

  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
                                                old_file_storage_dir,
                                                new_file_storage_dir)

      if not result:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
                                          "sda", "sdb"):
        # fixed message: was "Could run", missing the "not"
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)
class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  # bug fix: Exec reads self.op.ignore_failures, so the opcode must be
  # required to carry it (otherwise removal crashes with AttributeError)
  _OP_REQP = ["instance_name", "ignore_failures"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.sstore.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Remove the instance.

    Shuts the instance down, removes its disks and finally drops it
    from the cluster configuration; with ignore_failures the first two
    steps turn into warnings instead of aborting.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logger.Info("removing block devices for instance %s" % instance.name)

    if not _RemoveDisks(instance, self.cfg):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)
class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  Gathers static (configuration) and, when requested, dynamic (live,
  via RPC) attributes of the selected instances and returns them as a
  list of rows, one row per instance, one value per requested field.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    # fields that need a live query of the nodes, as opposed to data
    # available from the configuration alone
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
                               "admin_state", "admin_ram",
                               "disk_template", "ip", "mac", "bridge",
                               "sda_size", "sdb_size", "vcpus"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    # resolve self.op.names into the final list of instance names
    self.wanted = _GetWantedInstances(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    instance_names = self.wanted
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
                     in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])

    # only issue the (expensive) RPC when a dynamic field was requested
    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_all_instances_info(nodes)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          # exactly False means the node could not be contacted; other
          # falsy values (e.g. an empty dict) mean no instance is alive
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = []
    for instance in instance_list:
      iout = []
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          # None means unknown (primary node unreachable)
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          # combined admin/operational status string
          if instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.status != "down":
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.status != "down":
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          disk = instance.FindDisk(field[:3])
          if disk is None:
            val = None
          else:
            val = disk.size
        elif field == "vcpus":
          val = instance.vcpus
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output
2736

    
2737

    
2738
class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  Moves an instance with a network-mirrored disk template from its
  primary node to its (single) secondary node.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # failover only works when the data already exists on the
    # secondary node, i.e. with a network-mirrored disk template
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "DT_REMOTE_RAID1 template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
                         instance.name, instance.memory)

    # check bridge existance
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

    self.instance = instance

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    # a degraded target disk means data loss on failover, so refuse
    # unless explicitly overridden (or the instance is already down)
    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for remote_raid1, these are md over drbd
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down"  %
                     (instance.name, source_node, source_node))
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    # flip the primary node in the configuration *before* restarting
    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.AddInstance(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logger.Info("Starting instance %s on node %s" %
                  (instance.name, target_node))

      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))
2852

    
2853

    
2854
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
  """Recursively create a block device tree on the primary node.

  Children are created first; every device in the tree is created
  unconditionally.  Returns True on success, False on the first
  failure.

  """
  for child in (device.children or []):
    if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
      return False

  cfg.SetDiskID(device, node)
  created_id = rpc.call_blockdev_create(node, device, device.size,
                                        instance.name, True, info)
  if not created_id:
    return False
  if device.physical_id is None:
    device.physical_id = created_id
  return True
2873

    
2874

    
2875
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
  """Recursively create a block device tree on a secondary node.

  Devices that declare CreateOnSecondary() are created, and creation
  is forced for their entire subtree; otherwise the recursion only
  descends into the children, keeping the caller's 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  for child in (device.children or []):
    if not _CreateBlockDevOnSecondary(cfg, node, instance,
                                      child, force, info):
      return False

  if not force:
    # nothing to create at this level
    return True
  cfg.SetDiskID(device, node)
  created_id = rpc.call_blockdev_create(node, device, device.size,
                                        instance.name, False, info)
  if not created_id:
    return False
  if device.physical_id is None:
    device.physical_id = created_id
  return True
2902

    
2903

    
2904
def _GenerateUniqueNames(cfg, exts):
2905
  """Generate a suitable LV name.
2906

2907
  This will generate a logical volume name for the given instance.
2908

2909
  """
2910
  results = []
2911
  for val in exts:
2912
    new_id = cfg.GenerateUniqueID()
2913
    results.append("%s%s" % (new_id, val))
2914
  return results
2915

    
2916

    
2917
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
  """Generate a drbd device complete with its children.

  Builds a data LV (of the requested size) and a 128 MB metadata LV,
  then wraps them in a DRBD7 device spanning primary and secondary.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  data_dev = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  meta_dev = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  return objects.Disk(dev_type=constants.LD_DRBD7, size=size,
                      logical_id=(primary, secondary, port),
                      children=[data_dev, meta_dev])
2931

    
2932

    
2933
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
  """Generate a drbd8 device complete with its children.

  Builds a data LV (of the requested size) and a 128 MB metadata LV,
  then wraps them in a DRBD8 device spanning primary and secondary.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  data_dev = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  meta_dev = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  return objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                      logical_id=(primary, secondary, port),
                      children=[data_dev, meta_dev],
                      iv_name=iv_name)
2948

    
2949

    
2950
def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz,
                          file_storage_dir, file_driver):
  """Generate the entire disk layout for a given template type.

  Dispatches on template_name and returns the list of top-level Disk
  objects for the instance (empty for diskless instances).

  """
  #TODO: compute space requirements

  vgname = cfg.GetVGName()

  if template_name == constants.DT_DISKLESS:
    return []

  if template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    return [
      objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                   logical_id=(vgname, names[0]), iv_name="sda"),
      objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                   logical_id=(vgname, names[1]), iv_name="sdb"),
      ]

  if template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    return [
      _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                           disk_sz, names[0:2], "sda"),
      _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                           swap_sz, names[2:4], "sdb"),
      ]

  if template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")
    return [
      objects.Disk(dev_type=constants.LD_FILE, size=disk_sz, iv_name="sda",
                   logical_id=(file_driver,
                               "%s/sda" % file_storage_dir)),
      objects.Disk(dev_type=constants.LD_FILE, size=swap_sz, iv_name="sdb",
                   logical_id=(file_driver,
                               "%s/sdb" % file_storage_dir)),
      ]

  raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2999

    
3000

    
3001
def _GetInstanceInfoText(instance):
3002
  """Compute that text that should be added to the disk's metadata.
3003

3004
  """
3005
  return "originstname+%s" % instance.name
3006

    
3007

    
3008
def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  # file-based instances need their storage directory created first
  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = rpc.call_file_storage_dir_create(instance.primary_node,
                                              file_storage_dir)

    if not result:
      # bug fix: this previously referenced the undefined name 'inst',
      # raising a NameError instead of logging the RPC failure
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
      return False

    if not result[0]:
      logger.Error("failed to create directory '%s'" % file_storage_dir)
      return False

  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    # secondaries first, so a half-created mirror never lacks its peer
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False

  return True
3053

    
3054

    
3055
def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`.  Removal is best-effort: when a device cannot
  be removed the problem is logged and removal continues with the
  other devices (compare with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  success = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        success = False

  # file-based instances also need their storage directory removed
  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
                                            file_storage_dir):
      logger.Error("could not remove directory '%s'" % file_storage_dir)
      success = False

  return success
3090

    
3091

    
3092
def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group

  This is currently hard-coded for the two-drive layout.

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: disk_size + swap_size,
    # 256 MB are added for drbd metadata, 128MB for each drbd device
    constants.DT_DRBD8: disk_size + swap_size + 256,
    constants.DT_FILE: None,
  }

  try:
    return req_size_dict[disk_template]
  except KeyError:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" %  disk_template)
3112

    
3113

    
3114
class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
              "disk_template", "swap_size", "mode", "start", "vcpus",
              "wait_for_sync", "ip_check", "mac"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.op.mem_size,
      vcpus=self.op.vcpus,
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
    ))

    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    Validates the full opcode (mode, nodes, disk template, sizes,
    OS, IP/MAC/bridge, boot order) and fills in derived attributes
    (self.pnode, self.secondaries, self.inst_ip, self.instance_status).

    """
    # optional attributes default to None
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)
      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")
      src_node_full = self.cfg.ExpandNodeName(src_node)
      if src_node_full is None:
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
      self.op.src_node = src_node = src_node_full

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage
    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

    # check primary node
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
    if pnode is None:
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                 self.op.pnode)
    self.op.pnode = pnode.name
    self.pnode = pnode
    self.secondaries = []
    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory not a relative"
                                 " path")

    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if getattr(self.op, "snode", None) is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")

      snode_name = self.cfg.ExpandNodeName(self.op.snode)
      if snode_name is None:
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
                                   self.op.snode)
      elif snode_name == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(snode_name)

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.op.disk_size, self.op.swap_size)

    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          # bug fix: the message previously interpolated the whole
          # 'nodeinfo' dict instead of the offending node's name
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node"  % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")

    # instance verification
    hostname1 = utils.HostInfo(self.op.instance_name)

    self.op.instance_name = instance_name = hostname1.name
    instance_list = self.cfg.GetInstanceList()
    if instance_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = inst_ip

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (hostname1.ip, instance_name))

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # boot order verification
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    # some hypervisors (e.g. for VNC consoles) need a network port
    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.sstore.GetFileStorageDir(),
                                        string_file_storage_dir, instance))

    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size,
                                  file_storage_dir,
                                  self.op.file_driver)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      # roll back any partially-created devices
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                                src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")
3448

    
3449

    
3450
class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    full_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    inst = self.cfg.GetInstanceInfo(full_name)
    if inst is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = inst

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    inst = self.instance
    pnode = inst.primary_node

    # ask the primary node which instances it currently runs
    running = rpc.call_instance_list([pnode])[pnode]
    if running is False:
      raise errors.OpExecError("Can't connect to node %s." % pnode)

    if inst.name not in running:
      raise errors.OpExecError("Instance %s is not running." % inst.name)

    logger.Debug("connecting to console of %s on %s" % (inst.name, pnode))

    console_cmd = hypervisor.GetHypervisor().GetShellCommandForConsole(inst)

    # hand back the ssh command line the caller runs on the master node
    return self.ssh.BuildCmd(pnode, "root", console_cmd, batch=True, tty=True)
3494

    
3495

    
3496
class LUReplaceDisks(LogicalUnit):
3497
  """Replace the disks of an instance.
3498

3499
  """
3500
  HPATH = "mirrors-replace"
3501
  HTYPE = constants.HTYPE_INSTANCE
3502
  _OP_REQP = ["instance_name", "mode", "disks"]
3503

    
3504
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    hook_env = {"MODE": self.op.mode,
                "NEW_SECONDARY": self.op.remote_node,
                "OLD_SECONDARY": self.instance.secondary_nodes[0]}
    hook_env.update(_BuildInstanceHookEnvByObject(self.instance))
    # hooks run on the master and the primary; add the new secondary
    # only when one was actually requested
    node_list = [self.sstore.GetMasterNode(), self.instance.primary_node]
    if self.op.remote_node is not None:
      node_list.append(self.op.remote_node)
    return hook_env, node_list, node_list
3523

    
3524
  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster, that its disk
    template supports network mirroring, and that the requested
    replacement mode is valid for that template.  As side effects it
    normalizes self.op (instance_name, mode, remote_node) and fills in
    self.instance, self.sec_node, self.remote_node_info and, depending
    on the mode, self.tgt_node/self.oth_node/self.new_node for use by
    the Exec variants.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance
    # store the fully-expanded name back into the opcode
    self.op.instance_name = instance.name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    # all supported mirrored templates have exactly one secondary
    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    # remote_node is optional on the opcode; None means "keep the
    # current secondary"
    remote_node = getattr(self.op, "remote_node", None)
    if remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
      # the user gave the current secondary, switch to
      # 'no-replace-secondary' mode for drbd7
      remote_node = None
    if (instance.disk_template == constants.DT_REMOTE_RAID1 and
        self.op.mode != constants.REPLACE_DISK_ALL):
      raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
                                 " disks replacement, not individual ones")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        # tgt_node is where new storage is created, oth_node is the peer
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    # validate that every requested disk name exists on the instance
    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))
    # store the (possibly cleared) remote node back into the opcode
    self.op.remote_node = remote_node
3604

    
3605
  def _ExecRR1(self, feedback_fn):
    """Replace the disks of an instance (remote_raid1 template).

    For each instance disk this creates a new DRBD mirror component on
    the (possibly new) secondary and on the primary, attaches it to the
    md device, waits for sync, verifies nothing is degraded and finally
    detaches and removes the old mirror component.  On RPC failures it
    attempts a best-effort rollback and raises OpExecError, asking for
    manual cleanup.

    Fix versus the previous revision: corrected the typo in the
    "Can't add mirror component to md!" log message.

    """
    instance = self.instance
    iv_names = {}
    # start of work
    if self.op.remote_node is None:
      # no new secondary requested: rebuild on the current one
      remote_node = self.sec_node
    else:
      remote_node = self.op.remote_node
    cfg = self.cfg
    for dev in instance.disks:
      size = dev.size
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
                                       remote_node, size, names)
      # remember (md device, old component, new component) for later
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
      logger.Info("adding new mirror component on secondary for %s" %
                  dev.iv_name)
      #HARDCODE
      if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new component on secondary"
                                 " node %s. Full abort, cleanup manually!" %
                                 remote_node)

      logger.Info("adding new mirror component on primary")
      #HARDCODE
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                      instance, new_drbd,
                                      _GetInstanceInfoText(instance)):
        # remove secondary dev
        cfg.SetDiskID(new_drbd, remote_node)
        rpc.call_blockdev_remove(remote_node, new_drbd)
        raise errors.OpExecError("Failed to create volume on primary!"
                                 " Full abort, cleanup manually!!")

      # the device exists now
      # call the primary node to add the mirror to md
      logger.Info("adding new mirror component to md")
      if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
                                           [new_drbd]):
        logger.Error("Can't add mirror component to md!")
        # roll back the new component on both nodes before aborting
        cfg.SetDiskID(new_drbd, remote_node)
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
          logger.Error("Can't rollback on secondary")
        cfg.SetDiskID(new_drbd, instance.primary_node)
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
          logger.Error("Can't rollback on primary")
        raise errors.OpExecError("Full abort, cleanup manually!!")

      dev.children.append(new_drbd)
      # persist the updated disk tree after each disk
      cfg.AddInstance(instance)

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      cfg.SetDiskID(dev, instance.primary_node)
      # index 5 of the find result is the degraded flag
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("MD device %s is degraded!" % name)
      cfg.SetDiskID(new_drbd, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
      if is_degr:
        raise errors.OpExecError("New drbd device %s is degraded!" % name)

    # everything is in sync: detach and remove the old components
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      logger.Info("remove mirror %s component" % name)
      cfg.SetDiskID(dev, instance.primary_node)
      if not rpc.call_blockdev_removechildren(instance.primary_node,
                                              dev, [child]):
        logger.Error("Can't remove child from mirror, aborting"
                     " *this device cleanup*.\nYou need to cleanup manually!!")
        continue

      # the old drbd component lives on the two nodes in logical_id[:2]
      for node in child.logical_id[:2]:
        logger.Info("remove child device on %s" % node)
        cfg.SetDiskID(child, node)
        if not rpc.call_blockdev_remove(node, child):
          logger.Error("Warning: failed to remove device from node %s,"
                       " continuing operation." % node)

      dev.children.remove(child)

      cfg.AddInstance(instance)
3699

    
3700
  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for dbrd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaces.<time_t>)

    Failures are not very well handled.

    Uses self.tgt_node (where storage is replaced) and self.oth_node
    (the peer), both set up by CheckPrereq.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      # only operate on the disks listed in the opcode
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      # 128 is the (hard-coded) size of the drbd8 meta LV
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      # update the config objects to match the on-node renames
      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            # NOTE(review): the "%s" has no matching argument here —
            # presumably LogWarning %-formats extra args; confirm
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      # index 5 of the find result is the degraded flag
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not rpc.call_blockdev_remove(tgt_node, lv):
          # best-effort: leave the renamed LV behind and keep going
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue
3869

    
3870
  def _ExecD8Secondary(self, feedback_fn):
3871
    """Replace the secondary node for drbd8.
3872

3873
    The algorithm for replace is quite complicated:
3874
      - for all disks of the instance:
3875
        - create new LVs on the new node with same names
3876
        - shutdown the drbd device on the old secondary
3877
        - disconnect the drbd network on the primary
3878
        - create the drbd device on the new secondary
3879
        - network attach the drbd on the primary, using an artifice:
3880
          the drbd code for Attach() will connect to the network if it
3881
          finds a device which is connected to the good local disks but
3882
          not network enabled
3883
      - wait for sync across all devices
3884
      - remove all disks from the old secondary
3885

3886
    Failures are not very well handled.
3887

3888
    """
3889
    steps_total = 6
3890
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3891
    instance = self.instance
3892
    iv_names = {}
3893
    vgname = self.cfg.GetVGName()
3894
    # start of work
3895
    cfg = self.cfg
3896
    old_node = self.tgt_node
3897
    new_node = self.new_node
3898
    pri_node = instance.primary_node
3899

    
3900
    # Step: check device activation
3901
    self.proc.LogStep(1, steps_total, "check device existence")
3902
    info("checking volume groups")
3903
    my_vg = cfg.GetVGName()
3904
    results = rpc.call_vg_list([pri_node, new_node])
3905
    if not results:
3906
      raise errors.OpExecError("Can't list volume groups on the nodes")
3907
    for node in pri_node, new_node:
3908
      res = results.get(node, False)
3909
      if not res or my_vg not in res:
3910
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3911
                                 (my_vg, node))
3912
    for dev in instance.disks:
3913
      if not dev.iv_name in self.op.disks:
3914
        continue
3915
      info("checking %s on %s" % (dev.iv_name, pri_node))
3916
      cfg.SetDiskID(dev, pri_node)
3917
      if not rpc.call_blockdev_find(pri_node, dev):
3918
        raise errors.OpExecError("Can't find device %s on node %s" %
3919
                                 (dev.iv_name, pri_node))
3920

    
3921
    # Step: check other node consistency
3922
    self.proc.LogStep(2, steps_total, "check peer consistency")
3923
    for dev in instance.disks:
3924
      if not dev.iv_name in self.op.disks:
3925
        continue
3926
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3927
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3928
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
3929
                                 " unsafe to replace the secondary" %
3930
                                 pri_node)
3931

    
3932
    # Step: create new storage
3933
    self.proc.LogStep(3, steps_total, "allocate new storage")
3934
    for dev in instance.disks:
3935
      size = dev.size
3936
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3937
      # since we *always* want to create this LV, we use the
3938
      # _Create...OnPrimary (which forces the creation), even if we
3939
      # are talking about the secondary node
3940
      for new_lv in dev.children:
3941
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3942
                                        _GetInstanceInfoText(instance)):
3943
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3944
                                   " node '%s'" %
3945
                                   (new_lv.logical_id[1], new_node))
3946

    
3947
      iv_names[dev.iv_name] = (dev, dev.children)
3948

    
3949
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
3950
    for dev in instance.disks:
3951
      size = dev.size
3952
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3953
      # create new devices on new_node
3954
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3955
                              logical_id=(pri_node, new_node,
3956
                                          dev.logical_id[2]),
3957
                              children=dev.children)
3958
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3959
                                        new_drbd, False,
3960
                                      _GetInstanceInfoText(instance)):
3961
        raise errors.OpExecError("Failed to create new DRBD on"
3962
                                 " node '%s'" % new_node)
3963

    
3964
    for dev in instance.disks:
3965
      # we have new devices, shutdown the drbd on the old secondary
3966
      info("shutting down drbd for %s on old node" % dev.iv_name)
3967
      cfg.SetDiskID(dev, old_node)
3968
      if not rpc.call_blockdev_shutdown(old_node, dev):
3969
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3970
                hint="Please cleanup this device manually as soon as possible")
3971

    
3972
    info("detaching primary drbds from the network (=> standalone)")
3973
    done = 0
3974
    for dev in instance.disks:
3975
      cfg.SetDiskID(dev, pri_node)
3976