Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib.py @ b63ed789

History | View | Annotate | Download (147.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33

    
34
from ganeti import rpc
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import config
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import ssconf
45

    
46
class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements (REQ_CLUSTER,
      REQ_MASTER); note that all commands require root permissions

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_CLUSTER = True
  REQ_MASTER = True

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overriden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = cfg
    self.sstore = sstore
    # every opcode parameter listed in _OP_REQP must be present
    for attr_name in self._OP_REQP:
      if getattr(op, attr_name, None) is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    if not self.REQ_CLUSTER:
      return
    if not cfg.IsCluster():
      raise errors.OpPrereqError("Cluster not initialized yet,"
                                 " use 'gnt-cluster init' first.")
    if self.REQ_MASTER:
      master = sstore.GetMasterNode()
      if master != utils.HostInfo().name:
        raise errors.OpPrereqError("Commands must be run on the master"
                                   " node %s" % master)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-node tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    As for the node lists, the master should not be included in the
    them, as it will be added by the hooks runner in case this LU
    requires a cluster to run on (otherwise we don't have a node
    list). No nodes should be returned as an empty list (and not
    None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError
145

    
146

    
147
class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Build hooks env.

    This is a no-op, since we don't run hooks.

    """
    # no hooks: empty environment, no pre- or post-run nodes
    return ({}, [], [])
164

    
165

    
166
def _AddHostToEtcHosts(hostname):
  """Wrapper around utils.SetEtcHostsEntry.

  """
  # resolve the host and register both its FQDN and short name
  info = utils.HostInfo(name=hostname)
  utils.SetEtcHostsEntry(constants.ETC_HOSTS, info.ip, info.name,
                         [info.ShortName()])
172

    
173

    
174
def _RemoveHostFromEtcHosts(hostname):
  """Wrapper around utils.RemoveEtcHostsEntry.

  """
  info = utils.HostInfo(name=hostname)
  # drop both the fully qualified and the short form of the name
  for entry in (info.name, info.ShortName()):
    utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, entry)
181

    
182

    
183
def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  # an empty list means "all known nodes"
  if not nodes:
    return utils.NiceSort(lu.cfg.GetNodeList())

  wanted = []
  for name in nodes:
    expanded = lu.cfg.ExpandNodeName(name)
    if expanded is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(expanded)
  return utils.NiceSort(wanted)
205

    
206

    
207
def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings) or None for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  # an empty list means "all known instances"
  if not instances:
    return utils.NiceSort(lu.cfg.GetInstanceList())

  wanted = []
  for name in instances:
    expanded = lu.cfg.ExpandInstanceName(name)
    if expanded is None:
      raise errors.OpPrereqError("No such instance name '%s'" % name)
    wanted.append(expanded)
  return utils.NiceSort(wanted)
229

    
230

    
231
def _CheckOutputFields(static, dynamic, selected):
232
  """Checks whether all selected fields are valid.
233

234
  Args:
235
    static: Static fields
236
    dynamic: Dynamic fields
237

238
  """
239
  static_fields = frozenset(static)
240
  dynamic_fields = frozenset(dynamic)
241

    
242
  all_fields = static_fields | dynamic_fields
243

    
244
  if not all_fields.issuperset(selected):
245
    raise errors.OpPrereqError("Unknown output fields selected: %s"
246
                               % ",".join(frozenset(selected).
247
                                          difference(all_fields)))
248

    
249

    
250
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
251
                          memory, vcpus, nics):
252
  """Builds instance related env variables for hooks from single variables.
253

254
  Args:
255
    secondary_nodes: List of secondary nodes as strings
256
  """
257
  env = {
258
    "OP_TARGET": name,
259
    "INSTANCE_NAME": name,
260
    "INSTANCE_PRIMARY": primary_node,
261
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
262
    "INSTANCE_OS_TYPE": os_type,
263
    "INSTANCE_STATUS": status,
264
    "INSTANCE_MEMORY": memory,
265
    "INSTANCE_VCPUS": vcpus,
266
  }
267

    
268
  if nics:
269
    nic_count = len(nics)
270
    for idx, (ip, bridge) in enumerate(nics):
271
      if ip is None:
272
        ip = ""
273
      env["INSTANCE_NIC%d_IP" % idx] = ip
274
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
275
  else:
276
    nic_count = 0
277

    
278
  env["INSTANCE_NIC_COUNT"] = nic_count
279

    
280
  return env
281

    
282

    
283
def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override

  Returns:
    the env dict produced by _BuildInstanceHookEnv

  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    # FIX: previously this passed instance.os (duplicating os_type), so
    # the INSTANCE_STATUS hook variable carried the OS name instead of
    # the instance's run status
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)
303

    
304

    
305
def _UpdateKnownHosts(fullnode, ip, pubkey):
  """Ensure a node has a correct known_hosts entry.

  Args:
    fullnode - Fully qualified domain name of host. (str)
    ip       - IPv4 address of host (str)
    pubkey   - the public key of the cluster

  """
  # open for read+write so we can append in place when nothing was removed;
  # 'w+' creates the file if it does not exist yet
  if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
  else:
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')

  # inthere: an exact (ip, fullnode, pubkey) entry already exists
  inthere = False

  # save_lines: lines to keep; add_lines: lines to append if entry missing
  save_lines = []
  add_lines = []
  removed = False

  for rawline in f:
    logger.Debug('read %s' % (repr(rawline),))

    parts = rawline.rstrip('\r\n').split()

    # Ignore unwanted lines
    # only "hostspec keytype key"-shaped, non-comment lines are inspected;
    # everything else is kept verbatim via the append below
    if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
      fields = parts[0].split(',')
      key = parts[2]

      # haveall: both ip and fullnode listed; havesome: at least one listed
      haveall = True
      havesome = False
      for spec in [ ip, fullnode ]:
        if spec not in fields:
          haveall = False
        if spec in fields:
          havesome = True

      logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
      if haveall and key == pubkey:
        # the exact entry we want is already present - keep it
        inthere = True
        save_lines.append(rawline)
        logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
        continue

      if havesome and (not haveall or key != pubkey):
        # a stale/partial entry for this host - drop it and force a rewrite
        removed = True
        logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
        continue

    save_lines.append(rawline)

  if not inthere:
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))

  if removed:
    # stale lines were dropped, so the whole file must be rewritten
    save_lines = save_lines + add_lines

    # Write a new file and replace old.
    fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
                                   constants.DATA_DIR)
    newfile = os.fdopen(fd, 'w')
    try:
      newfile.write(''.join(save_lines))
    finally:
      newfile.close()
    logger.Debug("Wrote new known_hosts.")
    # atomic replace of the known_hosts file
    os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)

  elif add_lines:
    # Simply appending a new line will do the trick.
    f.seek(0, 2)
    for add in add_lines:
      f.write(add)

  f.close()
382

    
383

    
384
def _HasValidVG(vglist, vgname):
385
  """Checks if the volume group list is valid.
386

387
  A non-None return value means there's an error, and the return value
388
  is the error message.
389

390
  """
391
  vgsize = vglist.get(vgname, None)
392
  if vgsize is None:
393
    return "volume group '%s' missing" % vgname
394
  elif vgsize < 20480:
395
    return ("volume group '%s' too small (20480MiB required, %dMib found)" %
396
            (vgname, vgsize))
397
  return None
398

    
399

    
400
def _InitSSHSetup(node):
  """Setup the SSH configuration for the cluster.


  This generates a dsa keypair for root, adds the pub key to the
  permitted hosts and adds the hostkey to its own known hosts.

  Args:
    node: the name of this host as a fqdn

  """
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

  # back up and remove any pre-existing key material before regenerating
  for keyfile in (priv_key, pub_key):
    if os.path.exists(keyfile):
      utils.CreateBackup(keyfile)
    utils.RemoveFile(keyfile)

  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
                         "-f", priv_key,
                         "-q", "-N", ""])
  if result.failed:
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
                             result.output)

  # authorize the freshly generated public key for the cluster user
  keyfd = open(pub_key, 'r')
  try:
    utils.AddAuthorizedKey(auth_keys, keyfd.read(8192))
  finally:
    keyfd.close()
430

    
431

    
432
def _InitGanetiServerSetup(ss):
433
  """Setup the necessary configuration for the initial node daemon.
434

435
  This creates the nodepass file containing the shared password for
436
  the cluster and also generates the SSL certificate.
437

438
  """
439
  # Create pseudo random password
440
  randpass = sha.new(os.urandom(64)).hexdigest()
441
  # and write it into sstore
442
  ss.SetKey(ss.SS_NODED_PASS, randpass)
443

    
444
  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
445
                         "-days", str(365*5), "-nodes", "-x509",
446
                         "-keyout", constants.SSL_CERT_FILE,
447
                         "-out", constants.SSL_CERT_FILE, "-batch"])
448
  if result.failed:
449
    raise errors.OpExecError("could not generate server ssl cert, command"
450
                             " %s had exitcode %s and error message %s" %
451
                             (result.cmd, result.exit_code, result.output))
452

    
453
  os.chmod(constants.SSL_CERT_FILE, 0400)
454

    
455
  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
456

    
457
  if result.failed:
458
    raise errors.OpExecError("Could not start the node daemon, command %s"
459
                             " had exitcode %s and error %s" %
460
                             (result.cmd, result.exit_code, result.output))
461

    
462

    
463
def _CheckInstanceBridgesExist(instance):
  """Check that the brigdes needed by an instance exist.

  """
  # collect the bridge of every NIC and verify them on the primary node
  bridges = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, bridges):
    raise errors.OpPrereqError("one or more target bridges %s does not"
                               " exist on destination node '%s'" %
                               (bridges, instance.primary_node))
473

    
474

    
475
class LUInitCluster(LogicalUnit):
  """Initialise the cluster.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
              "def_bridge", "master_netdev"]
  # no cluster exists yet, so the usual cluster/master checks must be skipped
  REQ_CLUSTER = False

  def BuildHooksEnv(self):
    """Build hooks env.

    Notes: Since we don't require a cluster, we must manually add
    ourselves in the post-run node list.

    """
    env = {"OP_TARGET": self.op.cluster_name}
    return env, [], [self.hostname.name]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    if config.ConfigWriter.IsCluster():
      raise errors.OpPrereqError("Cluster is already initialised")

    if self.op.hypervisor_type == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        # NOTE(review): the two string literals concatenate without a space
        # ("VNC" + "password"); message reads "...cluster VNCpassword file..."
        raise errors.OpPrereqError("Please prepare the cluster VNC"
                                   "password file %s" %
                                   constants.VNC_PASSWORD_FILE)

    # resolve ourselves; stored for BuildHooksEnv and Exec
    self.hostname = hostname = utils.HostInfo()

    if hostname.ip.startswith("127."):
      raise errors.OpPrereqError("This host's IP resolves to the private"
                                 " range (%s). Please fix DNS or /etc/hosts." %
                                 (hostname.ip,))

    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)

    # verify our resolved IP actually belongs to this host (noded reachable)
    if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
                         constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
                                 " to %s,\nbut this ip address does not"
                                 " belong to this host."
                                 " Aborting." % hostname.ip)

    # the secondary IP (replication network) is optional
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip and not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary ip given")
    if (secondary_ip and
        secondary_ip != hostname.ip and
        (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
                           constants.DEFAULT_NODED_PORT))):
      raise errors.OpPrereqError("You gave %s as secondary IP,"
                                 " but it does not belong to this host." %
                                 secondary_ip)
    self.secondary_ip = secondary_ip

    # checks presence of the volume group given
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)

    if vgstatus:
      raise errors.OpPrereqError("Error: %s" % vgstatus)

    # MAC prefix must look like the first three octets of a MAC address
    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
                    self.op.mac_prefix):
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
                                 self.op.mac_prefix)

    if self.op.hypervisor_type not in constants.HYPER_TYPES:
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
                                 self.op.hypervisor_type)

    # the master netdev must exist on this host
    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
    if result.failed:
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
                                 (self.op.master_netdev,
                                  result.output.strip()))

    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
      raise errors.OpPrereqError("Init.d script '%s' missing or not"
                                 " executable." % constants.NODE_INITD_SCRIPT)

  def Exec(self, feedback_fn):
    """Initialize the cluster.

    """
    clustername = self.clustername
    hostname = self.hostname

    # set up the simple store
    self.sstore = ss = ssconf.SimpleStore()
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)

    # set up the inter-node password and certificate
    _InitGanetiServerSetup(ss)

    # start the master ip
    rpc.call_node_start_master(hostname.name)

    # set up ssh config and /etc/hosts
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
    try:
      sshline = f.read()
    finally:
      f.close()
    # host key line format is "<keytype> <key> [comment]"; keep the key part
    sshkey = sshline.split(" ")[1]

    _AddHostToEtcHosts(hostname.name)

    _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)

    _InitSSHSetup(hostname.name)

    # init of cluster config file
    self.cfg = cfgw = config.ConfigWriter()
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
                    sshkey, self.op.mac_prefix,
                    self.op.vg_name, self.op.def_bridge)
602

    
603

    
604
class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    # only the master node itself may remain in the configuration
    remaining_nodes = self.cfg.GetNodeList()
    if len(remaining_nodes) != 1 or remaining_nodes[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(remaining_nodes) - 1))
    remaining_instances = self.cfg.GetInstanceList()
    if remaining_instances:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(remaining_instances))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.sstore.GetMasterNode()
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    # preserve the cluster ssh keys before tearing the master down
    for keyfile in (priv_key, pub_key):
      utils.CreateBackup(keyfile)
    rpc.call_node_leave_cluster(master)
638

    
639

    
640
class LUVerifyCluster(NoHooksLU):
  """Verifies the cluster status.

  """
  _OP_REQP = []

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existance and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    Returns:
      True if any check failed, False otherwise

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      # no version data at all means the RPC to the node failed
      feedback_fn(" - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existance and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
    else:
      # a non-empty 'nodelist' maps unreachable nodes to error messages
      if node_result['nodelist']:
        bad = True
        # NOTE(review): this loop reuses the name 'node', shadowing the
        # method's 'node' parameter for the rest of the function
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    instancelist = self.cfg.GetInstanceList()
    if not instance in instancelist:
      feedback_fn("  - ERROR: instance %s not in instance list %s" %
                      (instance, instancelist))
      bad = True

    instanceconfig = self.cfg.GetInstanceInfo(instance)
    node_current = instanceconfig.primary_node

    # node_vol_should: node name -> volumes the config says must exist there
    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    # an instance not configured as 'down' must be running on its primary
    if not instanceconfig.status == 'down':
      if not instance in node_instance[node_current]:
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    # the instance must not be running anywhere but its primary node
    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    Returns 1 (as an int) if any verification failed, 0 otherwise.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    node_volume = {}
    node_instance = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    # gather all remote data up front with bulk RPC calls
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      # a string result is an LVM error message, a dict is the volume map,
      # anything else means the RPC itself failed
      if isinstance(volumeinfo, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

    # node name -> volumes expected there, accumulated over all instances
    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      result =  self._VerifyInstance(instance, node_volume, node_instance,
                                     feedback_fn)
      bad = bad or result

      inst_config = self.cfg.GetInstanceInfo(instance)

      inst_config.MapLVsByNode(node_vol_should)

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    return int(bad)
887

    
888

    
889
class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    Returns a tuple of four elements (which is also bound to `result`):
      - list of nodes that could not be contacted
      - dict of node name -> error message for nodes with LVM problems
      - list of instance names with offline logical volumes
      - dict of instance name -> list of missing (node, volume) pairs

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    # build the reverse map of expected volumes: only running,
    # net-mirrored instances are of interest
    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      # no expected volumes at all, nothing to verify
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
        # FIX: without this continue the string fell through to the
        # iteritems() loop below and raised AttributeError
        continue
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, _, lv_online) in lvs.iteritems():
        # pop so that whatever remains in nv_dict afterwards is missing
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result
class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    The hooks run on the master node only, both pre and post.

    """
    env = {
      # FIX: the opcode has no 'sstore' attribute; the simple store is
      # held by the LU itself (self.sstore), as used everywhere else
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    Raises OpPrereqError if the name/IP is unchanged or the new IP is
    already reachable (i.e. in use) on the network.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      result = utils.RunCmd(["fping", "-q", new_ip])
      if not result.failed:
        # fping succeeding means something already answers on that IP
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    Stops the master role, rewrites the name/IP keys in the simple
    store, redistributes them and restarts the master role.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      # always try to restore the master role, even on failure above
      if not rpc.call_node_start_master(master):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  Polls the instance's primary node for mirror status and loops until
  all disks report no completion percentage (i.e. are in sync).

  Args:
    cfgw: configuration object, used to set the physical disk IDs
    instance: the instance whose disks are being waited on
    proc: object providing LogInfo/LogWarning for progress reporting
    oneshot: if True, report status once and return instead of looping
    unlock: if True, release the 'cmd' lock while sleeping between polls

  Returns:
    True if no disk ended up degraded, False otherwise.

  Raises:
    errors.RemoteError: if the node cannot be contacted 10 times in a row.

  """
  if not instance.disks:
    # no disks at all: trivially in sync
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      # transient RPC failure: retry up to 10 consecutive times
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        # a non-None percentage means this disk is still syncing
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          # NOTE(review): max_time stays 0 here, so the sleep below can be
          # zero-length when no time estimate is available — confirm intended
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    # drop the command lock (if asked to) so other commands can run
    # during potentially long resyncs; always re-acquire afterwards
    if unlock:
      utils.Unlock('cmd')
    try:
      time.sleep(min(60, max_time))
    finally:
      if unlock:
        utils.Lock('cmd')

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  cfgw.SetDiskID(dev, node)
  # blockdev_find result: is_degraded is at index 5, ldisk status at 6
  status_idx = 6 if ldisk else 5

  healthy = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Can't get any data from node %s" % node)
      healthy = False
    else:
      healthy = healthy and (not rstats[status_idx])

  # NOTE(review): child devices are checked with the default ldisk value;
  # the flag is not propagated down — confirm this is intended
  for child in (dev.children or []):
    healthy = healthy and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return healthy
class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    return

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    Queries every node for its OS list and returns the aggregated
    per-node data.

    """
    # keep the '== False' test: a dict (even empty) is a valid answer,
    # only the literal False return signals total failure
    node_data = rpc.call_os_diagnose(self.cfg.GetNodeList())
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    return node_data
class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would not allows itself to run.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      # FIX: converted from the legacy 'raise Exc, args' statement form to
      # the call form used everywhere else in this module
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    # the node must not be referenced by any instance, either as
    # primary or as secondary
    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    Tells the node to leave, stops its daemon, and removes it from the
    configuration and /etc/hosts.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    rpc.call_node_leave_cluster(node.name)

    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)

    _RemoveHostFromEtcHosts(node.name)
class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    # fields that need a live RPC query to the nodes, as opposed to
    # static data from the configuration
    self.dynamic_fields = frozenset(["dtotal", "dfree",
                                     "mtotal", "mnode", "mfree",
                                     "bootid"])

    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                               "pip", "sip"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedNodes(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    Returns a list of rows, one per node, each row holding the
    requested output fields in order.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    # only do the (expensive) node RPC if a dynamic field was requested
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    # per-node sets of instance names, filled below only if requested
    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    # build one output row per node, in the requested field order
    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field in self.dynamic_fields:
          # None if the node did not answer the info RPC
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)


  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    Returns a list of rows, one per volume, each row holding the
    requested output fields (as strings) in order.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    # instance -> {node: [lv, ...]} map, used to attribute a volume to
    # the instance owning it
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      # skip nodes that failed to answer or have no volumes
      if node not in volumes or not volumes[node]:
        continue

      # sort a copy by physical device, leaving the RPC result untouched
      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            # find the owning instance; the for/else sets '-' when the
            # volume belongs to no instance
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output
class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) matches the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    # a missing secondary IP means the node is single-homed
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip
    node_list = cfg.GetNodeList()
    if node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration"
                                 % node)

    # neither of the new node's IPs may collide with any existing node
    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)
      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachablity
    if not utils.TcpPing(utils.HostInfo().name,
                         primary_ip,
                         constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(myself.secondary_ip,
                           secondary_ip,
                           constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

    # HVM clusters need the shared VNC password file present locally so
    # it can be copied to the new node in Exec
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
                                   constants.VNC_PASSWORD_FILE)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    Bootstraps the node daemon over ssh, verifies connectivity,
    distributes keys and config files, and finally registers the node
    in the cluster configuration.

    """
    new_node = self.new_node
    node = new_node.name

    # set up inter-node password and certificate and restarts the node daemon
    gntpass = self.sstore.GetNodeDaemonPassword()
    # the password is interpolated into a shell command below, so it
    # must be restricted to a safe character set
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
      raise errors.OpExecError("ganeti password corruption detected")
    f = open(constants.SSL_CERT_FILE)
    try:
      gntpem = f.read(8192)
    finally:
      f.close()
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
    # so we use this to detect an invalid certificate; as long as the
    # cert doesn't contain this, the here-document will be correctly
    # parsed by the shell sequence below
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
    if not gntpem.endswith("\n"):
      raise errors.OpExecError("PEM must end with newline")
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)

    # and then connect with ssh to set password and start ganeti-noded
    # note that all the below variables are sanitized at this point,
    # either by being constants or by the checks above
    ss = self.sstore
    mycommand = ("umask 077 && "
                 "echo '%s' > '%s' && "
                 "cat > '%s' << '!EOF.' && \n"
                 "%s!EOF.\n%s restart" %
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
                  constants.SSL_CERT_FILE, gntpem,
                  constants.NODE_INITD_SCRIPT))

    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
    if result.failed:
      raise errors.OpExecError("Remote command on node %s, error: %s,"
                               " output: %s" %
                               (node, result.fail_reason, result.output))

    # check connectivity
    time.sleep(4)

    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    _AddHostToEtcHosts(new_node.name)

    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
                      self.cfg.GetHostKey())

    # for dual-homed nodes, make the node confirm it can reach its own
    # secondary IP (i.e. the interface is really configured)
    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    success, msg = ssh.VerifyNodeHostname(node)
    if not success:
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
                               " than the one the resolver gives: %s."
                               " Please fix and re-run this command." %
                               (node, msg))

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList() + [node]
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = ss.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      if not ssh.CopyFileToNode(node, fname):
        logger.Error("could not copy file %s to node %s" % (fname, node))

    logger.Info("adding node %s to cluster.conf" % node)
    self.cfg.AddNode(new_node)
class LUMasterFailover(LogicalUnit):
  """Failover the master node to the current node.

  This is a special LU in that it must run on a non-master node.

  """
  HPATH = "master-failover"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_MASTER = False
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the new master only in the pre phase, and on all
    the nodes in the post phase.

    """
    env = {
      "OP_TARGET": self.new_master,
      "NEW_MASTER": self.new_master,
      "OLD_MASTER": self.old_master,
      }
    return env, [self.new_master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we are not already the master.

    """
    self.new_master = utils.HostInfo().name
    self.old_master = self.sstore.GetMasterNode()

    if self.old_master == self.new_master:
      # FIX: message grammar ("This commands" -> "This command")
      raise errors.OpPrereqError("This command must be run on the node"
                                 " where you want the new master to be."
                                 " %s is already the master" %
                                 self.old_master)

  def Exec(self, feedback_fn):
    """Failover the master node.

    This command, when run on a non-master node, will cause the current
    master to cease being master, and the non-master to become new
    master.

    """
    #TODO: do not rely on gethostname returning the FQDN
    logger.Info("setting master to %s, old master: %s" %
                (self.new_master, self.old_master))

    if not rpc.call_node_stop_master(self.old_master):
      # FIX: message was missing the "not" ("could disable ...")
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)

    # record the new master in the simple store and push it everywhere
    ss = self.sstore
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
      logger.Error("could not distribute the new simple store master file"
                   " to the other nodes, please check.")

    if not rpc.call_node_start_master(self.new_master):
      logger.Error("could not start the master role on the new master"
                   " %s, please check" % self.new_master)
      feedback_fn("Error in activating the master IP on the new master,"
                  " please fix manually.")
class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    Builds a dict with the cluster name, the various version numbers,
    the master node and the platform architecture.

    """
    arch_info = (platform.architecture()[0], platform.machine())
    return {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": arch_info,
      }
class LUClusterCopyFile(NoHooksLU):
  """Copy file to cluster.

  """
  _OP_REQP = ["nodes", "filename"]

  def CheckPrereq(self):
    """Check prerequisites.

    It should check that the named file exists and that the given list
    of nodes is valid.

    """
    if not os.path.exists(self.op.filename):
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)

    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Copy a file from the master to the selected nodes.

    The local (master) node is skipped; a failed copy to one node is
    logged but does not abort copies to the remaining nodes.

    """
    file_name = self.op.filename
    local_name = utils.HostInfo().name

    for node in self.nodes:
      if node == local_name:
        continue
      if not ssh.CopyFileToNode(node, file_name):
        logger.Error("Copy of file %s to node %s failed" % (file_name, node))

class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """No prerequisites for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return a dump of the cluster configuration.

    """
    return self.cfg.DumpConfig()

class LURunClusterCommand(NoHooksLU):
  """Run a command on some nodes.

  """
  _OP_REQP = ["command", "nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    It checks that the given list of nodes is valid.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Run the command on each node as root, collecting the results.

    Returns a list of (node, output, exit_code) tuples.

    """
    results = []
    for node in self.nodes:
      ssh_result = ssh.SSHCall(node, "root", self.op.command)
      results.append((node, ssh_result.output, ssh_result.exit_code))

    return results

class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    full_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    instance = self.cfg.GetInstanceInfo(full_name)
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Assemble the instance's disks and return the device mapping.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info

def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    cfg: the cluster configuration object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    A (disks_ok, device_info) tuple, where disks_ok is a boolean (False
    if assembling any required device failed) and device_info is a list
    of (primary_node, instance_visible_name, primary_assemble_result)
    tuples, one per instance disk.

  """
  device_info = []
  disks_ok = True
  for inst_disk in instance.disks:
    master_result = None
    # assemble the full device tree on every node that hosts part of it
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      is_primary = node == instance.primary_node
      result = rpc.call_blockdev_assemble(node, node_disk,
                                          instance.name, is_primary)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=%s)" %
                     (inst_disk.iv_name, node, is_primary))
        # secondary-node failures are only fatal unless explicitly ignored
        if is_primary or not ignore_secondaries:
          disks_ok = False
      if is_primary:
        master_result = result
    device_info.append((instance.primary_node, inst_disk.iv_name,
                        master_result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info

def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance, raising on failure.

  On assembly failure the already-assembled disks are shut down again
  and an OpExecError is raised; a hint about '--force' is logged when
  force was explicitly False.

  """
  disks_ok, _ = _AssembleInstanceDisks(instance, cfg,
                                       ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")

class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    full_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    instance = self.cfg.GetInstanceInfo(full_name)
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks, refusing to act on a running instance.

    """
    instance = self.instance
    pnode = instance.primary_node
    running = rpc.call_instance_list([pnode])[pnode]
    # a non-list answer means the RPC to the primary node failed
    if not type(running) is list:
      raise errors.OpExecError("Can't contact node '%s'" % pnode)

    if instance.name in running:
      raise errors.OpExecError("Instance is running, can't shutdown"
                               " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)

def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are ignored
  when computing the return value (they are still logged); otherwise
  any node failure makes the function return False.

  Returns:
    True if every required shutdown succeeded, False otherwise.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result

class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existance
    _CheckInstanceBridgesExist(instance)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Start the instance.

    Verifies that the primary node has enough free memory, assembles
    the disks and then starts the instance on its primary node.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
    if not nodeinfo:
      raise errors.OpExecError("Could not contact node %s for infos" %
                               (node_current))

    free_mem = nodeinfo[node_current]['memory_free']
    needed_mem = instance.memory
    if needed_mem > free_mem:
      raise errors.OpExecError("Not enough memory to start instance"
                               " %s on node %s"
                               " needed %s MiB, available %s MiB" %
                               (instance.name, node_current, needed_mem,
                                free_mem))

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      # roll back the disk assembly on failure
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")

    self.cfg.MarkInstanceUp(instance.name)

class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existance
    _CheckInstanceBridgesExist(instance)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Reboot the instance.

    Soft/hard reboots are delegated to the hypervisor on the primary
    node; a full reboot shuts the instance and its disks down and
    starts everything again.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    valid_types = [constants.INSTANCE_REBOOT_SOFT,
                   constants.INSTANCE_REBOOT_HARD,
                   constants.INSTANCE_REBOOT_FULL]
    if reboot_type not in valid_types:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      if not rpc.call_instance_reboot(node_current, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      # full reboot: stop the instance and its disks, then restart
      if not rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
      if not rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)

class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    full_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    instance = self.cfg.GetInstanceInfo(full_name)
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    A failed hypervisor shutdown is logged but the instance is still
    marked down and its disks are deactivated.

    """
    instance = self.instance
    node_current = instance.primary_node
    if not rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    self.cfg.MarkInstanceDown(instance.name)
    _ShutdownInstanceDisks(instance, self.cfg)

class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        # note: the reinstall opcode has no 'pnode' field, so the error
        # must be formatted with the instance's primary node, not self.op
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   instance.primary_node)
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node"  % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    Optionally switches the instance's OS, then re-runs the OS create
    scripts with the disks temporarily assembled.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.AddInstance(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)

class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running,
    and that the new name resolves and (unless ignore_ip is set) its IP
    is not already in use.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    if not getattr(self.op, "ignore_ip", False):
      command = ["fping", "-q", name_info.ip]
      result = utils.RunCmd(command)
      # a successful ping means the new IP is already live somewhere
      if not result.failed:
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))

  def Exec(self, feedback_fn):
    """Rename the instance.

    The rename is first recorded in the configuration, then the OS
    rename script is run with the disks temporarily assembled; a script
    failure is only logged since the config rename already happened.

    """
    inst = self.instance
    old_name = inst.name

    self.cfg.RenameInstance(inst.name, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
                                          "sda", "sdb"):
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)

class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node only.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.sstore.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    full_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    instance = self.cfg.GetInstanceInfo(full_name)
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Remove the instance: shut it down, drop its disks, then its config.

    With ignore_failures set, shutdown/disk-removal errors only produce
    warnings and the removal proceeds.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      if not self.op.ignore_failures:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))
      feedback_fn("Warning: can't shutdown instance")

    logger.Info("removing block devices for instance %s" % instance.name)

    if not _RemoveDisks(instance, self.cfg):
      if not self.op.ignore_failures:
        raise errors.OpExecError("Can't remove instance's disks")
      feedback_fn("Warning: can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)

class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
                               "admin_state", "admin_ram",
                               "disk_template", "ip", "mac", "bridge",
                               "sda_size", "sdb_size"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedInstances(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    Live (dynamic) data is only gathered via RPC when one of the
    dynamic fields was requested.

    """
    instance_names = self.wanted
    instances = [self.cfg.GetInstanceInfo(iname)
                 for iname in instance_names]

    # begin data gathering

    pnodes = frozenset([inst.primary_node for inst in instances])

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_all_instances_info(pnodes)
      for name in pnodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = []
    for instance in instances:
      row = []
      for field in self.op.output_fields:
        if field == "name":
          value = instance.name
        elif field == "os":
          value = instance.os
        elif field == "pnode":
          value = instance.primary_node
        elif field == "snodes":
          value = list(instance.secondary_nodes)
        elif field == "admin_state":
          value = (instance.status != "down")
        elif field == "oper_state":
          # None means "unknown" (primary node unreachable)
          if instance.primary_node in bad_nodes:
            value = None
          else:
            value = bool(live_data.get(instance.name))
        elif field == "admin_ram":
          value = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            value = None
          elif instance.name in live_data:
            value = live_data[instance.name].get("memory", "?")
          else:
            value = "-"
        elif field == "disk_template":
          value = instance.disk_template
        elif field == "ip":
          value = instance.nics[0].ip
        elif field == "bridge":
          value = instance.nics[0].bridge
        elif field == "mac":
          value = instance.nics[0].mac
        elif field in ("sda_size", "sdb_size"):
          disk = instance.FindDisk(field[:3])
          if disk is None:
            value = None
          else:
            value = disk.size
        else:
          raise errors.ParameterError(field)
        row.append(value)
      output.append(row)

    return output

class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster, is net-mirrored,
    and that the failover target has enough memory and the required
    bridges.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "DT_REMOTE_RAID1 template")

    # check memory requirements on the secondary node
    target_node = secondary_nodes[0]
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
    info = nodeinfo.get(target_node, None)
    if not info:
      raise errors.OpPrereqError("Cannot get current information"
                                 " from node '%s'" % nodeinfo)
    if instance.memory > info['memory_free']:
      raise errors.OpPrereqError("Not enough memory on target node %s."
                                 " %d MB available, %d MB required" %
                                 (target_node, info['memory_free'],
                                  instance.memory))

    # check bridge existance
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

    self.instance = instance

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for disk in instance.disks:
      # for remote_raid1, these are md over drbd
      if not _CheckDiskConsistency(self.cfg, disk, target_node, False):
        if not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % disk.iv_name)

    feedback_fn("* checking target node resource availability")
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())

    if not nodeinfo:
      raise errors.OpExecError("Could not contact target node %s." %
                               target_node)

    free_memory = int(nodeinfo[target_node]['memory_free'])
    memory = instance.memory
    if memory > free_memory:
      raise errors.OpExecError("Not enough memory to create instance %s on"
                               " node %s. needed %s MiB, available %s MiB" %
                               (instance.name, target_node, memory,
                                free_memory))

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down"  %
                     (instance.name, source_node, source_node))
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.AddInstance(instance)

    feedback_fn("* activating the instance's disks on target node")
    logger.Info("Starting instance %s on node %s" %
                (instance.name, target_node))

    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                             ignore_secondaries=True)
    if not disks_ok:
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Can't activate the instance's disks")

    feedback_fn("* starting the instance on the target node")
    if not rpc.call_instance_start(target_node, instance, None):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance %s on node %s." %
                               (instance.name, target_node))

def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
  """Recursively create a block device tree on the primary node.

  Every device in the tree is created unconditionally, children first
  (depth-first).  Returns True on success, False as soon as any
  creation fails.

  """
  # create the whole subtree before the device itself
  for child in (device.children or []):
    if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
      return False

  cfg.SetDiskID(device, node)
  created_id = rpc.call_blockdev_create(node, device, device.size,
                                        instance.name, True, info)
  if not created_id:
    return False
  # record the physical id the first time the device is materialized
  if device.physical_id is None:
    device.physical_id = created_id
  return True

    
2627

    
2628
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
  """Recursively create a block device tree on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  # once any ancestor requires secondary creation, the whole subtree does
  force = force or device.CreateOnSecondary()
  for child in (device.children or []):
    if not _CreateBlockDevOnSecondary(cfg, node, instance,
                                      child, force, info):
      return False

  if not force:
    # nothing to materialize at this level
    return True
  cfg.SetDiskID(device, node)
  created_id = rpc.call_blockdev_create(node, device, device.size,
                                        instance.name, False, info)
  if not created_id:
    return False
  if device.physical_id is None:
    device.physical_id = created_id
  return True

    
2656

    
2657
def _GenerateUniqueNames(cfg, exts):
2658
  """Generate a suitable LV name.
2659

2660
  This will generate a logical volume name for the given instance.
2661

2662
  """
2663
  results = []
2664
  for val in exts:
2665
    new_id = cfg.GenerateUniqueID()
2666
    results.append("%s%s" % (new_id, val))
2667
  return results
2668

    
2669

    
2670
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
  """Generate a drbd (v7) device complete with its two LV children.

  names[0] is used for the data LV, names[1] for the (fixed 128MB)
  metadata LV; a fresh port is allocated from the config for the
  drbd pair.

  """
  # allocate the network port first (side effect on the config)
  port = cfg.AllocatePort()
  vg = cfg.GetVGName()
  data_child = objects.Disk(dev_type=constants.LD_LV, size=size,
                            logical_id=(vg, names[0]))
  meta_child = objects.Disk(dev_type=constants.LD_LV, size=128,
                            logical_id=(vg, names[1]))
  return objects.Disk(dev_type=constants.LD_DRBD7, size=size,
                      logical_id=(primary, secondary, port),
                      children=[data_child, meta_child])

    
2685

    
2686
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
  """Generate a drbd8 device complete with its two LV children.

  Same layout as _GenerateMDDRBDBranch (data LV + 128MB meta LV,
  freshly allocated port), but the drbd8 device carries its own
  iv_name since it is used directly, without an MD layer on top.

  """
  # allocate the network port first (side effect on the config)
  port = cfg.AllocatePort()
  vg = cfg.GetVGName()
  data_child = objects.Disk(dev_type=constants.LD_LV, size=size,
                            logical_id=(vg, names[0]))
  meta_child = objects.Disk(dev_type=constants.LD_LV, size=128,
                            logical_id=(vg, names[1]))
  return objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                      logical_id=(primary, secondary, port),
                      children=[data_child, meta_child],
                      iv_name=iv_name)

    
2702
def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz):
  """Generate the entire disk layout for a given template type.

  Builds the objects.Disk tree(s) for the instance: always an "sda"
  data disk of disk_sz MB and an "sdb" swap disk of swap_sz MB (except
  "diskless", which has none).  Allocating the unique LV names (and,
  for the drbd templates, the ports) mutates the config, so the
  per-template construction order matters.

  Raises ProgrammerError when the secondary-node count does not match
  the template, or the template is unknown.

  """
  #TODO: compute space requirements

  vgname = cfg.GetVGName()
  if template_name == "diskless":
    disks = []
  elif template_name == "plain":
    # plain LVs live only on the primary node
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name = "sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name = "sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == "local_raid1":
    # two LV mirrors per disk, joined by an MD raid1 on the primary
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")


    names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
                                       ".sdb_m1", ".sdb_m2"])
    sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                              logical_id=(vgname, names[0]))
    sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                              logical_id=(vgname, names[1]))
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
                              size=disk_sz,
                              children = [sda_dev_m1, sda_dev_m2])
    sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                              logical_id=(vgname, names[2]))
    sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                              logical_id=(vgname, names[3]))
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
                              size=swap_sz,
                              children = [sdb_dev_m1, sdb_dev_m2])
    disks = [md_sda_dev, md_sdb_dev]
  elif template_name == constants.DT_REMOTE_RAID1:
    # drbd7 branch per disk, wrapped in an MD raid1 on the primary
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
                                         disk_sz, names[0:2])
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
                              children = [drbd_sda_dev], size=disk_sz)
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
                                         swap_sz, names[2:4])
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
                              children = [drbd_sdb_dev], size=swap_sz)
    disks = [md_sda_dev, md_sdb_dev]
  elif template_name == constants.DT_DRBD8:
    # drbd8 handles mirroring itself; no MD layer needed
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                         disk_sz, names[0:2], "sda")
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                         swap_sz, names[2:4], "sdb")
    disks = [drbd_sda_dev, drbd_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks
2776

    
2777

    
2778
def _GetInstanceInfoText(instance):
2779
  """Compute that text that should be added to the disk's metadata.
2780

2781
  """
2782
  return "originstname+%s" % instance.name
2783

    
2784

    
2785
def _CreateDisks(cfg, instance):
  """Create all disks for an instance on its primary and secondaries.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  owner_info = _GetInstanceInfoText(instance)

  for disk in instance.disks:
    logger.Info("creating volume %s for instance %s" %
              (disk.iv_name, instance.name))
    # secondaries are created first so mirrored devices have a peer
    #HARDCODE
    for snode in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, snode, instance,
                                        disk, False, owner_info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (disk.iv_name, disk, snode))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, disk, owner_info):
      logger.Error("failed to create volume %s on primary!" %
                   disk.iv_name)
      return False
  return True
2816

    
2817

    
2818
def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal proces

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  all_removed = True
  for disk in instance.disks:
    # walk every (node, sub-device) pair of the disk tree
    for node, subdev in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(subdev, node)
      if not rpc.call_blockdev_remove(node, subdev):
        # best-effort: log and keep going so other devices get removed
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (disk.iv_name, node))
        all_removed = False
  return all_removed
2845

    
2846

    
2847
class LUCreateInstance(LogicalUnit):
  """Create an instance.

  Handles both fresh creation (INSTANCE_CREATE) and import from an
  export directory (INSTANCE_IMPORT).  CheckPrereq validates the
  opcode and fills in self.pnode, self.secondaries, self.inst_ip,
  self.instance_status (and self.src_image for imports), which Exec
  and BuildHooksEnv rely on.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
              "disk_template", "swap_size", "mode", "start", "vcpus",
              "wait_for_sync", "ip_check", "mac"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.op.mem_size,
      vcpus=self.op.vcpus,
      nics=[(self.inst_ip, self.op.bridge)],
    ))

    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    Validates mode, source export (for imports), primary/secondary
    nodes, free disk space, OS availability, instance name/IP/MAC and
    bridge; raises OpPrereqError on any failure.

    """
    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)
      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")
      src_node_full = self.cfg.ExpandNodeName(src_node)
      if src_node_full is None:
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
      self.op.src_node = src_node = src_node_full

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage
    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

    # check primary node
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
    if pnode is None:
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                 self.op.pnode)
    self.op.pnode = pnode.name
    self.pnode = pnode
    self.secondaries = []
    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if getattr(self.op, "snode", None) is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")

      snode_name = self.cfg.ExpandNodeName(self.op.snode)
      if snode_name is None:
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
                                   self.op.snode)
      elif snode_name == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(snode_name)

    # Check lv size requirements
    nodenames = [pnode.name] + self.secondaries
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())

    # Required free disk space as a function of disk and swap space
    req_size_dict = {
      constants.DT_DISKLESS: 0,
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
      constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
      # 256 MB are added for drbd metadata, 128MB for each drbd device
      constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
      constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
    }

    if self.op.disk_template not in req_size_dict:
      raise errors.ProgrammerError("Disk template '%s' size requirement"
                                   " is unknown" % self.op.disk_template)

    req_size = req_size_dict[self.op.disk_template]

    for node in nodenames:
      info = nodeinfo.get(node, None)
      if not info:
        # report the failing node's name, not the whole nodeinfo dict
        raise errors.OpPrereqError("Cannot get current information"
                                   " from node '%s'" % node)
      if req_size > info['vg_free']:
        raise errors.OpPrereqError("Not enough disk space on target node %s."
                                   " %d MB available, %d MB required" %
                                   (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node"  % self.op.os_type)

    # instance verification
    hostname1 = utils.HostInfo(self.op.instance_name)

    self.op.instance_name = instance_name = hostname1.name
    instance_list = self.cfg.GetInstanceList()
    if instance_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = inst_ip

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      # a reachable IP means the name/address is already taken
      if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
                       constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (hostname1.ip, instance_name))

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    Creates the disks, registers the instance in the config, waits for
    the disks to sync, installs/imports the OS and optionally starts
    the instance.  Disk creation and sync failures are rolled back.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      # roll back any partially-created devices
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      # degraded disks: remove both the devices and the config entry
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                                src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")
3147

    
3148

    
3149
class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    full_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    inst = self.cfg.GetInstanceInfo(full_name)
    if inst is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = inst

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    inst = self.instance
    pnode = inst.primary_node

    # the RPC returns False for this node when it is unreachable
    running = rpc.call_instance_list([pnode])[pnode]
    if running is False:
      raise errors.OpExecError("Can't connect to node %s." % pnode)

    if inst.name not in running:
      raise errors.OpExecError("Instance %s is not running." % inst.name)

    logger.Debug("connecting to console of %s on %s" % (inst.name, pnode))

    console_cmd = hypervisor.GetHypervisor().GetShellCommandForConsole(inst)
    # build ssh cmdline
    cmdline = ["ssh", "-q", "-t"]
    cmdline.extend(ssh.KNOWN_HOSTS_OPTS)
    cmdline.extend(ssh.BATCH_MODE_OPTS)
    cmdline.append(pnode)
    cmdline.append(console_cmd)
    return "ssh", cmdline
3197

    
3198

    
3199
class LUAddMDDRBDComponent(LogicalUnit):
  """Add a new mirror member to an instance's disk.

  For a remote_raid1 instance, builds a fresh drbd branch towards a
  new secondary node and attaches it to the named MD raid1 device.

  """
  HPATH = "mirror-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "NEW_SECONDARY": self.op.remote_node,
      "DISK_NAME": self.op.disk_name,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.remote_node,] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster, that the target
    node is valid and distinct from the primary, that the layout is
    remote_raid1 and that the named disk still has room for a second
    mirror.  Fills in self.instance, self.remote_node and self.disk.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
    if remote_node is None:
      raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
    self.remote_node = remote_node

    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")
    # locate the disk by its iv_name; for/else raises when not found
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
                                 " instance." % self.op.disk_name)
    if len(disk.children) > 1:
      raise errors.OpPrereqError("The device already has two slave devices."
                                 " This would create a 3-disk raid1 which we"
                                 " don't allow.")
    self.disk = disk

  def Exec(self, feedback_fn):
    """Add the mirror component

    Creates the new drbd branch on the secondary, then on the primary,
    attaches it to the MD device, saves the config and waits for sync.
    Partial failures are rolled back by removing the devices already
    created.

    """
    disk = self.disk
    instance = self.instance

    remote_node = self.remote_node
    # e.g. ".sda_data" / ".sda_meta" LV name suffixes
    lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
    names = _GenerateUniqueNames(self.cfg, lv_names)
    new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
                                     remote_node, disk.size, names)

    logger.Info("adding new mirror component on secondary")
    #HARDCODE
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
                                      new_drbd, False,
                                      _GetInstanceInfoText(instance)):
      raise errors.OpExecError("Failed to create new component on secondary"
                               " node %s" % remote_node)

    logger.Info("adding new mirror component on primary")
    #HARDCODE
    if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
                                    instance, new_drbd,
                                    _GetInstanceInfoText(instance)):
      # remove secondary dev
      self.cfg.SetDiskID(new_drbd, remote_node)
      rpc.call_blockdev_remove(remote_node, new_drbd)
      raise errors.OpExecError("Failed to create volume on primary")

    # the device exists now
    # call the primary node to add the mirror to md
    logger.Info("adding new mirror component to md")
    if not rpc.call_blockdev_addchildren(instance.primary_node,
                                         disk, [new_drbd]):
      # attach failed: tear down both copies (best effort)
      logger.Error("Can't add mirror compoment to md!")
      self.cfg.SetDiskID(new_drbd, remote_node)
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
        logger.Error("Can't rollback on secondary")
      self.cfg.SetDiskID(new_drbd, instance.primary_node)
      if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
        logger.Error("Can't rollback on primary")
      raise errors.OpExecError("Can't add mirror component to md array")

    disk.children.append(new_drbd)

    # persist the new disk layout
    self.cfg.AddInstance(instance)

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks
    _WaitForSync(self.cfg, instance, self.proc)

    return 0
3311

    
3312

    
3313
class LURemoveMDDRBDComponent(LogicalUnit):
  """Remove a component from a remote_raid1 disk.

  The opcode must name the instance, the iv_name of the affected disk
  and the disk_id of the mirror component to drop; the disk_id is
  matched against ``child.logical_id[2]`` of the disk's DRBD children
  (the error message calls this value the "port").

  """
  HPATH = "mirror-remove"
  HTYPE = constants.HTYPE_INSTANCE
  # opcode fields that must be present (validated by the LU framework)
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK_NAME": self.op.disk_name,
      "DISK_ID": self.op.disk_id,
      "OLD_SECONDARY": self.old_secondary,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    # hook nodes: master + primary + all secondaries
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster, uses the
    remote_raid1 template, and that the named disk has a DRBD child
    matching the requested disk_id which is not the last remaining
    mirror component.

    Side effects: sets self.instance, self.disk, self.child and
    self.old_secondary for Exec/BuildHooksEnv.

    Raises:
      errors.OpPrereqError: if any of the above checks fails.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")
    # for/else: the else clause only runs if the loop completed
    # without finding (and breaking on) a matching disk
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
                                 " instance." % self.op.disk_name)
    # same for/else pattern to locate the DRBD child by its disk_id
    for child in disk.children:
      if (child.dev_type == constants.LD_DRBD7 and
          child.logical_id[2] == self.op.disk_id):
        break
    else:
      raise errors.OpPrereqError("Can't find the device with this port.")

    if len(disk.children) < 2:
      raise errors.OpPrereqError("Cannot remove the last component from"
                                 " a mirror.")
    self.disk = disk
    self.child = child
    # the child's logical_id holds both endpoints of the DRBD pair;
    # pick whichever one is not the primary node as the old secondary
    if self.child.logical_id[0] == instance.primary_node:
      oid = 1
    else:
      oid = 0
    self.old_secondary = self.child.logical_id[oid]

  def Exec(self, feedback_fn):
    """Remove the mirror component

    Detaches the child from the md device on the primary node, then
    best-effort removes the child block device from both of its nodes
    (failures there are only logged), and finally updates the
    instance's configuration.

    Raises:
      errors.OpExecError: if detaching the child from the mirror fails.

    """
    instance = self.instance
    disk = self.disk
    child = self.child
    logger.Info("remove mirror component")
    self.cfg.SetDiskID(disk, instance.primary_node)
    if not rpc.call_blockdev_removechildren(instance.primary_node,
                                            disk, [child]):
      raise errors.OpExecError("Can't remove child from mirror.")

    # remove the child device on both endpoints; a failure here is
    # non-fatal (the mirror detach above already succeeded)
    for node in child.logical_id[:2]:
      self.cfg.SetDiskID(child, node)
      if not rpc.call_blockdev_remove(node, child):
        logger.Error("Warning: failed to remove device from node %s,"
                     " continuing operation." % node)

    # drop the child from the configuration and persist it
    disk.children.remove(child)
    self.cfg.AddInstance(instance)
3398

    
3399

    
3400
class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  Supports the remote_raid1 template (all-disks replacement only) and
  the drbd8 template (primary-side, secondary-side, or change of the
  secondary node); Exec dispatches to the matching handler.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  # opcode fields that must be present (validated by the LU framework)
  _OP_REQP = ["instance_name", "mode", "disks"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    # only notify the new secondary if one was actually given
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster, that its disk
    template is network mirrored, and that the requested mode /
    remote node combination is valid for that template.

    Side effects: normalizes self.op.instance_name, self.op.mode and
    self.op.remote_node; sets self.instance, self.sec_node,
    self.remote_node_info and (for drbd8) self.tgt_node, self.oth_node
    and possibly self.new_node.

    Raises:
      errors.OpPrereqError: on any invalid instance/mode/node combination.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance
    self.op.instance_name = instance.name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    # remote_node is optional on the opcode, hence getattr
    remote_node = getattr(self.op, "remote_node", None)
    if remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
      # the user gave the current secondary, switch to
      # 'no-replace-secondary' mode for drbd7
      remote_node = None
    if (instance.disk_template == constants.DT_REMOTE_RAID1 and
        self.op.mode != constants.REPLACE_DISK_ALL):
      raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
                                 " disks replacement, not individual ones")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    # validate that every requested disk name exists on the instance
    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))
    # store the (possibly cleared) remote node back on the opcode
    self.op.remote_node = remote_node

  def _ExecRR1(self, feedback_fn):
    """Replace the disks of an instance.

    remote_raid1 handler: for every disk, build a fresh md/drbd branch
    towards the chosen secondary, attach it to the md device, wait for
    sync, verify nothing is degraded, then detach and remove the old
    component.

    Raises:
      errors.OpExecError: on device creation/attach failures or if a
        device stays degraded after sync.

    """
    instance = self.instance
    iv_names = {}
    # start of work
    if self.op.remote_node is None:
      # no new secondary given: rebuild towards the current one
      remote_node = self.sec_node
    else:
      remote_node = self.op.remote_node
    cfg = self.cfg
    for dev in instance.disks:
      size = dev.size
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
                                       remote_node, size, names)
      # remember (md device, old component, new component) for cleanup
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
      logger.Info("adding new mirror component on secondary for %s" %
                  dev.iv_name)
      #HARDCODE
      if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new component on secondary"
                                 " node %s. Full abort, cleanup manually!" %
                                 remote_node)

      logger.Info("adding new mirror component on primary")
      #HARDCODE
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                      instance, new_drbd,
                                      _GetInstanceInfoText(instance)):
        # remove secondary dev
        cfg.SetDiskID(new_drbd, remote_node)
        rpc.call_blockdev_remove(remote_node, new_drbd)
        raise errors.OpExecError("Failed to create volume on primary!"
                                 " Full abort, cleanup manually!!")

      # the device exists now
      # call the primary node to add the mirror to md
      logger.Info("adding new mirror component to md")
      if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
                                           [new_drbd]):
        logger.Error("Can't add mirror compoment to md!")
        # roll back the freshly created device on both nodes
        cfg.SetDiskID(new_drbd, remote_node)
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
          logger.Error("Can't rollback on secondary")
        cfg.SetDiskID(new_drbd, instance.primary_node)
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
          logger.Error("Can't rollback on primary")
        raise errors.OpExecError("Full abort, cleanup manually!!")

      # persist the new child immediately so a crash leaves it recorded
      dev.children.append(new_drbd)
      cfg.AddInstance(instance)

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      cfg.SetDiskID(dev, instance.primary_node)
      # index [5] of the find result is the degraded flag
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("MD device %s is degraded!" % name)
      cfg.SetDiskID(new_drbd, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
      if is_degr:
        raise errors.OpExecError("New drbd device %s is degraded!" % name)

    # everything synced and healthy: retire the old components
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      logger.Info("remove mirror %s component" % name)
      cfg.SetDiskID(dev, instance.primary_node)
      if not rpc.call_blockdev_removechildren(instance.primary_node,
                                              dev, [child]):
        logger.Error("Can't remove child from mirror, aborting"
                     " *this device cleanup*.\nYou need to cleanup manually!!")
        continue

      for node in child.logical_id[:2]:
        logger.Info("remove child device on %s" % node)
        cfg.SetDiskID(child, node)
        if not rpc.call_blockdev_remove(node, child):
          logger.Error("Warning: failed to remove device from node %s,"
                       " continuing operation." % node)

      dev.children.remove(child)

      cfg.AddInstance(instance)

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for dbrd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaces.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      # meta LV has a fixed 128 (MiB) size
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption than logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      # update config objects to reflect the swap just performed
      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      # index [5] of the find result is the degraded flag
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not rpc.call_blockdev_remove(tgt_node, lv):
          # best-effort: leftover LVs are only warned about
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([pri_node, new_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in pri_node, new_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s on %s" % (dev.iv_name, pri_node))
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        raise errors.OpExecError("Can't find device %s on node %s" %
                                 (dev.iv_name, pri_node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      size = dev.size
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in dev.children:
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], new_node))

      iv_names[dev.iv_name] = (dev, dev.children)

    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for dev in instance.disks:
      size = dev.size
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
      # create new devices on new_node
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=(pri_node, new_node,
                                          dev.logical_id[2]),
                              children=dev.children)
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
                                        new_drbd, False,
                                      _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new DRBD on"
                                 " node '%s'" % new_node)

    for dev in instance.disks:
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for %s on old node" % dev.iv_name)
      cfg.SetDiskID(dev, old_node)
      if not rpc.call_blockdev_shutdown(old_node, dev):
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")
    done = 0
    for dev in instance.disks:
      cfg.SetDiskID(dev, pri_node)
      # set the physical (unique in bdev terms) id to None, meaning
      # detach from network
      dev.physical_id = (None,) * len(dev.physical_id)
      # and 'find' the device, which will 'fix' it to match the
      # standalone state
      if rpc.call_blockdev_find(pri_node, dev):
        done += 1
      else:
        warning("Failed to detach drbd %s from network, unusual case" %
                dev.iv_name)

    if not done:
      # no detaches succeeded (very unlikely)
      raise errors.OpExecError("Can't detach at least one DRBD from old node")

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev in instance.disks:
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    failures = []
    for dev in instance.disks:
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
      # since the attach is smart, it's enough to 'find' the device,
      # it will automatically activate the network, if the physical_id
      # is correct
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
                "please do a gnt-instance info to see the status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      # index [5] of the find result is the degraded flag
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        if not rpc.call_blockdev_remove(old_node, lv):
          warning("Can't remove LV on old secondary",
                  hint="Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance
    # dispatch on disk template; for drbd8 additionally on whether a
    # new secondary node was requested (set up by CheckPrereq)
    if instance.disk_template == constants.DT_REMOTE_RAID1:
      fn = self._ExecRR1
    elif instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")
    return fn(feedback_fn)
3955

    
3956

    
3957
class LUQueryInstanceData(NoHooksLU):
3958
  """Query runtime instance data.
3959

3960
  """
3961
  _OP_REQP = ["instances"]
3962

    
3963
  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    Side effects: sets self.wanted_instances to the list of instance
    objects to query (all cluster instances when self.op.instances is
    empty).

    Raises:
      errors.OpPrereqError: if self.op.instances is not a list, or if
        any named instance does not exist.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError("No such instance name '%s'" % name)
        # BUGFIX: append must happen inside the loop; previously it sat
        # at loop level, so only the *last* requested instance was kept
        self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]
    return
3983

    
3984

    
3985
  def _ComputeDiskStatus(self, instance, snode, dev):
3986
    """Compute block device status.
3987

3988