Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib.py @ 515207af

History | View | Annotate | Download (153.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33

    
34
from ganeti import rpc
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import config
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import ssconf
45

    
46
class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements (REQ_CLUSTER,
      REQ_MASTER); note that all commands require root permissions

  """
  HPATH = None        # hooks path; None means no hooks are run for this LU
  HTYPE = None        # hooks type (e.g. cluster/node/instance constant)
  _OP_REQP = []       # opcode attributes that must be present (non-None)
  REQ_CLUSTER = True  # whether an initialized cluster is required
  REQ_MASTER = True   # whether this LU may only run on the master node

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overriden in derived classes in order to check op
    validity.

    Args:
      processor: the processor driving this LU's execution
      op: the opcode describing the requested operation
      cfg: the cluster configuration accessor
      sstore: the simple store (cluster-wide key/value data)

    Raises:
      errors.OpPrereqError: if a required opcode parameter is missing,
        the cluster is not initialized, or we are not the master node.

    """
    self.proc = processor
    self.op = op
    self.cfg = cfg
    self.sstore = sstore
    # fail early if any parameter declared required by the subclass is
    # missing from the opcode
    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    if self.REQ_CLUSTER:
      if not cfg.IsCluster():
        raise errors.OpPrereqError("Cluster not initialized yet,"
                                   " use 'gnt-cluster init' first.")
      if self.REQ_MASTER:
        master = sstore.GetMasterNode()
        # compare the recorded master against our own resolved hostname
        if master != utils.HostInfo().name:
          raise errors.OpPrereqError("Commands must be run on the master"
                                     " node %s" % master)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-node tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    As for the node lists, the master should not be included in the
    them, as it will be added by the hooks runner in case this LU
    requires a cluster to run on (otherwise we don't have a node
    list). No nodes should be returned as an empty list (and not
    None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError
145

    
146

    
147
class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Build hooks env.

    This is a no-op, since we don't run hooks.

    """
    # no environment, no pre-run nodes, no post-run nodes
    env = {}
    pre_nodes = []
    post_nodes = []
    return env, pre_nodes, post_nodes
164

    
165

    
166
def _AddHostToEtcHosts(hostname):
  """Wrapper around utils.SetEtcHostsEntry.

  Resolves the given hostname and registers its IP, full name and
  short name in the cluster hosts file.

  """
  host_info = utils.HostInfo(name=hostname)
  utils.SetEtcHostsEntry(constants.ETC_HOSTS, host_info.ip,
                         host_info.name, [host_info.ShortName()])
172

    
173

    
174
def _RemoveHostFromEtcHosts(hostname):
  """Wrapper around utils.RemoveEtcHostsEntry.

  Removes both the full and the short name of the host from the
  cluster hosts file.

  """
  host_info = utils.HostInfo(name=hostname)
  for entry in (host_info.name, host_info.ShortName()):
    utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, entry)
181

    
182

    
183
def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  # empty/None list means "all nodes"
  if not nodes:
    return utils.NiceSort(lu.cfg.GetNodeList())

  wanted = []
  for name in nodes:
    expanded = lu.cfg.ExpandNodeName(name)
    if expanded is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(expanded)

  return utils.NiceSort(wanted)
205

    
206

    
207
def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings) or None for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  # empty/None list means "all instances"
  if not instances:
    return utils.NiceSort(lu.cfg.GetInstanceList())

  wanted = []
  for name in instances:
    expanded = lu.cfg.ExpandInstanceName(name)
    if expanded is None:
      raise errors.OpPrereqError("No such instance name '%s'" % name)
    wanted.append(expanded)

  return utils.NiceSort(wanted)
229

    
230

    
231
def _CheckOutputFields(static, dynamic, selected):
232
  """Checks whether all selected fields are valid.
233

234
  Args:
235
    static: Static fields
236
    dynamic: Dynamic fields
237

238
  """
239
  static_fields = frozenset(static)
240
  dynamic_fields = frozenset(dynamic)
241

    
242
  all_fields = static_fields | dynamic_fields
243

    
244
  if not all_fields.issuperset(selected):
245
    raise errors.OpPrereqError("Unknown output fields selected: %s"
246
                               % ",".join(frozenset(selected).
247
                                          difference(all_fields)))
248

    
249

    
250
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
251
                          memory, vcpus, nics):
252
  """Builds instance related env variables for hooks from single variables.
253

254
  Args:
255
    secondary_nodes: List of secondary nodes as strings
256
  """
257
  env = {
258
    "OP_TARGET": name,
259
    "INSTANCE_NAME": name,
260
    "INSTANCE_PRIMARY": primary_node,
261
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
262
    "INSTANCE_OS_TYPE": os_type,
263
    "INSTANCE_STATUS": status,
264
    "INSTANCE_MEMORY": memory,
265
    "INSTANCE_VCPUS": vcpus,
266
  }
267

    
268
  if nics:
269
    nic_count = len(nics)
270
    for idx, (ip, bridge, mac) in enumerate(nics):
271
      if ip is None:
272
        ip = ""
273
      env["INSTANCE_NIC%d_IP" % idx] = ip
274
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
275
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
276
  else:
277
    nic_count = 0
278

    
279
  env["INSTANCE_NIC_COUNT"] = nic_count
280

    
281
  return env
282

    
283

    
284
def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override

  Returns:
    dict of hook environment variables, as built by
    _BuildInstanceHookEnv.

  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    # FIX: this used to pass instance.os here as well, so the
    # INSTANCE_STATUS hook variable wrongly carried the OS name
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)
304

    
305

    
306
def _UpdateKnownHosts(fullnode, ip, pubkey):
  """Ensure a node has a correct known_hosts entry.

  Scans the cluster known_hosts file for entries matching the node (by
  name or IP): a complete, correct entry is kept; partial or stale
  entries are discarded. If anything was discarded the whole file is
  rewritten atomically (via temp file + rename); if only an addition is
  needed, the new line is simply appended.

  Args:
    fullnode - Fully qualified domain name of host. (str)
    ip       - IPv4 address of host (str)
    pubkey   - the public key of the cluster

  """
  # open read/write so we can both scan existing entries and, in the
  # append-only case, add the new line in place
  if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
  else:
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')

  inthere = False    # found a complete entry with the right key?

  save_lines = []    # lines kept from the existing file
  add_lines = []     # new lines to add for this node
  removed = False    # was any stale/partial entry dropped?

  for rawline in f:
    logger.Debug('read %s' % (repr(rawline),))

    parts = rawline.rstrip('\r\n').split()

    # Ignore unwanted lines
    if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
      # known_hosts format: "host1,host2,... keytype key"
      fields = parts[0].split(',')
      key = parts[2]

      # haveall: entry lists both the IP and the full node name
      # havesome: entry lists at least one of them
      haveall = True
      havesome = False
      for spec in [ ip, fullnode ]:
        if spec not in fields:
          haveall = False
        if spec in fields:
          havesome = True

      logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
      if haveall and key == pubkey:
        inthere = True
        save_lines.append(rawline)
        logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
        continue

      # entry mentions this node but is incomplete or has the wrong
      # key: drop it so it can be replaced
      if havesome and (not haveall or key != pubkey):
        removed = True
        logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
        continue

    # unrelated (or comment/short) line: keep verbatim
    save_lines.append(rawline)

  if not inthere:
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))

  if removed:
    save_lines = save_lines + add_lines

    # Write a new file and replace old.
    fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
                                   constants.DATA_DIR)
    newfile = os.fdopen(fd, 'w')
    try:
      newfile.write(''.join(save_lines))
    finally:
      newfile.close()
    logger.Debug("Wrote new known_hosts.")
    # atomic replace of the old file
    os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)

  elif add_lines:
    # Simply appending a new line will do the trick.
    f.seek(0, 2)
    for add in add_lines:
      f.write(add)

  f.close()
383

    
384

    
385
def _HasValidVG(vglist, vgname):
386
  """Checks if the volume group list is valid.
387

388
  A non-None return value means there's an error, and the return value
389
  is the error message.
390

391
  """
392
  vgsize = vglist.get(vgname, None)
393
  if vgsize is None:
394
    return "volume group '%s' missing" % vgname
395
  elif vgsize < 20480:
396
    return ("volume group '%s' too small (20480MiB required, %dMib found)" %
397
            (vgname, vgsize))
398
  return None
399

    
400

    
401
def _InitSSHSetup(node):
  """Setup the SSH configuration for the cluster.


  This generates a dsa keypair for root, adds the pub key to the
  permitted hosts and adds the hostkey to its own known hosts.

  Args:
    node: the name of this host as a fqdn

  """
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

  # back up and remove any pre-existing key material before generating
  for key_file in (priv_key, pub_key):
    if os.path.exists(key_file):
      utils.CreateBackup(key_file)
    utils.RemoveFile(key_file)

  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
                         "-f", priv_key,
                         "-q", "-N", ""])
  if result.failed:
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
                             result.output)

  # authorize the freshly generated public key for the Ganeti user
  pub_file = open(pub_key, 'r')
  try:
    utils.AddAuthorizedKey(auth_keys, pub_file.read(8192))
  finally:
    pub_file.close()
431

    
432

    
433
def _InitGanetiServerSetup(ss):
  """Setup the necessary configuration for the initial node daemon.

  This creates the nodepass file containing the shared password for
  the cluster and also generates the SSL certificate.

  Args:
    ss: simple store instance where the node password is persisted

  Raises:
    errors.OpExecError: if certificate generation or the node daemon
      restart fails.

  """
  # Create pseudo random password
  randpass = sha.new(os.urandom(64)).hexdigest()
  # and write it into sstore
  ss.SetKey(ss.SS_NODED_PASS, randpass)

  # self-signed certificate valid for five years; key and certificate
  # are written into the same file
  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
                         "-days", str(365*5), "-nodes", "-x509",
                         "-keyout", constants.SSL_CERT_FILE,
                         "-out", constants.SSL_CERT_FILE, "-batch"])
  if result.failed:
    raise errors.OpExecError("could not generate server ssl cert, command"
                             " %s had exitcode %s and error message %s" %
                             (result.cmd, result.exit_code, result.output))

  # the file also holds the private key, so owner-read-only
  os.chmod(constants.SSL_CERT_FILE, 0400)

  # restart the node daemon so it picks up the new password/certificate
  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])

  if result.failed:
    raise errors.OpExecError("Could not start the node daemon, command %s"
                             " had exitcode %s and error %s" %
                             (result.cmd, result.exit_code, result.output))
462

    
463

    
464
def _CheckInstanceBridgesExist(instance):
  """Check that the brigdes needed by an instance exist.

  Raises errors.OpPrereqError if any bridge used by the instance's
  NICs is missing on its primary node.

  """
  # check bridges existance
  bridges = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, bridges):
    raise errors.OpPrereqError("one or more target bridges %s does not"
                               " exist on destination node '%s'" %
                               (bridges, instance.primary_node))
474

    
475

    
476
class LUInitCluster(LogicalUnit):
  """Initialise the cluster.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
              "def_bridge", "master_netdev"]
  REQ_CLUSTER = False  # by definition, the cluster does not exist yet

  def BuildHooksEnv(self):
    """Build hooks env.

    Notes: Since we don't require a cluster, we must manually add
    ourselves in the post-run node list.

    """
    env = {"OP_TARGET": self.op.cluster_name}
    return env, [], [self.hostname.name]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    if config.ConfigWriter.IsCluster():
      raise errors.OpPrereqError("Cluster is already initialised")

    # the HVM hypervisor requires the VNC password file to pre-exist
    if self.op.hypervisor_type == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        # NOTE(review): message is missing a space between "VNC" and
        # "password" (runtime string, left unchanged here)
        raise errors.OpPrereqError("Please prepare the cluster VNC"
                                   "password file %s" %
                                   constants.VNC_PASSWORD_FILE)

    self.hostname = hostname = utils.HostInfo()

    # a loopback-resolving hostname breaks inter-node communication
    if hostname.ip.startswith("127."):
      raise errors.OpPrereqError("This host's IP resolves to the private"
                                 " range (%s). Please fix DNS or /etc/hosts." %
                                 (hostname.ip,))

    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)

    # verify that the resolved IP actually belongs to this host
    if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
                         constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
                                 " to %s,\nbut this ip address does not"
                                 " belong to this host."
                                 " Aborting." % hostname.ip)

    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip and not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary ip given")
    # a secondary IP different from the primary must also be local
    if (secondary_ip and
        secondary_ip != hostname.ip and
        (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
                           constants.DEFAULT_NODED_PORT))):
      raise errors.OpPrereqError("You gave %s as secondary IP,"
                                 " but it does not belong to this host." %
                                 secondary_ip)
    self.secondary_ip = secondary_ip

    # checks presence of the volume group given
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)

    if vgstatus:
      raise errors.OpPrereqError("Error: %s" % vgstatus)

    # the MAC prefix must look like the first three octets of a MAC
    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
                    self.op.mac_prefix):
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
                                 self.op.mac_prefix)

    if self.op.hypervisor_type not in constants.HYPER_TYPES:
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
                                 self.op.hypervisor_type)

    # the master network device must exist on this host
    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
    if result.failed:
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
                                 (self.op.master_netdev,
                                  result.output.strip()))

    # the node daemon init script must be present and executable
    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
      raise errors.OpPrereqError("Init.d script '%s' missing or not"
                                 " executable." % constants.NODE_INITD_SCRIPT)

  def Exec(self, feedback_fn):
    """Initialize the cluster.

    """
    clustername = self.clustername
    hostname = self.hostname

    # set up the simple store
    self.sstore = ss = ssconf.SimpleStore()
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)

    # set up the inter-node password and certificate
    _InitGanetiServerSetup(ss)

    # start the master ip
    rpc.call_node_start_master(hostname.name)

    # set up ssh config and /etc/hosts
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
    try:
      sshline = f.read()
    finally:
      f.close()
    # the second whitespace-separated field of the host key line is the key
    sshkey = sshline.split(" ")[1]

    _AddHostToEtcHosts(hostname.name)

    _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)

    _InitSSHSetup(hostname.name)

    # init of cluster config file
    self.cfg = cfgw = config.ConfigWriter()
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
                    sshkey, self.op.mac_prefix,
                    self.op.vg_name, self.op.def_bridge)
603

    
604

    
605
class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodes = self.cfg.GetNodeList()
    # the master must be the one and only remaining node
    if nodes != [master]:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodes) - 1))

    instances = self.cfg.GetInstanceList()
    if instances:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instances))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.sstore.GetMasterNode()
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    # keep backups of the cluster ssh keys before tearing down
    for key_file in (priv_key, pub_key):
      utils.CreateBackup(key_file)
    rpc.call_node_leave_cluster(master)
639

    
640

    
641
class LUVerifyCluster(NoHooksLU):
  """Verifies the cluster status.

  """
  _OP_REQP = []

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existance and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    Returns:
      True if any problem was found on the node, False otherwise.

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn(" - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existance and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        # NOTE(review): this loop variable shadows the 'node' parameter;
        # harmless here since the parameter is not used afterwards, but
        # worth cleaning up
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    Returns:
      True if any problem was found, False otherwise.

    """
    bad = False

    instancelist = self.cfg.GetInstanceList()
    if not instance in instancelist:
      feedback_fn("  - ERROR: instance %s not in instance list %s" %
                      (instance, instancelist))
      bad = True

    instanceconfig = self.cfg.GetInstanceInfo(instance)
    node_current = instanceconfig.primary_node

    # volumes the configuration says this instance should have, per node
    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    # a non-down instance must actually be running on its primary node
    if not instanceconfig.status == 'down':
      if not instance in node_instance[node_current]:
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    # ... and must not be running anywhere else
    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    Returns:
      True if any orphan volume was found, False otherwise.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    Returns:
      True if any orphan instance was found, False otherwise.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    Returns 0 if the cluster verified cleanly, 1 if any problem was
    found.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    node_volume = {}     # per-node reported volumes
    node_instance = {}   # per-node reported running instances

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    # gather all remote data up front, one RPC fan-out per data kind
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      # a string result means the LVM command itself failed on the node
      if isinstance(volumeinfo, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

    # volumes that should exist anywhere in the cluster, per node
    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      result =  self._VerifyInstance(instance, node_volume, node_instance,
                                     feedback_fn)
      bad = bad or result

      inst_config = self.cfg.GetInstanceInfo(instance)

      inst_config.MapLVsByNode(node_vol_should)

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    return int(bad)
888

    
889

    
890
class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    Returns a tuple (offline_nodes, per_node_lvm_errors,
    degraded_instances, missing_lvs_per_instance); the four local names
    below alias the tuple members so they can be filled in-place.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    # build the {(node, lv_name): instance} map of the LVs we expect to
    # find, restricted to running, network-mirrored instances
    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
        # FIX: without this continue the error string would fall through
        # to the iteritems() loop below and crash with AttributeError
        continue
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result
960

    
961

    
962
class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      # FIX: the cluster name lives on the LU's sstore, not on the
      # opcode (self.op has no 'sstore' attribute; compare the
      # self.sstore.GetMasterNode() call just below)
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    Raises OpPrereqError if neither name nor IP changed, or if the new
    cluster IP is already live on the network.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      # the new master IP must not answer pings, otherwise it is in use
      result = utils.RunCmd(["fping", "-q", new_ip])
      if not result.failed:
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      # always try to restore the master role, even on failure above
      if not rpc.call_node_start_master(master):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")
1039

    
1040

    
1041
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  Args:
    cfgw: configuration object, used to set the physical disk IDs
    instance: the instance whose disks are polled (on its primary node)
    proc: processor-like object providing LogInfo/LogWarning feedback
    oneshot: if True, poll and report once instead of waiting for full sync
    unlock: if True, release the 'cmd' lock while sleeping between polls

  Returns:
    True if the disks ended up in sync (not cumulatively degraded),
    False otherwise.

  Raises:
    errors.RemoteError: if mirror data cannot be fetched from the
      primary node 10 times in a row.

  """
  if not instance.disks:
    # nothing to wait for
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  # the disk IDs must be set so the RPC addresses the right physical devices
  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      # transient RPC failure: retry up to 10 times before giving up
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0  # a successful poll resets the retry counter
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      # a degraded device with no sync progress counts as overall degradation
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        # a non-None percentage means this device is still resyncing
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          # NOTE(review): this overwrites rather than takes the maximum of
          # per-device estimates — the last reporting device wins
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    # sleep up to 60s between polls, dropping the command lock meanwhile
    # if requested so other commands can make progress
    if unlock:
      utils.Unlock('cmd')
    try:
      time.sleep(min(60, max_time))
    finally:
      if unlock:
        utils.Lock('cmd')

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
1103

    
1104

    
1105
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  Args:
    cfgw: configuration object, used to set the physical disk ID
    dev: the disk object to check (children are checked recursively)
    node: the node on which to query the device
    on_primary: whether the device is expected assembled on this node
    ldisk: select the local-storage flag instead of the overall one

  Returns:
    True if the device (and all its children) report a healthy status,
    False otherwise (including when no data could be fetched).

  """
  cfgw.SetDiskID(dev, node)
  if ldisk:
    idx = 6  # position of the ldisk flag in the blockdev_find result
  else:
    idx = 5  # position of the is_degraded flag

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Can't get any data from node %s" % node)
      result = False
    else:
      # healthy means the selected degradation flag is clear
      result = result and (not rstats[idx])
  if dev.children:
    # NOTE(review): children are always checked with the default
    # ldisk=False, even when the caller passed ldisk=True — confirm
    # whether this is intentional
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result
1132

    
1133

    
1134
class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    return

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    Queries every node for its OS information and returns the
    aggregated per-node data.

    """
    nodes = self.cfg.GetNodeList()
    os_data = rpc.call_os_diagnose(nodes)
    if os_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    return os_data
1157

    
1158

    
1159
class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would not allows itself to run.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      # FIX: converted from the old "raise Class, args" comma syntax to
      # the call form used by every other raise in this file
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    # tell the node to leave the cluster (best-effort, no result checked)
    rpc.call_node_leave_cluster(node.name)

    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)

    _RemoveHostFromEtcHosts(node.name)
1232

    
1233

    
1234
class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    # fields whose values must be fetched live from the nodes via RPC
    self.dynamic_fields = frozenset(["dtotal", "dfree",
                                     "mtotal", "mnode", "mfree",
                                     "bootid"])

    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                               "pip", "sip"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedNodes(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    Returns a list of rows, one per node, each row listing the values
    of the requested output fields in order.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      # at least one live field requested: query the nodes
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      # FIX: use a distinct dict per node; the previous
      # dict.fromkeys(nodenames, {}) made every node share one mutable
      # dict object, a latent aliasing bug
      live_data = dict([(name, {}) for name in nodenames])

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      # instance-related fields requested: build the reverse mappings
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field in self.dynamic_fields:
          # missing live data yields None rather than an error
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output
1330

    
1331

    
1332
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    Validates the node list and the requested output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)


  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    Returns one row per logical volume found, listing the requested
    fields (all stringified) in order.

    """
    node_names = self.nodes
    volumes = rpc.call_node_volumes(node_names)

    instances = [self.cfg.GetInstanceInfo(iname)
                 for iname in self.cfg.GetInstanceList()]

    # per-instance {node: [lv, ...]} maps, used to resolve LV ownership
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in instances])

    output = []
    for node in node_names:
      if not volumes.get(node):
        # node missing or returned no/invalid volume data
        continue

      node_vols = sorted(volumes[node], key=lambda entry: entry['dev'])

      for entry in node_vols:
        row = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = entry['dev']
          elif field == "vg":
            val = entry['vg']
          elif field == "name":
            val = entry['name']
          elif field == "size":
            val = int(float(entry['size']))
          elif field == "instance":
            # find which instance (if any) owns this LV on this node
            owner = '-'
            for inst in instances:
              inst_lvs = lv_by_node[inst].get(node)
              if inst_lvs and entry['name'] in inst_lvs:
                owner = inst.name
                break
            val = owner
          else:
            raise errors.ParameterError(field)
          row.append(str(val))

        output.append(row)

    return output
1400

    
1401

    
1402
class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) matches the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    # resolve the name; raises if the host is unknown
    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      # single-homed node: secondary defaults to the primary IP
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip
    node_list = cfg.GetNodeList()
    if node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration"
                                 % node)

    # reject any IP already used by an existing node (either role)
    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)
      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachablity
    if not utils.TcpPing(utils.HostInfo().name,
                         primary_ip,
                         constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(myself.secondary_ip,
                           secondary_ip,
                           constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

    # HVM clusters need the shared VNC password file to exist up front
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
                                   constants.VNC_PASSWORD_FILE)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    Pushes credentials/certificate to the node over ssh, restarts its
    node daemon, verifies connectivity and protocol version, distributes
    ssh keys and config files, and finally registers the node.

    """
    new_node = self.new_node
    node = new_node.name

    # set up inter-node password and certificate and restarts the node daemon
    gntpass = self.sstore.GetNodeDaemonPassword()
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
      raise errors.OpExecError("ganeti password corruption detected")
    f = open(constants.SSL_CERT_FILE)
    try:
      gntpem = f.read(8192)
    finally:
      f.close()
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
    # so we use this to detect an invalid certificate; as long as the
    # cert doesn't contain this, the here-document will be correctly
    # parsed by the shell sequence below
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
    if not gntpem.endswith("\n"):
      raise errors.OpExecError("PEM must end with newline")
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)

    # and then connect with ssh to set password and start ganeti-noded
    # note that all the below variables are sanitized at this point,
    # either by being constants or by the checks above
    ss = self.sstore
    mycommand = ("umask 077 && "
                 "echo '%s' > '%s' && "
                 "cat > '%s' << '!EOF.' && \n"
                 "%s!EOF.\n%s restart" %
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
                  constants.SSL_CERT_FILE, gntpem,
                  constants.NODE_INITD_SCRIPT))

    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
    if result.failed:
      raise errors.OpExecError("Remote command on node %s, error: %s,"
                               " output: %s" %
                               (node, result.fail_reason, result.output))

    # check connectivity
    # give the freshly-restarted daemon a moment to come up
    time.sleep(4)

    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    # host keys plus the cluster user's keypair, in the order
    # call_node_add expects them
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    _AddHostToEtcHosts(new_node.name)

    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
                      self.cfg.GetHostKey())

    if new_node.secondary_ip != new_node.primary_ip:
      # verify the node can actually reach its claimed secondary IP
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    success, msg = ssh.VerifyNodeHostname(node)
    if not success:
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
                               " than the one the resolver gives: %s."
                               " Please fix and re-run this command." %
                               (node, msg))

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList() + [node]
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          # per-node copy failures are logged but not fatal
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = ss.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      if not ssh.CopyFileToNode(node, fname):
        logger.Error("could not copy file %s to node %s" % (fname, node))

    logger.Info("adding node %s to cluster.conf" % node)
    self.cfg.AddNode(new_node)
1627

    
1628

    
1629
class LUMasterFailover(LogicalUnit):
  """Failover the master node to the current node.

  This is a special LU in that it must run on a non-master node.

  """
  HPATH = "master-failover"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_MASTER = False
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the new master only in the pre phase, and on all
    the nodes in the post phase.

    """
    env = {
      "OP_TARGET": self.new_master,
      "NEW_MASTER": self.new_master,
      "OLD_MASTER": self.old_master,
      }
    return env, [self.new_master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we are not already the master.

    """
    self.new_master = utils.HostInfo().name
    self.old_master = self.sstore.GetMasterNode()

    if self.old_master == self.new_master:
      raise errors.OpPrereqError("This commands must be run on the node"
                                 " where you want the new master to be."
                                 " %s is already the master" %
                                 self.old_master)

  def Exec(self, feedback_fn):
    """Failover the master node.

    This command, when run on a non-master node, will cause the current
    master to cease being master, and the non-master to become new
    master.

    """
    #TODO: do not rely on gethostname returning the FQDN
    logger.Info("setting master to %s, old master: %s" %
                (self.new_master, self.old_master))

    if not rpc.call_node_stop_master(self.old_master):
      # FIX: the message was missing "not" ("could disable the master
      # role"), matching the "could not ..." messages below
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)

    # record and distribute the new master in the simple store
    ss = self.sstore
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
      logger.Error("could not distribute the new simple store master file"
                   " to the other nodes, please check.")

    if not rpc.call_node_start_master(self.new_master):
      logger.Error("could not start the master role on the new master"
                   " %s, please check" % self.new_master)
      feedback_fn("Error in activating the master IP on the new master,"
                  " please fix manually.")
1697

    
1698

    
1699

    
1700
class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    Returns a dict with the cluster name, the various version numbers,
    the master node and the platform architecture.

    """
    arch = (platform.architecture()[0], platform.machine())
    return {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": arch,
      }
1729

    
1730

    
1731
class LUClusterCopyFile(NoHooksLU):
  """Copy file to cluster.

  """
  _OP_REQP = ["nodes", "filename"]

  def CheckPrereq(self):
    """Check prerequisites.

    It should check that the named file exists and that the given list
    of nodes is valid.

    """
    if not os.path.exists(self.op.filename):
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Copy a file from master to some nodes.

    The file is pushed via ssh to every wanted node except the local
    one; individual copy failures are logged but do not abort the run.

    """
    fname = self.op.filename
    local_name = utils.HostInfo().name

    for target in self.nodes:
      if target == local_name:
        # no point in copying the file onto ourselves
        continue
      if not ssh.CopyFileToNode(target, fname):
        logger.Error("Copy of file %s to node %s failed" % (fname, target))
1770
class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    # the configuration writer already knows how to serialize itself
    return self.cfg.DumpConfig()
    
1789
class LURunClusterCommand(NoHooksLU):
  """Run a command on some nodes.

  """
  _OP_REQP = ["command", "nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    It checks that the given list of nodes is valid.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Run a command on some nodes.

    Returns a list of (node, output, exit_code) tuples, one per node.

    """
    results = []
    for node in self.nodes:
      # commands are always run as root on the target node
      res = ssh.SSHCall(node, "root", self.op.command)
      results.append((node, res.output, res.exit_code))
    return results
1815
class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    inst = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if inst is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = inst

  def Exec(self, feedback_fn):
    """Activate the disks.

    Returns the device mapping produced by _AssembleInstanceDisks;
    raises OpExecError if any device could not be brought up.

    """
    ok, dev_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not ok:
      raise errors.OpExecError("Cannot activate block devices")
    return dev_info
1846
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    a tuple (disks_ok, device_info); disks_ok is false if the operation
    failed, and device_info is a list of (host, instance_visible_name,
    node_visible_name) tuples if the operation succeeded, giving the
    mapping from node devices to instance devices
  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occured, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    # ComputeNodeTree yields every (node, sub-device) pair of the tree
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        # secondary-node failures only count when the caller asked for it
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
    # NOTE(review): 'result' here is the value left over from the last
    # inner-loop iteration (the primary-node assemble of this disk)
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info
    
1908
def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  Assembles all disk devices; on failure the already-assembled devices
  are torn down again and OpExecError is raised.  If force is given and
  false, a hint about retrying with '--force' is logged.

  """
  ok, _ = _AssembleInstanceDisks(instance, cfg, ignore_secondaries=force)
  if ok:
    return
  # assembly failed: clean up whatever was brought up before aborting
  _ShutdownInstanceDisks(instance, cfg)
  if force is not None and not force:
    logger.Error("If the message above refers to a secondary node,"
                 " you can retry the operation using '--force'.")
  raise errors.OpExecError("Disk consistency error")
1922
class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    Refuses to shut down the block devices while the instance itself is
    still running on its primary node.

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    # a non-list answer means the RPC to the node failed
    # (idiom fix: isinstance instead of `not type(...) is list`)
    if not isinstance(ins_l, list):
      raise errors.OpExecError("Can't contact node '%s'" %
                               instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError("Instance is running, can't shutdown"
                               " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)
    
1959
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  Errors on the primary node make the function return False unless
  ignore_primary is true; errors on other nodes always make it return
  False.  (The original docstring stated the opposite condition, which
  contradicted the code below.)

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        # a failure is fatal unless it is on the primary node and the
        # caller explicitly asked to ignore primary-node failures
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result
    
1980
def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function check if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raise an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  # the RPC layer returns a dict keyed by node name on success
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                               " information" % (node,))

  free = nodeinfo[node].get('memory_free')
  if not isinstance(free, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free))
  if requested > free:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free))
    
2010
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nodes = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
             list(self.instance.secondary_nodes))
    return env, nodes, nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster, that its bridges
    exist and that the primary node has enough free memory for it.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existance
    _CheckInstanceBridgesExist(instance)

    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
                         "starting instance %s" % instance.name,
                         instance.memory)

    self.instance = instance
    # normalize to the expanded (canonical) name
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    pnode = instance.primary_node
    extra_args = getattr(self.op, "extra_args", "")

    _StartInstanceDisks(self.cfg, instance, self.op.force)

    if not rpc.call_instance_start(pnode, instance, extra_args):
      # do not leave the disks assembled on a failed start
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")

    self.cfg.MarkInstanceUp(instance.name)
2073
class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nodes = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
             list(self.instance.secondary_nodes))
    return env, nodes, nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and that its
    bridges exist.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existance
    _CheckInstanceBridgesExist(instance)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Reboot the instance.

    Soft and hard reboots are delegated to the hypervisor on the
    primary node; a full reboot is done as shutdown + disk restart +
    start from the master side.

    """
    instance = self.instance
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")
    pnode = instance.primary_node

    valid_types = (constants.INSTANCE_REBOOT_SOFT,
                   constants.INSTANCE_REBOOT_HARD,
                   constants.INSTANCE_REBOOT_FULL)
    if reboot_type not in valid_types:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  valid_types)

    if reboot_type == constants.INSTANCE_REBOOT_FULL:
      # full reboot: complete shutdown, then a fresh disk bring-up/start
      if not rpc.call_instance_shutdown(pnode, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, self.op.ignore_secondaries)
      if not rpc.call_instance_start(pnode, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")
    else:
      # soft/hard reboot handled by the hypervisor itself
      if not rpc.call_instance_reboot(pnode, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")

    self.cfg.MarkInstanceUp(instance.name)
    
2149
class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nodes = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
             list(self.instance.secondary_nodes))
    return env, nodes, nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    A failed shutdown RPC is only logged; the instance is marked down
    and its disks are torn down regardless.

    """
    instance = self.instance
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      logger.Error("could not shutdown instance")

    self.cfg.MarkInstanceDown(instance.name)
    _ShutdownInstanceDisks(instance, self.cfg)
    
2194
class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running,
    and (if an OS change was requested) that the new OS is available on
    the primary node.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        # FIX: the reinstall opcode carries no 'pnode' field, so the old
        # "self.op.pnode" here raised AttributeError instead of the
        # intended OpPrereqError; report the instance's primary node
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   instance.primary_node)
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node"  % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    Optionally switches the instance to a new OS, then runs the OS
    create scripts with the disks temporarily assembled.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.AddInstance(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      # always tear the disks down again, even if the install failed
      _ShutdownInstanceDisks(inst, self.cfg)
    
2273
class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running,
    and that the new name resolves and its IP is not already in use
    (unless ignore_ip is set).

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    if not getattr(self.op, "ignore_ip", False):
      # a reachable IP means somebody already uses the new address
      command = ["fping", "-q", name_info.ip]
      result = utils.RunCmd(command)
      if not result.failed:
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))

  def Exec(self, feedback_fn):
    """Rename the instance.

    Renames the instance in the configuration, then runs the OS rename
    script with the disks temporarily assembled.  (The old docstring
    wrongly said "Reinstall".)

    """
    inst = self.instance
    old_name = inst.name

    self.cfg.RenameInstance(inst.name, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
                                          "sda", "sdb"):
        # FIX: message was missing "not" ("Could run OS rename script")
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)
    
2350
class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  # FIX: Exec() reads self.op.ignore_failures; without declaring it here a
  # malformed opcode only failed with AttributeError *after* the instance
  # had already been shut down
  _OP_REQP = ["instance_name", "ignore_failures"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.sstore.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Remove the instance.

    Shuts the instance down, removes its block devices and finally
    removes it from the configuration; with ignore_failures set, the
    first two steps only warn on error.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logger.Info("removing block devices for instance %s" % instance.name)

    if not _RemoveDisks(instance, self.cfg):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)
    
2409
class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    # dynamic fields require a live RPC to the nodes; static ones come
    # straight from the configuration
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
                               "admin_state", "admin_ram",
                               "disk_template", "ip", "mac", "bridge",
                               "sda_size", "sdb_size", "vcpus"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedInstances(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    Returns one list of field values per wanted instance, in the order
    requested by output_fields.

    """
    instance_names = self.wanted
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
                     in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      # only query the nodes when a live field was actually requested
      live_data = {}
      node_data = rpc.call_all_instances_info(nodes)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          # NOTE: '== False' (not 'is False') also catches a 0 answer;
          # an empty dict means "no instance alive", not a node error
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = []
    for instance in instance_list:
      iout = []
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          # what the admin *wants*: True when not marked down
          val = (instance.status != "down")
        elif field == "oper_state":
          # what is *actually* running; None when the node is unreachable
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          # combined admin/operational state
          if instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.status != "down":
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.status != "down":
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          # field[:3] is "sda" or "sdb"
          disk = instance.FindDisk(field[:3])
          if disk is None:
            val = None
          else:
            val = disk.size
        elif field == "vcpus":
          val = instance.vcpus
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output
2526
class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster, uses a network
    mirrored disk template, and that the failover target has enough
    memory and the needed bridges.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # failover only makes sense when the disks are mirrored over the net
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "DT_REMOTE_RAID1 template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
                         instance.name, instance.memory)

    # check bridge existance
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

    self.instance = instance

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.  The order of the steps below matters:
    consistency check, shutdown, disk teardown, config flip, disk
    bring-up, start.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for remote_raid1, these are md over drbd
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        if not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not rpc.call_instance_shutdown(source_node, instance):
      # with ignore_consistency we press on even if the source node
      # cannot confirm the shutdown (it may be dead already)
      if self.op.ignore_consistency:
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down"  %
                     (instance.name, source_node, source_node))
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    # from here on the secondary becomes the new primary
    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.AddInstance(instance)

    feedback_fn("* activating the instance's disks on target node")
    logger.Info("Starting instance %s on node %s" %
                (instance.name, target_node))

    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                             ignore_secondaries=True)
    if not disks_ok:
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Can't activate the instance's disks")

    feedback_fn("* starting the instance on the target node")
    if not rpc.call_instance_start(target_node, instance, None):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance %s on node %s." %
                               (instance.name, target_node))
    
2640
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
  """Recursively create a block device tree on the primary node.

  Children are created first (depth-first), then the device itself;
  creation is unconditional for every device in the tree.

  Returns:
    True on success, False as soon as any creation fails.

  """
  for child in (device.children or []):
    if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
      return False

  cfg.SetDiskID(device, node)
  created = rpc.call_blockdev_create(node, device, device.size,
                                     instance.name, True, info)
  if not created:
    return False
  # remember the backend-assigned id, but never overwrite an existing one
  if device.physical_id is None:
    device.physical_id = created
  return True


def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
  """Recursively create a block device tree on a secondary node.

  If this device type has to be created on secondaries, it forces
  creation for itself and its whole subtree; otherwise the given
  'force' flag is simply propagated to the children and only forced
  devices are actually created.

  Returns:
    True on success, False as soon as any creation fails.

  """
  if device.CreateOnSecondary():
    force = True
  for child in (device.children or []):
    if not _CreateBlockDevOnSecondary(cfg, node, instance,
                                      child, force, info):
      return False

  if not force:
    # nothing to do for this device itself
    return True
  cfg.SetDiskID(device, node)
  created = rpc.call_blockdev_create(node, device, device.size,
                                     instance.name, False, info)
  if not created:
    return False
  # remember the backend-assigned id, but never overwrite an existing one
  if device.physical_id is None:
    device.physical_id = created
  return True


def _GenerateUniqueNames(cfg, exts):
2691
  """Generate a suitable LV name.
2692

2693
  This will generate a logical volume name for the given instance.
2694

2695
  """
2696
  results = []
2697
  for val in exts:
2698
    new_id = cfg.GenerateUniqueID()
2699
    results.append("%s%s" % (new_id, val))
2700
  return results
2701

    
2702

    
2703
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
  """Generate a drbd device complete with its two LV children.

  The data LV gets the requested size; the metadata LV is a fixed
  128 MB, matching what the drbd7 layout expects.

  """
  drbd_port = cfg.AllocatePort()
  vg = cfg.GetVGName()
  data_lv = objects.Disk(dev_type=constants.LD_LV, size=size,
                         logical_id=(vg, names[0]))
  meta_lv = objects.Disk(dev_type=constants.LD_LV, size=128,
                         logical_id=(vg, names[1]))
  return objects.Disk(dev_type=constants.LD_DRBD7, size=size,
                      logical_id=(primary, secondary, drbd_port),
                      children=[data_lv, meta_lv])


def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
  """Generate a drbd8 device complete with its two LV children.

  The data LV gets the requested size; the metadata LV is a fixed
  128 MB.  'iv_name' is the instance-visible name (e.g. "sda").

  """
  drbd_port = cfg.AllocatePort()
  vg = cfg.GetVGName()
  data_lv = objects.Disk(dev_type=constants.LD_LV, size=size,
                         logical_id=(vg, names[0]))
  meta_lv = objects.Disk(dev_type=constants.LD_LV, size=128,
                         logical_id=(vg, names[1]))
  return objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                      logical_id=(primary, secondary, drbd_port),
                      children=[data_lv, meta_lv], iv_name=iv_name)


def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz):
  """Generate the entire disk layout for a given template type.

  Args:
    cfg: the configuration object
    template_name: one of the constants.DISK_TEMPLATES values
    instance_name: the instance's name (currently unused here)
    primary_node: the instance's primary node
    secondary_nodes: the secondary nodes; must be empty for the
      non-mirrored templates and exactly one node for the networked ones
    disk_sz: size in MB of the data disk (sda)
    swap_sz: size in MB of the swap disk (sdb)

  Returns:
    The list of top-level disk objects for the instance.

  Raises:
    errors.ProgrammerError: on an unknown template or a secondary-node
      count that doesn't match the template.

  """
  #TODO: compute space requirements

  vgname = cfg.GetVGName()
  # use the symbolic template names throughout; the first branches used
  # raw strings while the rest of the chain already used constants.DT_*
  if template_name == constants.DT_DISKLESS:
    disks = []
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name = "sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name = "sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == constants.DT_LOCAL_RAID1:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
                                       ".sdb_m1", ".sdb_m2"])
    sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                              logical_id=(vgname, names[0]))
    sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                              logical_id=(vgname, names[1]))
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
                              size=disk_sz,
                              children = [sda_dev_m1, sda_dev_m2])
    sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                              logical_id=(vgname, names[2]))
    sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                              logical_id=(vgname, names[3]))
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
                              size=swap_sz,
                              children = [sdb_dev_m1, sdb_dev_m2])
    disks = [md_sda_dev, md_sdb_dev]
  elif template_name == constants.DT_REMOTE_RAID1:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
                                         disk_sz, names[0:2])
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
                              children = [drbd_sda_dev], size=disk_sz)
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
                                         swap_sz, names[2:4])
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
                              children = [drbd_sdb_dev], size=swap_sz)
    disks = [md_sda_dev, md_sdb_dev]
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                         disk_sz, names[0:2], "sda")
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                         swap_sz, names[2:4], "sdb")
    disks = [drbd_sda_dev, drbd_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks


def _GetInstanceInfoText(instance):
2812
  """Compute that text that should be added to the disk's metadata.
2813

2814
  """
2815
  return "originstname+%s" % instance.name
2816

    
2817

    
2818
def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance: every disk of the
  instance is created in full, first on all the secondary nodes and
  then on the primary node; the first failure aborts the whole process.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  for disk in instance.disks:
    logger.Info("creating volume %s for instance %s" %
              (disk.iv_name, instance.name))
    #HARDCODE
    for snode in instance.secondary_nodes:
      ok = _CreateBlockDevOnSecondary(cfg, snode, instance, disk, False, info)
      if not ok:
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (disk.iv_name, disk, snode))
        return False
    #HARDCODE
    ok = _CreateBlockDevOnPrimary(cfg, instance.primary_node, instance,
                                  disk, info)
    if not ok:
      logger.Error("failed to create volume %s on primary!" % disk.iv_name)
      return False
  return True


def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`.  Unlike `_CreateDisks()`, a failure to remove one
  device does not abort: the removal continues with the remaining
  devices and the overall result reflects whether everything succeeded.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  all_ok = True
  for top_disk in instance.disks:
    for node, disk in top_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (top_disk.iv_name, node))
        all_ok = False
  return all_ok


class LUCreateInstance(LogicalUnit):
  """Create an instance.

  Supports both creating a fresh instance (INSTANCE_CREATE) and
  importing one from an export (INSTANCE_IMPORT).

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
              "disk_template", "swap_size", "mode", "start", "vcpus",
              "wait_for_sync", "ip_check", "mac"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.op.mem_size,
      vcpus=self.op.vcpus,
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
    ))

    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    Validates the opcode parameters, the source export (in import
    mode), the target nodes, disk space, OS, IP/MAC and bridge, and
    fills in all the attributes Exec() and BuildHooksEnv() rely on.

    """
    # optional parameters default to None
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)
      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")
      src_node_full = self.cfg.ExpandNodeName(src_node)
      if src_node_full is None:
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
      self.op.src_node = src_node = src_node_full

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage
    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

    # check primary node
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
    if pnode is None:
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                 self.op.pnode)
    self.op.pnode = pnode.name
    self.pnode = pnode
    self.secondaries = []
    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if getattr(self.op, "snode", None) is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")

      snode_name = self.cfg.ExpandNodeName(self.op.snode)
      if snode_name is None:
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
                                   self.op.snode)
      elif snode_name == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(snode_name)

    # Required free disk space as a function of disk and swap space
    req_size_dict = {
      constants.DT_DISKLESS: None,
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
      constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
      # 256 MB are added for drbd metadata, 128MB for each drbd device
      constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
      constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
    }

    if self.op.disk_template not in req_size_dict:
      raise errors.ProgrammerError("Disk template '%s' size requirement"
                                   " is unknown" %  self.op.disk_template)

    req_size = req_size_dict[self.op.disk_template]

    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          # bug fix: this used to interpolate 'nodeinfo' (the whole
          # result dict) instead of the offending node's name
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node"  % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")

    # instance verification
    hostname1 = utils.HostInfo(self.op.instance_name)

    self.op.instance_name = instance_name = hostname1.name
    instance_list = self.cfg.GetInstanceList()
    if instance_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = inst_ip

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
                       constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (hostname1.ip, instance_name))

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # boot order verification
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    Creates the disks, registers the instance in the configuration,
    waits for the disks to sync, installs or imports the OS, and
    optionally starts the instance.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      # best-effort cleanup of whatever was created
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      # roll back both the disks and the config entry
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                                src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    full_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    instance = self.cfg.GetInstanceInfo(full_name)
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    pnode = instance.primary_node

    running = rpc.call_instance_list([pnode])[pnode]
    if running is False:
      raise errors.OpExecError("Can't connect to node %s." % pnode)

    if instance.name not in running:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, pnode))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance)
    # build ssh cmdline
    argv = (["ssh", "-q", "-t"] +
            list(ssh.KNOWN_HOSTS_OPTS) +
            list(ssh.BATCH_MODE_OPTS) +
            [pnode, console_cmd])
    return "ssh", argv


class LUAddMDDRBDComponent(LogicalUnit):
  """Add a new mirror member to an instance's disk.

  Only valid for remote_raid1 instances: a new drbd branch is created
  on the given remote node and attached as a slave of the md device.

  """
  HPATH = "mirror-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "NEW_SECONDARY": self.op.remote_node,
      "DISK_NAME": self.op.disk_name,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.remote_node,] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster, that the target
    node exists and is not the primary, that the instance uses the
    remote_raid1 template, that the named disk exists and that it does
    not already have two mirror slaves.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
    if remote_node is None:
      raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
    self.remote_node = remote_node

    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")
    # find the disk with the requested iv_name; the for/else raises if
    # no disk matched
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
                                 " instance." % self.op.disk_name)
    if len(disk.children) > 1:
      raise errors.OpPrereqError("The device already has two slave devices."
                                 " This would create a 3-disk raid1 which we"
                                 " don't allow.")
    self.disk = disk

  def Exec(self, feedback_fn):
    """Add the mirror component

    The new drbd branch is created on the remote node first, then on
    the primary, and finally attached to the md device; each step rolls
    back the previous ones on failure.

    """
    disk = self.disk
    instance = self.instance

    remote_node = self.remote_node
    # new LV names follow the ".<iv_name>_data"/".<iv_name>_meta" scheme
    lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
    names = _GenerateUniqueNames(self.cfg, lv_names)
    new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
                                     remote_node, disk.size, names)

    logger.Info("adding new mirror component on secondary")
    #HARDCODE
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
                                      new_drbd, False,
                                      _GetInstanceInfoText(instance)):
      # nothing was created yet, so nothing to roll back
      raise errors.OpExecError("Failed to create new component on secondary"
                               " node %s" % remote_node)

    logger.Info("adding new mirror component on primary")
    #HARDCODE
    if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
                                    instance, new_drbd,
                                    _GetInstanceInfoText(instance)):
      # remove secondary dev
      self.cfg.SetDiskID(new_drbd, remote_node)
      rpc.call_blockdev_remove(remote_node, new_drbd)
      raise errors.OpExecError("Failed to create volume on primary")

    # the device exists now
    # call the primary node to add the mirror to md
    logger.Info("adding new mirror component to md")
    if not rpc.call_blockdev_addchildren(instance.primary_node,
                                         disk, [new_drbd]):
      # roll back the device on both nodes (best-effort: failures are
      # only logged, the original error is what gets raised)
      logger.Error("Can't add mirror compoment to md!")
      self.cfg.SetDiskID(new_drbd, remote_node)
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
        logger.Error("Can't rollback on secondary")
      self.cfg.SetDiskID(new_drbd, instance.primary_node)
      if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
        logger.Error("Can't rollback on primary")
      raise errors.OpExecError("Can't add mirror component to md array")

    # record the new child in the instance's disk tree and persist it
    disk.children.append(new_drbd)

    self.cfg.AddInstance(instance)

    _WaitForSync(self.cfg, instance, self.proc)

    return 0


class LURemoveMDDRBDComponent(LogicalUnit):
  """Remove a component from a remote_raid1 disk.

  Detaches one DRBD7 child from an md mirror and deletes the detached
  device from both of its nodes.

  """
  HPATH = "mirror-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK_NAME": self.op.disk_name,
      "DISK_ID": self.op.disk_id,
      "OLD_SECONDARY": self.old_secondary,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    # node list: master + primary + all secondaries
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster, uses the
    remote_raid1 template, and that the named disk has a DRBD7 child
    with the given disk_id which is not the last remaining component.

    Sets self.instance, self.disk, self.child and self.old_secondary
    for use by BuildHooksEnv/Exec.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")
    # for/else: the else clause fires only if no disk matched
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
                                 " instance." % self.op.disk_name)
    # same pattern: locate the DRBD7 child carrying the requested id
    for child in disk.children:
      if (child.dev_type == constants.LD_DRBD7 and
          child.logical_id[2] == self.op.disk_id):
        break
    else:
      raise errors.OpPrereqError("Can't find the device with this port.")

    if len(disk.children) < 2:
      raise errors.OpPrereqError("Cannot remove the last component from"
                                 " a mirror.")
    self.disk = disk
    self.child = child
    # logical_id[0:2] are the two nodes of the drbd pair; pick the one
    # that is not the primary as the old secondary
    if self.child.logical_id[0] == instance.primary_node:
      oid = 1
    else:
      oid = 0
    self.old_secondary = self.child.logical_id[oid]

  def Exec(self, feedback_fn):
    """Remove the mirror component

    Detach the child from the mirror on the primary node (fatal on
    failure), then best-effort remove the block device on both of its
    nodes, and finally update the configuration.

    """
    instance = self.instance
    disk = self.disk
    child = self.child
    logger.Info("remove mirror component")
    self.cfg.SetDiskID(disk, instance.primary_node)
    if not rpc.call_blockdev_removechildren(instance.primary_node,
                                            disk, [child]):
      raise errors.OpExecError("Can't remove child from mirror.")

    # remove the detached device from both nodes; failures are only
    # logged so the operation still completes
    for node in child.logical_id[:2]:
      self.cfg.SetDiskID(child, node)
      if not rpc.call_blockdev_remove(node, child):
        logger.Error("Warning: failed to remove device from node %s,"
                     " continuing operation." % node)

    disk.children.remove(child)
    self.cfg.AddInstance(instance)
3451

    
3452

    
3453
class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    # operation-specific variables first, then the generic per-instance
    # environment on top of them
    env = {}
    env["MODE"] = self.op.mode
    env["NEW_SECONDARY"] = self.op.remote_node
    env["OLD_SECONDARY"] = self.instance.secondary_nodes[0]
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    # hooks run on the master and the primary; include the new
    # secondary only when one was requested
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl
3480

    
3481
  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster, uses a
    network-mirrored disk template with exactly one secondary, and
    validates/normalizes the requested replacement mode and target
    node.  Sets self.instance, self.sec_node, self.remote_node_info,
    self.op.remote_node and (for drbd8) self.tgt_node/self.oth_node/
    self.new_node.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance
    # canonicalize the name after expansion
    self.op.instance_name = instance.name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    # remote_node is optional in the opcode
    remote_node = getattr(self.op, "remote_node", None)
    if remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
      # the user gave the current secondary, switch to
      # 'no-replace-secondary' mode for drbd7
      remote_node = None
    if (instance.disk_template == constants.DT_REMOTE_RAID1 and
        self.op.mode != constants.REPLACE_DISK_ALL):
      raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
                                 " disks replacement, not individual ones")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        # tgt_node is where new storage is created, oth_node is the peer
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    # all requested disks must exist on the instance
    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))
    # store back the (possibly reset) remote_node
    self.op.remote_node = remote_node
3561

    
3562
  def _ExecRR1(self, feedback_fn):
    """Replace the disks of an instance.

    remote_raid1 variant: for every instance disk, build a new DRBD
    branch towards the (possibly new) secondary, attach it to the md
    mirror, wait for sync, then detach and delete the old child.

    """
    instance = self.instance
    iv_names = {}
    # start of work
    if self.op.remote_node is None:
      # keep the current secondary (no-replace-secondary mode)
      remote_node = self.sec_node
    else:
      remote_node = self.op.remote_node
    cfg = self.cfg
    for dev in instance.disks:
      size = dev.size
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
                                       remote_node, size, names)
      # remember (md device, old child, new child) for the later phases
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
      logger.Info("adding new mirror component on secondary for %s" %
                  dev.iv_name)
      #HARDCODE
      if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new component on secondary"
                                 " node %s. Full abort, cleanup manually!" %
                                 remote_node)

      logger.Info("adding new mirror component on primary")
      #HARDCODE
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                      instance, new_drbd,
                                      _GetInstanceInfoText(instance)):
        # remove secondary dev
        cfg.SetDiskID(new_drbd, remote_node)
        rpc.call_blockdev_remove(remote_node, new_drbd)
        raise errors.OpExecError("Failed to create volume on primary!"
                                 " Full abort, cleanup manually!!")

      # the device exists now
      # call the primary node to add the mirror to md
      logger.Info("adding new mirror component to md")
      if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
                                           [new_drbd]):
        # rollback: try to delete the new device on both nodes
        logger.Error("Can't add mirror compoment to md!")
        cfg.SetDiskID(new_drbd, remote_node)
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
          logger.Error("Can't rollback on secondary")
        cfg.SetDiskID(new_drbd, instance.primary_node)
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
          logger.Error("Can't rollback on primary")
        raise errors.OpExecError("Full abort, cleanup manually!!")

      # persist the new child after each disk, so a later failure
      # leaves the config describing what actually exists
      dev.children.append(new_drbd)
      cfg.AddInstance(instance)

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      cfg.SetDiskID(dev, instance.primary_node)
      # index 5 of the find result is the degraded flag
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("MD device %s is degraded!" % name)
      cfg.SetDiskID(new_drbd, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
      if is_degr:
        raise errors.OpExecError("New drbd device %s is degraded!" % name)

    # all synced and healthy: detach and delete the old children
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      logger.Info("remove mirror %s component" % name)
      cfg.SetDiskID(dev, instance.primary_node)
      if not rpc.call_blockdev_removechildren(instance.primary_node,
                                              dev, [child]):
        logger.Error("Can't remove child from mirror, aborting"
                     " *this device cleanup*.\nYou need to cleanup manually!!")
        continue

      # best-effort removal of the old child on both of its nodes
      for node in child.logical_id[:2]:
        logger.Info("remove child device on %s" % node)
        cfg.SetDiskID(child, node)
        if not rpc.call_blockdev_remove(node, child):
          logger.Error("Warning: failed to remove device from node %s,"
                       " continuing operation." % node)

      dev.children.remove(child)

      cfg.AddInstance(instance)
3656

    
3657
  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for dbrd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaces.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    # tgt_node/oth_node were chosen in CheckPrereq based on the mode
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      # NOTE: the meta LV size (128) appears to be the drbd8 metadata
      # size used elsewhere for this template
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption than logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      # the new LVs now carry the old names; update the config objects
      # to match (logical_id first, then the node-specific physical id)
      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        # rollback: best-effort removal of the LVs we just created
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      # index 5 of the find result is the degraded flag
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        # best-effort: a leftover LV only wastes space
        if not rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue
3826

    
3827
  def _ExecD8Secondary(self, feedback_fn):
3828
    """Replace the secondary node for drbd8.
3829

3830
    The algorithm for replace is quite complicated:
3831
      - for all disks of the instance:
3832
        - create new LVs on the new node with same names
3833
        - shutdown the drbd device on the old secondary
3834
        - disconnect the drbd network on the primary
3835
        - create the drbd device on the new secondary
3836
        - network attach the drbd on the primary, using an artifice:
3837
          the drbd code for Attach() will connect to the network if it
3838
          finds a device which is connected to the good local disks but
3839
          not network enabled
3840
      - wait for sync across all devices
3841
      - remove all disks from the old secondary
3842

3843
    Failures are not very well handled.
3844

3845
    """
3846
    steps_total = 6
3847
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3848
    instance = self.instance
3849
    iv_names = {}
3850
    vgname = self.cfg.GetVGName()
3851
    # start of work
3852
    cfg = self.cfg
3853
    old_node = self.tgt_node
3854
    new_node = self.new_node
3855
    pri_node = instance.primary_node
3856

    
3857
    # Step: check device activation
3858
    self.proc.LogStep(1, steps_total, "check device existence")
3859
    info("checking volume groups")
3860
    my_vg = cfg.GetVGName()
3861
    results = rpc.call_vg_list([pri_node, new_node])
3862
    if not results:
3863
      raise errors.OpExecError("Can't list volume groups on the nodes")
3864
    for node in pri_node, new_node:
3865
      res = results.get(node, False)
3866
      if not res or my_vg not in res:
3867
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3868
                                 (my_vg, node))
3869
    for dev in instance.disks:
3870
      if not dev.iv_name in self.op.disks:
3871
        continue
3872
      info("checking %s on %s" % (dev.iv_name, pri_node))
3873
      cfg.SetDiskID(dev, pri_node)
3874
      if not rpc.call_blockdev_find(pri_node, dev):
3875
        raise errors.OpExecError("Can't find device %s on node %s" %
3876
                                 (dev.iv_name, pri_node))
3877

    
3878
    # Step: check other node consistency
3879
    self.proc.LogStep(2, steps_total, "check peer consistency")
3880
    for dev in instance.disks:
3881
      if not dev.iv_name in self.op.disks:
3882
        continue
3883
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3884
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3885
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
3886
                                 " unsafe to replace the secondary" %
3887
                                 pri_node)
3888

    
3889
    # Step: create new storage
3890
    self.proc.LogStep(3, steps_total, "allocate new storage")
3891
    for dev in instance.disks:
3892
      size = dev.size
3893
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3894
      # since we *always* want to create this LV, we use the
3895
      # _Create...OnPrimary (which forces the creation), even if we
3896
      # are talking about the secondary node
3897
      for new_lv in dev.children:
3898
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3899
                                        _GetInstanceInfoText(instance)):
3900
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3901
                                   " node '%s'" %
3902
                                   (new_lv.logical_id[1], new_node))
3903

    
3904
      iv_names[dev.iv_name] = (dev, dev.children)
3905

    
3906
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
3907
    for dev in instance.disks:
3908
      size = dev.size
3909
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3910
      # create new devices on new_node
3911
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3912
                              logical_id=(pri_node, new_node,
3913
                                          dev.logical_id[2]),
3914
                              children=dev.children)
3915
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3916
                                        new_drbd, False,
3917
                                      _GetInstanceInfoText(instance)):
3918
        raise errors.OpExecError("Failed to create new DRBD on"
3919
                                 " node '%s'" % new_node)
3920

    
3921
    for dev in instance.disks:
3922
      # we have new devices, shutdown the drbd on the old secondary
3923
      info("shutting down drbd for %s on old node" % dev.iv_name)
3924
      cfg.SetDiskID(dev, old_node)
3925
      if not rpc.call_blockdev_shutdown(old_node, dev):
3926
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3927
                hint="Please cleanup this device manually as soon as possible")
3928

    
3929
    info("detaching primary drbds from the network (=> standalone)")
3930
    done = 0
3931
    for dev in instance.disks:
3932
      cfg.SetDiskID(dev, pri_node)
3933
      # set the physical (unique in bdev terms) id to None, meaning
3934
      # detach from network
3935
      dev.physical_id = (None,) * len(dev.physical_id)
3936
      # and 'find' the device, which will 'fix' it to match the
3937
      # standalone state
3938
      if rpc.call_blockdev_find(pri_node, dev):
3939
        done += 1
3940
      else:
3941
        warning("Failed to detach drbd %s from network, unusual case" %
3942
                dev.iv_name)
3943

    
3944
    if not done:
3945
      # no detaches succeeded (very unlikely)
3946
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
3947

    
3948
    # if we managed to detach at least one, we update all the disks of
3949
    # the instance to point to the new secondary
3950
    info("updating instance configuration")
3951
    for dev in instance.disks:
3952
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3953
      cfg.SetDiskID(dev, pri_node)
3954
    cfg.Update(instance)
3955

    
3956
    # and now perform the drbd attach
3957
    info("attaching primary drbds to new secondary (standalone => connected)")
3958
    failures = []
3959
    for dev in instance.disks:
3960
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3961
      # since the attach is smart, it's enough to 'find' the device,
3962
      # it will automatically activate the network, if the physical_id
3963
      # is correct
3964
      cfg.SetDiskID(dev, pri_node)
3965
      if not rpc.call_blockdev_find(pri_node, dev):
3966
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3967
                "please do a gnt-instance info to see the status of disks")
3968

    
3969
    # this can fail as the old devices are degraded and _WaitForSync
3970
    # does a combined result over all disks, so we don't check its
3971
    # return value
3972
    self.proc.LogStep(5, steps_total, "sync devices")
3973
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3974

    
3975
    # so check manually all the devices
3976
    for name, (dev, old_lvs) in iv_names.iteritems():
3977
      cfg.SetDiskID(dev, pri_node)
3978
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3979
      if is_degr:
3980
        raise errors.OpExecError(