root / lib / cmdlib.py @ 6510a58a

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33

    
34
from ganeti import rpc
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import config
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import ssconf
45

    
46
class LogicalUnit(object):
47
  """Logical Unit base class.
48

49
  Subclasses must follow these rules:
50
    - implement CheckPrereq which also fills in the opcode instance
51
      with all the fields (even if as None)
52
    - implement Exec
53
    - implement BuildHooksEnv
54
    - redefine HPATH and HTYPE
55
    - optionally redefine their run requirements (REQ_CLUSTER,
56
      REQ_MASTER); note that all commands require root permissions
57

58
  """
59
  HPATH = None
60
  HTYPE = None
61
  _OP_REQP = []
62
  REQ_CLUSTER = True
63
  REQ_MASTER = True
64

    
65
  def __init__(self, processor, op, cfg, sstore):
66
    """Constructor for LogicalUnit.
67

68
    This needs to be overridden in derived classes in order to check op
69
    validity.
70

71
    """
72
    self.proc = processor
73
    self.op = op
74
    self.cfg = cfg
75
    self.sstore = sstore
76
    for attr_name in self._OP_REQP:
77
      attr_val = getattr(op, attr_name, None)
78
      if attr_val is None:
79
        raise errors.OpPrereqError("Required parameter '%s' missing" %
80
                                   attr_name)
81
    if self.REQ_CLUSTER:
82
      if not cfg.IsCluster():
83
        raise errors.OpPrereqError("Cluster not initialized yet,"
84
                                   " use 'gnt-cluster init' first.")
85
      if self.REQ_MASTER:
86
        master = sstore.GetMasterNode()
87
        if master != utils.HostInfo().name:
88
          raise errors.OpPrereqError("Commands must be run on the master"
89
                                     " node %s" % master)
90

    
91
  def CheckPrereq(self):
92
    """Check prerequisites for this LU.
93

94
    This method should check that the prerequisites for the execution
95
    of this LU are fulfilled. It can do internode communication, but
96
    it should be idempotent - no cluster or system changes are
97
    allowed.
98

99
    The method should raise errors.OpPrereqError in case something is
100
    not fulfilled. Its return value is ignored.
101

102
    This method should also update all the parameters of the opcode to
103
    their canonical form; e.g. a short node name must be fully
104
    expanded after this method has successfully completed (so that
105
    hooks, logging, etc. work correctly).
106

107
    """
108
    raise NotImplementedError
109

    
110
  def Exec(self, feedback_fn):
111
    """Execute the LU.
112

113
    This method should implement the actual work. It should raise
114
    errors.OpExecError for failures that are somewhat dealt with in
115
    code, or expected.
116

117
    """
118
    raise NotImplementedError
119

    
120
  def BuildHooksEnv(self):
121
    """Build hooks environment for this LU.
122

123
    This method should return a three-element tuple consisting of: a dict
124
    containing the environment that will be used for running the
125
    specific hook for this LU, a list of node names on which the hook
126
    should run before the execution, and a list of node names on which
127
    the hook should run after the execution.
128

129
    The keys of the dict must not be prefixed with 'GANETI_', as this will
130
    be handled in the hooks runner. Also note additional keys will be
131
    added by the hooks runner. If the LU doesn't define any
132
    environment, an empty dict (and not None) should be returned.
133

134
    As for the node lists, the master should not be included in
135
    them, as it will be added by the hooks runner in case this LU
136
    requires a cluster to run on (otherwise we don't have a node
137
    list). If there are no nodes, an empty list should be returned (and not
138
    None).
139

140
    Note that if the HPATH for a LU class is None, this function will
141
    not be called.
142

143
    """
144
    raise NotImplementedError
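

# The following is an illustrative sketch only, not part of the original
# module: a minimal LogicalUnit subclass following the contract described
# in the docstrings above.  The class, its HPATH value and the opcode field
# "node_name" are hypothetical examples.
class LUExampleEcho(LogicalUnit):
  """Example LU that merely echoes a node name (illustration only)."""
  HPATH = "example-echo"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    env = {"OP_TARGET": self.op.node_name}
    # no pre-execution hook nodes; run the post hook on the target node only
    return env, [], [self.op.node_name]

  def CheckPrereq(self):
    # canonicalize the node name, as the base class contract requires
    node = self.cfg.ExpandNodeName(self.op.node_name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % self.op.node_name)
    self.op.node_name = node

  def Exec(self, feedback_fn):
    feedback_fn("Node %s expanded and verified" % self.op.node_name)
    return self.op.node_name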
145

    
146

    
147
class NoHooksLU(LogicalUnit):
148
  """Simple LU which runs no hooks.
149

150
  This LU is intended as a parent for other LogicalUnits which will
151
  run no hooks, in order to reduce duplicate code.
152

153
  """
154
  HPATH = None
155
  HTYPE = None
156

    
157
  def BuildHooksEnv(self):
158
    """Build hooks env.
159

160
    This is a no-op, since we don't run hooks.
161

162
    """
163
    return {}, [], []
164

    
165

    
166
def _AddHostToEtcHosts(hostname):
167
  """Wrapper around utils.SetEtcHostsEntry.
168

169
  """
170
  hi = utils.HostInfo(name=hostname)
171
  utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
172

    
173

    
174
def _RemoveHostFromEtcHosts(hostname):
175
  """Wrapper around utils.RemoveEtcHostsEntry.
176

177
  """
178
  hi = utils.HostInfo(name=hostname)
179
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
180
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
181

    
182

    
183
def _GetWantedNodes(lu, nodes):
184
  """Returns list of checked and expanded node names.
185

186
  Args:
187
    nodes: list of node names (strings); an empty list means all nodes
188

189
  """
190
  if not isinstance(nodes, list):
191
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
192

    
193
  if nodes:
194
    wanted = []
195

    
196
    for name in nodes:
197
      node = lu.cfg.ExpandNodeName(name)
198
      if node is None:
199
        raise errors.OpPrereqError("No such node name '%s'" % name)
200
      wanted.append(node)
201

    
202
  else:
203
    wanted = lu.cfg.GetNodeList()
204
  return utils.NiceSort(wanted)
205

    
206

    
207
def _GetWantedInstances(lu, instances):
208
  """Returns list of checked and expanded instance names.
209

210
  Args:
211
    instances: list of instance names (strings); an empty list means all
212

213
  """
214
  if not isinstance(instances, list):
215
    raise errors.OpPrereqError("Invalid argument type 'instances'")
216

    
217
  if instances:
218
    wanted = []
219

    
220
    for name in instances:
221
      instance = lu.cfg.ExpandInstanceName(name)
222
      if instance is None:
223
        raise errors.OpPrereqError("No such instance name '%s'" % name)
224
      wanted.append(instance)
225

    
226
  else:
227
    wanted = lu.cfg.GetInstanceList()
228
  return utils.NiceSort(wanted)
229

    
230

    
231
def _CheckOutputFields(static, dynamic, selected):
232
  """Checks whether all selected fields are valid.
233

234
  Args:
235
    static: Static fields
236
    dynamic: Dynamic fields
    selected: Fields requested by the user
237

238
  """
239
  static_fields = frozenset(static)
240
  dynamic_fields = frozenset(dynamic)
241

    
242
  all_fields = static_fields | dynamic_fields
243

    
244
  if not all_fields.issuperset(selected):
245
    raise errors.OpPrereqError("Unknown output fields selected: %s"
246
                               % ",".join(frozenset(selected).
247
                                          difference(all_fields)))
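
# Illustration only (hypothetical field names): with
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=["name", "mfree"])
# the check passes silently, while selected=["name", "bogus"] raises
# OpPrereqError("Unknown output fields selected: bogus").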
248

    
249

    
250
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
251
                          memory, vcpus, nics):
252
  """Builds instance related env variables for hooks from single variables.
253

254
  Args:
255
    secondary_nodes: List of secondary nodes as strings
256
  """
257
  env = {
258
    "OP_TARGET": name,
259
    "INSTANCE_NAME": name,
260
    "INSTANCE_PRIMARY": primary_node,
261
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
262
    "INSTANCE_OS_TYPE": os_type,
263
    "INSTANCE_STATUS": status,
264
    "INSTANCE_MEMORY": memory,
265
    "INSTANCE_VCPUS": vcpus,
266
  }
267

    
268
  if nics:
269
    nic_count = len(nics)
270
    for idx, (ip, bridge) in enumerate(nics):
271
      if ip is None:
272
        ip = ""
273
      env["INSTANCE_NIC%d_IP" % idx] = ip
274
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
275
  else:
276
    nic_count = 0
277

    
278
  env["INSTANCE_NIC_COUNT"] = nic_count
279

    
280
  return env
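
# Illustration only, with made-up values: calling the function above as
#   _BuildInstanceHookEnv("inst1.example.com", "node1", ["node2"],
#                         "debian-etch", "up", 128, 1,
#                         [("192.0.2.10", "xen-br0")])
# returns a dict along the lines of:
#   {"OP_TARGET": "inst1.example.com", "INSTANCE_NAME": "inst1.example.com",
#    "INSTANCE_PRIMARY": "node1", "INSTANCE_SECONDARIES": "node2",
#    "INSTANCE_OS_TYPE": "debian-etch", "INSTANCE_STATUS": "up",
#    "INSTANCE_MEMORY": 128, "INSTANCE_VCPUS": 1,
#    "INSTANCE_NIC0_IP": "192.0.2.10", "INSTANCE_NIC0_BRIDGE": "xen-br0",
#    "INSTANCE_NIC_COUNT": 1}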
281

    
282

    
283
def _BuildInstanceHookEnvByObject(instance, override=None):
284
  """Builds instance related env variables for hooks from an object.
285

286
  Args:
287
    instance: objects.Instance object of instance
288
    override: dict of values to override
289
  """
290
  args = {
291
    'name': instance.name,
292
    'primary_node': instance.primary_node,
293
    'secondary_nodes': instance.secondary_nodes,
294
    'os_type': instance.os,
295
    'status': instance.status,
296
    'memory': instance.memory,
297
    'vcpus': instance.vcpus,
298
    'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
299
  }
300
  if override:
301
    args.update(override)
302
  return _BuildInstanceHookEnv(**args)
303

    
304

    
305
def _UpdateKnownHosts(fullnode, ip, pubkey):
306
  """Ensure a node has a correct known_hosts entry.
307

308
  Args:
309
    fullnode - Fully qualified domain name of host. (str)
310
    ip       - IPv4 address of host (str)
311
    pubkey   - the public key of the cluster
312

313
  """
314
  if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
315
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
316
  else:
317
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
318

    
319
  inthere = False
320

    
321
  save_lines = []
322
  add_lines = []
323
  removed = False
324

    
325
  for rawline in f:
326
    logger.Debug('read %s' % (repr(rawline),))
327

    
328
    parts = rawline.rstrip('\r\n').split()
329

    
330
    # Ignore unwanted lines
331
    if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
332
      fields = parts[0].split(',')
333
      key = parts[2]
334

    
335
      haveall = True
336
      havesome = False
337
      for spec in [ ip, fullnode ]:
338
        if spec not in fields:
339
          haveall = False
340
        if spec in fields:
341
          havesome = True
342

    
343
      logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
344
      if haveall and key == pubkey:
345
        inthere = True
346
        save_lines.append(rawline)
347
        logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
348
        continue
349

    
350
      if havesome and (not haveall or key != pubkey):
351
        removed = True
352
        logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
353
        continue
354

    
355
    save_lines.append(rawline)
356

    
357
  if not inthere:
358
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
359
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
360

    
361
  if removed:
362
    save_lines = save_lines + add_lines
363

    
364
    # Write a new file and replace old.
365
    fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
366
                                   constants.DATA_DIR)
367
    newfile = os.fdopen(fd, 'w')
368
    try:
369
      newfile.write(''.join(save_lines))
370
    finally:
371
      newfile.close()
372
    logger.Debug("Wrote new known_hosts.")
373
    os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
374

    
375
  elif add_lines:
376
    # Simply appending a new line will do the trick.
377
    f.seek(0, 2)
378
    for add in add_lines:
379
      f.write(add)
380

    
381
  f.close()
382

    
383

    
384
def _HasValidVG(vglist, vgname):
385
  """Checks if the volume group list is valid.
386

387
  A non-None return value means there's an error, and the return value
388
  is the error message.
389

390
  """
391
  vgsize = vglist.get(vgname, None)
392
  if vgsize is None:
393
    return "volume group '%s' missing" % vgname
394
  elif vgsize < 20480:
395
    return ("volume group '%s' too small (20480MiB required, %dMib found)" %
396
            (vgname, vgsize))
397
  return None
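
# Illustration only (hypothetical volume group data): given
#   vglist = {"xenvg": 102400}
# _HasValidVG(vglist, "xenvg") returns None (everything is fine), while
# _HasValidVG(vglist, "othervg") returns "volume group 'othervg' missing"
# and _HasValidVG({"xenvg": 1024}, "xenvg") reports the group as too small.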
398

    
399

    
400
def _InitSSHSetup(node):
401
  """Setup the SSH configuration for the cluster.
402

403

404
  This generates a dsa keypair for root, adds the pub key to the
405
  permitted hosts and adds the hostkey to its own known hosts.
406

407
  Args:
408
    node: the name of this host as a fqdn
409

410
  """
411
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
412

    
413
  for name in priv_key, pub_key:
414
    if os.path.exists(name):
415
      utils.CreateBackup(name)
416
    utils.RemoveFile(name)
417

    
418
  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
419
                         "-f", priv_key,
420
                         "-q", "-N", ""])
421
  if result.failed:
422
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
423
                             result.output)
424

    
425
  f = open(pub_key, 'r')
426
  try:
427
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
428
  finally:
429
    f.close()
430

    
431

    
432
def _InitGanetiServerSetup(ss):
433
  """Setup the necessary configuration for the initial node daemon.
434

435
  This creates the nodepass file containing the shared password for
436
  the cluster and also generates the SSL certificate.
437

438
  """
439
  # Create pseudo random password
440
  randpass = sha.new(os.urandom(64)).hexdigest()
441
  # and write it into sstore
442
  ss.SetKey(ss.SS_NODED_PASS, randpass)
443

    
444
  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
445
                         "-days", str(365*5), "-nodes", "-x509",
446
                         "-keyout", constants.SSL_CERT_FILE,
447
                         "-out", constants.SSL_CERT_FILE, "-batch"])
448
  if result.failed:
449
    raise errors.OpExecError("could not generate server ssl cert, command"
450
                             " %s had exitcode %s and error message %s" %
451
                             (result.cmd, result.exit_code, result.output))
452

    
453
  os.chmod(constants.SSL_CERT_FILE, 0400)
454

    
455
  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
456

    
457
  if result.failed:
458
    raise errors.OpExecError("Could not start the node daemon, command %s"
459
                             " had exitcode %s and error %s" %
460
                             (result.cmd, result.exit_code, result.output))
461

    
462

    
463
def _CheckInstanceBridgesExist(instance):
464
  """Check that the brigdes needed by an instance exist.
465

466
  """
467
  # check bridges existence
468
  brlist = [nic.bridge for nic in instance.nics]
469
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
470
    raise errors.OpPrereqError("one or more target bridges %s does not"
471
                               " exist on destination node '%s'" %
472
                               (brlist, instance.primary_node))
473

    
474

    
475
class LUInitCluster(LogicalUnit):
476
  """Initialise the cluster.
477

478
  """
479
  HPATH = "cluster-init"
480
  HTYPE = constants.HTYPE_CLUSTER
481
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
482
              "def_bridge", "master_netdev"]
483
  REQ_CLUSTER = False
484

    
485
  def BuildHooksEnv(self):
486
    """Build hooks env.
487

488
    Notes: Since we don't require a cluster, we must manually add
489
    ourselves in the post-run node list.
490

491
    """
492
    env = {"OP_TARGET": self.op.cluster_name}
493
    return env, [], [self.hostname.name]
494

    
495
  def CheckPrereq(self):
496
    """Verify that the passed name is a valid one.
497

498
    """
499
    if config.ConfigWriter.IsCluster():
500
      raise errors.OpPrereqError("Cluster is already initialised")
501

    
502
    self.hostname = hostname = utils.HostInfo()
503

    
504
    if hostname.ip.startswith("127."):
505
      raise errors.OpPrereqError("This host's IP resolves to the private"
506
                                 " range (%s). Please fix DNS or /etc/hosts." %
507
                                 (hostname.ip,))
508

    
509
    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
510

    
511
    if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
512
                         constants.DEFAULT_NODED_PORT):
513
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
514
                                 " to %s,\nbut this ip address does not"
515
                                 " belong to this host."
516
                                 " Aborting." % hostname.ip)
517

    
518
    secondary_ip = getattr(self.op, "secondary_ip", None)
519
    if secondary_ip and not utils.IsValidIP(secondary_ip):
520
      raise errors.OpPrereqError("Invalid secondary ip given")
521
    if (secondary_ip and
522
        secondary_ip != hostname.ip and
523
        (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
524
                           constants.DEFAULT_NODED_PORT))):
525
      raise errors.OpPrereqError("You gave %s as secondary IP,\n"
526
                                 "but it does not belong to this host." %
527
                                 secondary_ip)
528
    self.secondary_ip = secondary_ip
529

    
530
    # checks presence of the volume group given
531
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
532

    
533
    if vgstatus:
534
      raise errors.OpPrereqError("Error: %s" % vgstatus)
535

    
536
    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
537
                    self.op.mac_prefix):
538
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
539
                                 self.op.mac_prefix)
540

    
541
    if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
542
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
543
                                 self.op.hypervisor_type)
544

    
545
    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
546
    if result.failed:
547
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
548
                                 (self.op.master_netdev,
549
                                  result.output.strip()))
550

    
551
    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
552
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
553
      raise errors.OpPrereqError("Init.d script '%s' missing or not "
554
                                 "executable." % constants.NODE_INITD_SCRIPT)
555

    
556
  def Exec(self, feedback_fn):
557
    """Initialize the cluster.
558

559
    """
560
    clustername = self.clustername
561
    hostname = self.hostname
562

    
563
    # set up the simple store
564
    self.sstore = ss = ssconf.SimpleStore()
565
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
566
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
567
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
568
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
569
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
570

    
571
    # set up the inter-node password and certificate
572
    _InitGanetiServerSetup(ss)
573

    
574
    # start the master ip
575
    rpc.call_node_start_master(hostname.name)
576

    
577
    # set up ssh config and /etc/hosts
578
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
579
    try:
580
      sshline = f.read()
581
    finally:
582
      f.close()
583
    sshkey = sshline.split(" ")[1]
584

    
585
    _AddHostToEtcHosts(hostname.name)
586

    
587
    _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
588

    
589
    _InitSSHSetup(hostname.name)
590

    
591
    # init of cluster config file
592
    self.cfg = cfgw = config.ConfigWriter()
593
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
594
                    sshkey, self.op.mac_prefix,
595
                    self.op.vg_name, self.op.def_bridge)
596

    
597

    
598
class LUDestroyCluster(NoHooksLU):
599
  """Logical unit for destroying the cluster.
600

601
  """
602
  _OP_REQP = []
603

    
604
  def CheckPrereq(self):
605
    """Check prerequisites.
606

607
    This checks whether the cluster is empty.
608

609
    Any errors are signalled by raising errors.OpPrereqError.
610

611
    """
612
    master = self.sstore.GetMasterNode()
613

    
614
    nodelist = self.cfg.GetNodeList()
615
    if len(nodelist) != 1 or nodelist[0] != master:
616
      raise errors.OpPrereqError("There are still %d node(s) in"
617
                                 " this cluster." % (len(nodelist) - 1))
618
    instancelist = self.cfg.GetInstanceList()
619
    if instancelist:
620
      raise errors.OpPrereqError("There are still %d instance(s) in"
621
                                 " this cluster." % len(instancelist))
622

    
623
  def Exec(self, feedback_fn):
624
    """Destroys the cluster.
625

626
    """
627
    master = self.sstore.GetMasterNode()
628
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
629
    utils.CreateBackup(priv_key)
630
    utils.CreateBackup(pub_key)
631
    rpc.call_node_leave_cluster(master)
632

    
633

    
634
class LUVerifyCluster(NoHooksLU):
635
  """Verifies the cluster status.
636

637
  """
638
  _OP_REQP = []
639

    
640
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
641
                  remote_version, feedback_fn):
642
    """Run multiple tests against a node.
643

644
    Test list:
645
      - compares ganeti version
646
      - checks vg existence and size > 20G
647
      - checks config file checksum
648
      - checks ssh to other nodes
649

650
    Args:
651
      node: name of the node to check
652
      file_list: required list of files
653
      local_cksum: dictionary of local files and their checksums
654

655
    """
656
    # compares ganeti version
657
    local_version = constants.PROTOCOL_VERSION
658
    if not remote_version:
659
      feedback_fn(" - ERROR: connection to %s failed" % (node))
660
      return True
661

    
662
    if local_version != remote_version:
663
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
664
                      (local_version, node, remote_version))
665
      return True
666

    
667
    # checks vg existence and size > 20G
668

    
669
    bad = False
670
    if not vglist:
671
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
672
                      (node,))
673
      bad = True
674
    else:
675
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
676
      if vgstatus:
677
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
678
        bad = True
679

    
680
    # checks config file checksum
681
    # checks ssh to any
682

    
683
    if 'filelist' not in node_result:
684
      bad = True
685
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
686
    else:
687
      remote_cksum = node_result['filelist']
688
      for file_name in file_list:
689
        if file_name not in remote_cksum:
690
          bad = True
691
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
692
        elif remote_cksum[file_name] != local_cksum[file_name]:
693
          bad = True
694
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
695

    
696
    if 'nodelist' not in node_result:
697
      bad = True
698
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
699
    else:
700
      if node_result['nodelist']:
701
        bad = True
702
        for node in node_result['nodelist']:
703
          feedback_fn("  - ERROR: communication with node '%s': %s" %
704
                          (node, node_result['nodelist'][node]))
705
    hyp_result = node_result.get('hypervisor', None)
706
    if hyp_result is not None:
707
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
708
    return bad
709

    
710
  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
711
    """Verify an instance.
712

713
    This function checks to see if the required block devices are
714
    available on the instance's node.
715

716
    """
717
    bad = False
718

    
719
    instancelist = self.cfg.GetInstanceList()
720
    if instance not in instancelist:
721
      feedback_fn("  - ERROR: instance %s not in instance list %s" %
722
                      (instance, instancelist))
723
      bad = True
724

    
725
    instanceconfig = self.cfg.GetInstanceInfo(instance)
726
    node_current = instanceconfig.primary_node
727

    
728
    node_vol_should = {}
729
    instanceconfig.MapLVsByNode(node_vol_should)
730

    
731
    for node in node_vol_should:
732
      for volume in node_vol_should[node]:
733
        if node not in node_vol_is or volume not in node_vol_is[node]:
734
          feedback_fn("  - ERROR: volume %s missing on node %s" %
735
                          (volume, node))
736
          bad = True
737

    
738
    if instanceconfig.status != 'down':
739
      if not instance in node_instance[node_current]:
740
        feedback_fn("  - ERROR: instance %s not running on node %s" %
741
                        (instance, node_current))
742
        bad = True
743

    
744
    for node in node_instance:
745
      if node != node_current:
746
        if instance in node_instance[node]:
747
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
748
                          (instance, node))
749
          bad = True
750

    
751
    return bad
752

    
753
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
754
    """Verify if there are any unknown volumes in the cluster.
755

756
    The .os, .swap and backup volumes are ignored. All other volumes are
757
    reported as unknown.
758

759
    """
760
    bad = False
761

    
762
    for node in node_vol_is:
763
      for volume in node_vol_is[node]:
764
        if node not in node_vol_should or volume not in node_vol_should[node]:
765
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
766
                      (volume, node))
767
          bad = True
768
    return bad
769

    
770
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
771
    """Verify the list of running instances.
772

773
    This checks what instances are running but unknown to the cluster.
774

775
    """
776
    bad = False
777
    for node in node_instance:
778
      for runninginstance in node_instance[node]:
779
        if runninginstance not in instancelist:
780
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
781
                          (runninginstance, node))
782
          bad = True
783
    return bad
784

    
785
  def CheckPrereq(self):
786
    """Check prerequisites.
787

788
    This has no prerequisites.
789

790
    """
791
    pass
792

    
793
  def Exec(self, feedback_fn):
794
    """Verify integrity of cluster, performing various test on nodes.
795

796
    """
797
    bad = False
798
    feedback_fn("* Verifying global settings")
799
    self.cfg.VerifyConfig()
800

    
801
    vg_name = self.cfg.GetVGName()
802
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
803
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
804
    node_volume = {}
805
    node_instance = {}
806

    
807
    # FIXME: verify OS list
808
    # do local checksums
809
    file_names = list(self.sstore.GetFileList())
810
    file_names.append(constants.SSL_CERT_FILE)
811
    file_names.append(constants.CLUSTER_CONF_FILE)
812
    local_checksums = utils.FingerprintFiles(file_names)
813

    
814
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
815
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
816
    all_instanceinfo = rpc.call_instance_list(nodelist)
817
    all_vglist = rpc.call_vg_list(nodelist)
818
    node_verify_param = {
819
      'filelist': file_names,
820
      'nodelist': nodelist,
821
      'hypervisor': None,
822
      }
823
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
824
    all_rversion = rpc.call_version(nodelist)
825

    
826
    for node in nodelist:
827
      feedback_fn("* Verifying node %s" % node)
828
      result = self._VerifyNode(node, file_names, local_checksums,
829
                                all_vglist[node], all_nvinfo[node],
830
                                all_rversion[node], feedback_fn)
831
      bad = bad or result
832

    
833
      # node_volume
834
      volumeinfo = all_volumeinfo[node]
835

    
836
      if type(volumeinfo) != dict:
837
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
838
        bad = True
839
        continue
840

    
841
      node_volume[node] = volumeinfo
842

    
843
      # node_instance
844
      nodeinstance = all_instanceinfo[node]
845
      if type(nodeinstance) != list:
846
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
847
        bad = True
848
        continue
849

    
850
      node_instance[node] = nodeinstance
851

    
852
    node_vol_should = {}
853

    
854
    for instance in instancelist:
855
      feedback_fn("* Verifying instance %s" % instance)
856
      result =  self._VerifyInstance(instance, node_volume, node_instance,
857
                                     feedback_fn)
858
      bad = bad or result
859

    
860
      inst_config = self.cfg.GetInstanceInfo(instance)
861

    
862
      inst_config.MapLVsByNode(node_vol_should)
863

    
864
    feedback_fn("* Verifying orphan volumes")
865
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
866
                                       feedback_fn)
867
    bad = bad or result
868

    
869
    feedback_fn("* Verifying remaining instances")
870
    result = self._VerifyOrphanInstances(instancelist, node_instance,
871
                                         feedback_fn)
872
    bad = bad or result
873

    
874
    return int(bad)
875

    
876

    
877
class LURenameCluster(LogicalUnit):
878
  """Rename the cluster.
879

880
  """
881
  HPATH = "cluster-rename"
882
  HTYPE = constants.HTYPE_CLUSTER
883
  _OP_REQP = ["name"]
884

    
885
  def BuildHooksEnv(self):
886
    """Build hooks env.
887

888
    """
889
    env = {
890
      "OP_TARGET": self.op.sstore.GetClusterName(),
891
      "NEW_NAME": self.op.name,
892
      }
893
    mn = self.sstore.GetMasterNode()
894
    return env, [mn], [mn]
895

    
896
  def CheckPrereq(self):
897
    """Verify that the passed name is a valid one.
898

899
    """
900
    hostname = utils.HostInfo(self.op.name)
901

    
902
    new_name = hostname.name
903
    self.ip = new_ip = hostname.ip
904
    old_name = self.sstore.GetClusterName()
905
    old_ip = self.sstore.GetMasterIP()
906
    if new_name == old_name and new_ip == old_ip:
907
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
908
                                 " cluster has changed")
909
    if new_ip != old_ip:
910
      result = utils.RunCmd(["fping", "-q", new_ip])
911
      if not result.failed:
912
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
913
                                   " reachable on the network. Aborting." %
914
                                   new_ip)
915

    
916
    self.op.name = new_name
917

    
918
  def Exec(self, feedback_fn):
919
    """Rename the cluster.
920

921
    """
922
    clustername = self.op.name
923
    ip = self.ip
924
    ss = self.sstore
925

    
926
    # shutdown the master IP
927
    master = ss.GetMasterNode()
928
    if not rpc.call_node_stop_master(master):
929
      raise errors.OpExecError("Could not disable the master role")
930

    
931
    try:
932
      # modify the sstore
933
      ss.SetKey(ss.SS_MASTER_IP, ip)
934
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
935

    
936
      # Distribute updated ss config to all nodes
937
      myself = self.cfg.GetNodeInfo(master)
938
      dist_nodes = self.cfg.GetNodeList()
939
      if myself.name in dist_nodes:
940
        dist_nodes.remove(myself.name)
941

    
942
      logger.Debug("Copying updated ssconf data to all nodes")
943
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
944
        fname = ss.KeyToFilename(keyname)
945
        result = rpc.call_upload_file(dist_nodes, fname)
946
        for to_node in dist_nodes:
947
          if not result[to_node]:
948
            logger.Error("copy of file %s to node %s failed" %
949
                         (fname, to_node))
950
    finally:
951
      if not rpc.call_node_start_master(master):
952
        logger.Error("Could not re-enable the master role on the master,\n"
953
                     "please restart manually.")
954

    
955

    
956
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
957
  """Sleep and poll for an instance's disk to sync.
958

959
  """
960
  if not instance.disks:
961
    return True
962

    
963
  if not oneshot:
964
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
965

    
966
  node = instance.primary_node
967

    
968
  for dev in instance.disks:
969
    cfgw.SetDiskID(dev, node)
970

    
971
  retries = 0
972
  while True:
973
    max_time = 0
974
    done = True
975
    cumul_degraded = False
976
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
977
    if not rstats:
978
      proc.LogWarning("Can't get any data from node %s" % node)
979
      retries += 1
980
      if retries >= 10:
981
        raise errors.RemoteError("Can't contact node %s for mirror data,"
982
                                 " aborting." % node)
983
      time.sleep(6)
984
      continue
985
    retries = 0
986
    for i in range(len(rstats)):
987
      mstat = rstats[i]
988
      if mstat is None:
989
        proc.LogWarning("Can't compute data for node %s/%s" %
990
                        (node, instance.disks[i].iv_name))
991
        continue
992
      # we ignore the ldisk parameter
993
      perc_done, est_time, is_degraded, _ = mstat
994
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
995
      if perc_done is not None:
996
        done = False
997
        if est_time is not None:
998
          rem_time = "%d estimated seconds remaining" % est_time
999
          max_time = est_time
1000
        else:
1001
          rem_time = "no time estimate"
1002
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
1003
                     (instance.disks[i].iv_name, perc_done, rem_time))
1004
    if done or oneshot:
1005
      break
1006

    
1007
    if unlock:
1008
      utils.Unlock('cmd')
1009
    try:
1010
      time.sleep(min(60, max_time))
1011
    finally:
1012
      if unlock:
1013
        utils.Lock('cmd')
1014

    
1015
  if done:
1016
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1017
  return not cumul_degraded
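
# Illustration only: each mirror status entry unpacked above is assumed to be
# a 4-tuple of (sync_percent, estimated_seconds, is_degraded, ldisk).  For a
# hypothetical disk named "sda", a value of
#   (87.5, 130, True, False)
# would be logged as "- device sda: 87.50% done, 130 estimated seconds
# remaining" and would keep the wait loop running.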
1018

    
1019

    
1020
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1021
  """Check that mirrors are not degraded.
1022

1023
  The ldisk parameter, if True, will change the test from the
1024
  is_degraded attribute (which represents overall non-ok status for
1025
  the device(s)) to the ldisk (representing the local storage status).
1026

1027
  """
1028
  cfgw.SetDiskID(dev, node)
1029
  if ldisk:
1030
    idx = 6
1031
  else:
1032
    idx = 5
1033

    
1034
  result = True
1035
  if on_primary or dev.AssembleOnSecondary():
1036
    rstats = rpc.call_blockdev_find(node, dev)
1037
    if not rstats:
1038
      logger.ToStderr("Can't get any data from node %s" % node)
1039
      result = False
1040
    else:
1041
      result = result and (not rstats[idx])
1042
  if dev.children:
1043
    for child in dev.children:
1044
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1045

    
1046
  return result
1047

    
1048

    
1049
class LUDiagnoseOS(NoHooksLU):
1050
  """Logical unit for OS diagnose/query.
1051

1052
  """
1053
  _OP_REQP = []
1054

    
1055
  def CheckPrereq(self):
1056
    """Check prerequisites.
1057

1058
    This always succeeds, since this is a pure query LU.
1059

1060
    """
1061
    return
1062

    
1063
  def Exec(self, feedback_fn):
1064
    """Compute the list of OSes.
1065

1066
    """
1067
    node_list = self.cfg.GetNodeList()
1068
    node_data = rpc.call_os_diagnose(node_list)
1069
    if node_data == False:
1070
      raise errors.OpExecError("Can't gather the list of OSes")
1071
    return node_data
1072

    
1073

    
1074
class LURemoveNode(LogicalUnit):
1075
  """Logical unit for removing a node.
1076

1077
  """
1078
  HPATH = "node-remove"
1079
  HTYPE = constants.HTYPE_NODE
1080
  _OP_REQP = ["node_name"]
1081

    
1082
  def BuildHooksEnv(self):
1083
    """Build hooks env.
1084

1085
    This doesn't run on the target node in the pre phase as a failed
1086
    node would not allow itself to run.
1087

1088
    """
1089
    env = {
1090
      "OP_TARGET": self.op.node_name,
1091
      "NODE_NAME": self.op.node_name,
1092
      }
1093
    all_nodes = self.cfg.GetNodeList()
1094
    all_nodes.remove(self.op.node_name)
1095
    return env, all_nodes, all_nodes
1096

    
1097
  def CheckPrereq(self):
1098
    """Check prerequisites.
1099

1100
    This checks:
1101
     - the node exists in the configuration
1102
     - it does not have primary or secondary instances
1103
     - it's not the master
1104

1105
    Any errors are signalled by raising errors.OpPrereqError.
1106

1107
    """
1108
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1109
    if node is None:
1110
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1111

    
1112
    instance_list = self.cfg.GetInstanceList()
1113

    
1114
    masternode = self.sstore.GetMasterNode()
1115
    if node.name == masternode:
1116
      raise errors.OpPrereqError("Node is the master node,"
1117
                                 " you need to failover first.")
1118

    
1119
    for instance_name in instance_list:
1120
      instance = self.cfg.GetInstanceInfo(instance_name)
1121
      if node.name == instance.primary_node:
1122
        raise errors.OpPrereqError("Instance %s still running on the node,"
1123
                                   " please remove first." % instance_name)
1124
      if node.name in instance.secondary_nodes:
1125
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1126
                                   " please remove first." % instance_name)
1127
    self.op.node_name = node.name
1128
    self.node = node
1129

    
1130
  def Exec(self, feedback_fn):
1131
    """Removes the node from the cluster.
1132

1133
    """
1134
    node = self.node
1135
    logger.Info("stopping the node daemon and removing configs from node %s" %
1136
                node.name)
1137

    
1138
    rpc.call_node_leave_cluster(node.name)
1139

    
1140
    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1141

    
1142
    logger.Info("Removing node %s from config" % node.name)
1143

    
1144
    self.cfg.RemoveNode(node.name)
1145

    
1146
    _RemoveHostFromEtcHosts(node.name)
1147

    
1148

    
1149
class LUQueryNodes(NoHooksLU):
1150
  """Logical unit for querying nodes.
1151

1152
  """
1153
  _OP_REQP = ["output_fields", "names"]
1154

    
1155
  def CheckPrereq(self):
1156
    """Check prerequisites.
1157

1158
    This checks that the fields required are valid output fields.
1159

1160
    """
1161
    self.dynamic_fields = frozenset(["dtotal", "dfree",
1162
                                     "mtotal", "mnode", "mfree",
1163
                                     "bootid"])
1164

    
1165
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1166
                               "pinst_list", "sinst_list",
1167
                               "pip", "sip"],
1168
                       dynamic=self.dynamic_fields,
1169
                       selected=self.op.output_fields)
1170

    
1171
    self.wanted = _GetWantedNodes(self, self.op.names)
1172

    
1173
  def Exec(self, feedback_fn):
1174
    """Computes the list of nodes and their attributes.
1175

1176
    """
1177
    nodenames = self.wanted
1178
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1179

    
1180
    # begin data gathering
1181

    
1182
    if self.dynamic_fields.intersection(self.op.output_fields):
1183
      live_data = {}
1184
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1185
      for name in nodenames:
1186
        nodeinfo = node_data.get(name, None)
1187
        if nodeinfo:
1188
          live_data[name] = {
1189
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1190
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1191
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1192
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1193
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1194
            "bootid": nodeinfo['bootid'],
1195
            }
1196
        else:
1197
          live_data[name] = {}
1198
    else:
1199
      live_data = dict.fromkeys(nodenames, {})
1200

    
1201
    node_to_primary = dict([(name, set()) for name in nodenames])
1202
    node_to_secondary = dict([(name, set()) for name in nodenames])
1203

    
1204
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1205
                             "sinst_cnt", "sinst_list"))
1206
    if inst_fields & frozenset(self.op.output_fields):
1207
      instancelist = self.cfg.GetInstanceList()
1208

    
1209
      for instance_name in instancelist:
1210
        inst = self.cfg.GetInstanceInfo(instance_name)
1211
        if inst.primary_node in node_to_primary:
1212
          node_to_primary[inst.primary_node].add(inst.name)
1213
        for secnode in inst.secondary_nodes:
1214
          if secnode in node_to_secondary:
1215
            node_to_secondary[secnode].add(inst.name)
1216

    
1217
    # end data gathering
1218

    
1219
    output = []
1220
    for node in nodelist:
1221
      node_output = []
1222
      for field in self.op.output_fields:
1223
        if field == "name":
1224
          val = node.name
1225
        elif field == "pinst_list":
1226
          val = list(node_to_primary[node.name])
1227
        elif field == "sinst_list":
1228
          val = list(node_to_secondary[node.name])
1229
        elif field == "pinst_cnt":
1230
          val = len(node_to_primary[node.name])
1231
        elif field == "sinst_cnt":
1232
          val = len(node_to_secondary[node.name])
1233
        elif field == "pip":
1234
          val = node.primary_ip
1235
        elif field == "sip":
1236
          val = node.secondary_ip
1237
        elif field in self.dynamic_fields:
1238
          val = live_data[node.name].get(field, None)
1239
        else:
1240
          raise errors.ParameterError(field)
1241
        node_output.append(val)
1242
      output.append(node_output)
1243

    
1244
    return output
1245

    
1246

    
1247
class LUQueryNodeVolumes(NoHooksLU):
1248
  """Logical unit for getting volumes on node(s).
1249

1250
  """
1251
  _OP_REQP = ["nodes", "output_fields"]
1252

    
1253
  def CheckPrereq(self):
1254
    """Check prerequisites.
1255

1256
    This checks that the fields required are valid output fields.
1257

1258
    """
1259
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1260

    
1261
    _CheckOutputFields(static=["node"],
1262
                       dynamic=["phys", "vg", "name", "size", "instance"],
1263
                       selected=self.op.output_fields)
1264

    
1265

    
1266
  def Exec(self, feedback_fn):
1267
    """Computes the list of nodes and their attributes.
1268

1269
    """
1270
    nodenames = self.nodes
1271
    volumes = rpc.call_node_volumes(nodenames)
1272

    
1273
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1274
             in self.cfg.GetInstanceList()]
1275

    
1276
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1277

    
1278
    output = []
1279
    for node in nodenames:
1280
      if node not in volumes or not volumes[node]:
1281
        continue
1282

    
1283
      node_vols = volumes[node][:]
1284
      node_vols.sort(key=lambda vol: vol['dev'])
1285

    
1286
      for vol in node_vols:
1287
        node_output = []
1288
        for field in self.op.output_fields:
1289
          if field == "node":
1290
            val = node
1291
          elif field == "phys":
1292
            val = vol['dev']
1293
          elif field == "vg":
1294
            val = vol['vg']
1295
          elif field == "name":
1296
            val = vol['name']
1297
          elif field == "size":
1298
            val = int(float(vol['size']))
1299
          elif field == "instance":
1300
            for inst in ilist:
1301
              if node not in lv_by_node[inst]:
1302
                continue
1303
              if vol['name'] in lv_by_node[inst][node]:
1304
                val = inst.name
1305
                break
1306
            else:
1307
              val = '-'
1308
          else:
1309
            raise errors.ParameterError(field)
1310
          node_output.append(str(val))
1311

    
1312
        output.append(node_output)
1313

    
1314
    return output
1315

    
1316

    
1317
class LUAddNode(LogicalUnit):
1318
  """Logical unit for adding node to the cluster.
1319

1320
  """
1321
  HPATH = "node-add"
1322
  HTYPE = constants.HTYPE_NODE
1323
  _OP_REQP = ["node_name"]
1324

    
1325
  def BuildHooksEnv(self):
1326
    """Build hooks env.
1327

1328
    This will run on all nodes before, and on all nodes + the new node after.
1329

1330
    """
1331
    env = {
1332
      "OP_TARGET": self.op.node_name,
1333
      "NODE_NAME": self.op.node_name,
1334
      "NODE_PIP": self.op.primary_ip,
1335
      "NODE_SIP": self.op.secondary_ip,
1336
      }
1337
    nodes_0 = self.cfg.GetNodeList()
1338
    nodes_1 = nodes_0 + [self.op.node_name, ]
1339
    return env, nodes_0, nodes_1
1340

    
1341
  def CheckPrereq(self):
1342
    """Check prerequisites.
1343

1344
    This checks:
1345
     - the new node is not already in the config
1346
     - it is resolvable
1347
     - its parameters (single/dual homed) match the cluster
1348

1349
    Any errors are signalled by raising errors.OpPrereqError.
1350

1351
    """
1352
    node_name = self.op.node_name
1353
    cfg = self.cfg
1354

    
1355
    dns_data = utils.HostInfo(node_name)
1356

    
1357
    node = dns_data.name
1358
    primary_ip = self.op.primary_ip = dns_data.ip
1359
    secondary_ip = getattr(self.op, "secondary_ip", None)
1360
    if secondary_ip is None:
1361
      secondary_ip = primary_ip
1362
    if not utils.IsValidIP(secondary_ip):
1363
      raise errors.OpPrereqError("Invalid secondary IP given")
1364
    self.op.secondary_ip = secondary_ip
1365
    node_list = cfg.GetNodeList()
1366
    if node in node_list:
1367
      raise errors.OpPrereqError("Node %s is already in the configuration"
1368
                                 % node)
1369

    
1370
    for existing_node_name in node_list:
1371
      existing_node = cfg.GetNodeInfo(existing_node_name)
1372
      if (existing_node.primary_ip == primary_ip or
1373
          existing_node.secondary_ip == primary_ip or
1374
          existing_node.primary_ip == secondary_ip or
1375
          existing_node.secondary_ip == secondary_ip):
1376
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1377
                                   " existing node %s" % existing_node.name)
1378

    
1379
    # check that the type of the node (single versus dual homed) is the
1380
    # same as for the master
1381
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1382
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1383
    newbie_singlehomed = secondary_ip == primary_ip
1384
    if master_singlehomed != newbie_singlehomed:
1385
      if master_singlehomed:
1386
        raise errors.OpPrereqError("The master has no private ip but the"
1387
                                   " new node has one")
1388
      else:
1389
        raise errors.OpPrereqError("The master has a private ip but the"
1390
                                   " new node doesn't have one")
1391

    
1392
    # check reachability
1393
    if not utils.TcpPing(utils.HostInfo().name,
1394
                         primary_ip,
1395
                         constants.DEFAULT_NODED_PORT):
1396
      raise errors.OpPrereqError("Node not reachable by ping")
1397

    
1398
    if not newbie_singlehomed:
1399
      # check reachability from my secondary ip to newbie's secondary ip
1400
      if not utils.TcpPing(myself.secondary_ip,
1401
                           secondary_ip,
1402
                           constants.DEFAULT_NODED_PORT):
1403
        raise errors.OpPrereqError(
1404
          "Node secondary ip not reachable by TCP based ping to noded port")
1405

    
1406
    self.new_node = objects.Node(name=node,
1407
                                 primary_ip=primary_ip,
1408
                                 secondary_ip=secondary_ip)
1409

    
1410
  def Exec(self, feedback_fn):
1411
    """Adds the new node to the cluster.
1412

1413
    """
1414
    new_node = self.new_node
1415
    node = new_node.name
1416

    
1417
    # set up inter-node password and certificate and restarts the node daemon
1418
    gntpass = self.sstore.GetNodeDaemonPassword()
1419
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1420
      raise errors.OpExecError("ganeti password corruption detected")
1421
    f = open(constants.SSL_CERT_FILE)
1422
    try:
1423
      gntpem = f.read(8192)
1424
    finally:
1425
      f.close()
1426
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1427
    # so we use this to detect an invalid certificate; as long as the
1428
    # cert doesn't contain this, the here-document will be correctly
1429
    # parsed by the shell sequence below
1430
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
1431
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1432
    if not gntpem.endswith("\n"):
1433
      raise errors.OpExecError("PEM must end with newline")
1434
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1435

    
1436
    # and then connect with ssh to set password and start ganeti-noded
1437
    # note that all the below variables are sanitized at this point,
1438
    # either by being constants or by the checks above
1439
    ss = self.sstore
1440
    mycommand = ("umask 077 && "
1441
                 "echo '%s' > '%s' && "
1442
                 "cat > '%s' << '!EOF.' && \n"
1443
                 "%s!EOF.\n%s restart" %
1444
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1445
                  constants.SSL_CERT_FILE, gntpem,
1446
                  constants.NODE_INITD_SCRIPT))
1447

    
1448
    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1449
    if result.failed:
1450
      raise errors.OpExecError("Remote command on node %s, error: %s,"
1451
                               " output: %s" %
1452
                               (node, result.fail_reason, result.output))
1453

    
1454
    # check connectivity
1455
    time.sleep(4)
1456

    
1457
    result = rpc.call_version([node])[node]
1458
    if result:
1459
      if constants.PROTOCOL_VERSION == result:
1460
        logger.Info("communication to node %s fine, sw version %s match" %
1461
                    (node, result))
1462
      else:
1463
        raise errors.OpExecError("Version mismatch master version %s,"
1464
                                 " node version %s" %
1465
                                 (constants.PROTOCOL_VERSION, result))
1466
    else:
1467
      raise errors.OpExecError("Cannot get version from the new node")
1468

    
1469
    # setup ssh on node
1470
    logger.Info("copy ssh key to node %s" % node)
1471
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1472
    keyarray = []
1473
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1474
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1475
                priv_key, pub_key]
1476

    
1477
    for i in keyfiles:
1478
      f = open(i, 'r')
1479
      try:
1480
        keyarray.append(f.read())
1481
      finally:
1482
        f.close()
1483

    
1484
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1485
                               keyarray[3], keyarray[4], keyarray[5])
1486

    
1487
    if not result:
1488
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1489

    
1490
    # Add node to our /etc/hosts, and add key to known_hosts
1491
    _AddHostToEtcHosts(new_node.name)
1492

    
1493
    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1494
                      self.cfg.GetHostKey())
1495

    
1496
    if new_node.secondary_ip != new_node.primary_ip:
1497
      if not rpc.call_node_tcp_ping(new_node.name,
1498
                                    constants.LOCALHOST_IP_ADDRESS,
1499
                                    new_node.secondary_ip,
1500
                                    constants.DEFAULT_NODED_PORT,
1501
                                    10, False):
1502
        raise errors.OpExecError("Node claims it doesn't have the"
1503
                                 " secondary ip you gave (%s).\n"
1504
                                 "Please fix and re-run this command." %
1505
                                 new_node.secondary_ip)
1506

    
1507
    success, msg = ssh.VerifyNodeHostname(node)
1508
    if not success:
1509
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
1510
                               " than the one the resolver gives: %s.\n"
1511
                               "Please fix and re-run this command." %
1512
                               (node, msg))
1513

    
1514
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1515
    # including the node just added
1516
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1517
    dist_nodes = self.cfg.GetNodeList() + [node]
1518
    if myself.name in dist_nodes:
1519
      dist_nodes.remove(myself.name)
1520

    
1521
    logger.Debug("Copying hosts and known_hosts to all nodes")
1522
    for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
1523
      result = rpc.call_upload_file(dist_nodes, fname)
1524
      for to_node in dist_nodes:
1525
        if not result[to_node]:
1526
          logger.Error("copy of file %s to node %s failed" %
1527
                       (fname, to_node))
1528

    
1529
    to_copy = ss.GetFileList()
1530
    for fname in to_copy:
1531
      if not ssh.CopyFileToNode(node, fname):
1532
        logger.Error("could not copy file %s to node %s" % (fname, node))
1533

    
1534
    logger.Info("adding node %s to cluster.conf" % node)
1535
    self.cfg.AddNode(new_node)
1536

    
1537

    
1538
class LUMasterFailover(LogicalUnit):
1539
  """Failover the master node to the current node.
1540

1541
  This is a special LU in that it must run on a non-master node.
1542

1543
  """
1544
  HPATH = "master-failover"
1545
  HTYPE = constants.HTYPE_CLUSTER
1546
  REQ_MASTER = False
1547
  _OP_REQP = []
1548

    
1549
  def BuildHooksEnv(self):
1550
    """Build hooks env.
1551

1552
    This will run on the new master only in the pre phase, and on all
1553
    the nodes in the post phase.
1554

1555
    """
1556
    env = {
1557
      "OP_TARGET": self.new_master,
1558
      "NEW_MASTER": self.new_master,
1559
      "OLD_MASTER": self.old_master,
1560
      }
1561
    return env, [self.new_master], self.cfg.GetNodeList()
1562

    
1563
  def CheckPrereq(self):
1564
    """Check prerequisites.
1565

1566
    This checks that we are not already the master.
1567

1568
    """
1569
    self.new_master = utils.HostInfo().name
1570
    self.old_master = self.sstore.GetMasterNode()
1571

    
1572
    if self.old_master == self.new_master:
1573
      raise errors.OpPrereqError("This commands must be run on the node"
1574
                                 " where you want the new master to be.\n"
1575
                                 "%s is already the master" %
1576
                                 self.old_master)
1577

    
1578
  def Exec(self, feedback_fn):
1579
    """Failover the master node.
1580

1581
    This command, when run on a non-master node, will cause the current
1582
    master to cease being master, and the non-master to become new
1583
    master.
1584

1585
    """
1586
    #TODO: do not rely on gethostname returning the FQDN
1587
    logger.Info("setting master to %s, old master: %s" %
1588
                (self.new_master, self.old_master))
1589

    
1590
    if not rpc.call_node_stop_master(self.old_master):
1591
      logger.Error("could disable the master role on the old master"
1592
                   " %s, please disable manually" % self.old_master)
1593

    
1594
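    # record the new master in the simple store and push the updated file
    # to all nodes, so the whole cluster agrees on who the master is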
    ss = self.sstore
1595
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1596
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
1597
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
1598
      logger.Error("could not distribute the new simple store master file"
1599
                   " to the other nodes, please check.")
1600

    
1601
    if not rpc.call_node_start_master(self.new_master):
1602
      logger.Error("could not start the master role on the new master"
1603
                   " %s, please check" % self.new_master)
1604
      feedback_fn("Error in activating the master IP on the new master,\n"
1605
                  "please fix manually.")
1606

    
1607

    
1608

    
1609
class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      }

    return result
1638

    
1639

    
1640
class LUClusterCopyFile(NoHooksLU):
1641
  """Copy file to cluster.
1642

1643
  """
1644
  _OP_REQP = ["nodes", "filename"]
1645

    
1646
  def CheckPrereq(self):
1647
    """Check prerequisites.
1648

1649
    It should check that the named file exists and that the given list
1650
    of nodes is valid.
1651

1652
    """
1653
    if not os.path.exists(self.op.filename):
1654
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1655

    
1656
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1657

    
1658
  def Exec(self, feedback_fn):
1659
    """Copy a file from master to some nodes.
1660

1661
    Args:
1662
      opts - class with options as members
1663
      args - list containing a single element, the file name
1664
    Opts used:
1665
      nodes - list containing the name of target nodes; if empty, all nodes
1666

1667
    """
1668
    filename = self.op.filename
1669

    
1670
    myname = utils.HostInfo().name
1671

    
1672
    for node in self.nodes:
1673
      if node == myname:
1674
        continue
1675
      if not ssh.CopyFileToNode(node, filename):
1676
        logger.Error("Copy of file %s to node %s failed" % (filename, node))
1677

    
1678

    
1679
class LUDumpClusterConfig(NoHooksLU):
1680
  """Return a text-representation of the cluster-config.
1681

1682
  """
1683
  _OP_REQP = []
1684

    
1685
  def CheckPrereq(self):
1686
    """No prerequisites.
1687

1688
    """
1689
    pass
1690

    
1691
  def Exec(self, feedback_fn):
1692
    """Dump a representation of the cluster config to the standard output.
1693

1694
    """
1695
    return self.cfg.DumpConfig()
1696

    
1697

    
1698
class LURunClusterCommand(NoHooksLU):
1699
  """Run a command on some nodes.
1700

1701
  """
1702
  _OP_REQP = ["command", "nodes"]
1703

    
1704
  def CheckPrereq(self):
1705
    """Check prerequisites.
1706

1707
    It checks that the given list of nodes is valid.
1708

1709
    """
1710
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1711

    
1712
  def Exec(self, feedback_fn):
1713
    """Run a command on some nodes.
1714

1715
    """
1716
    data = []
1717
    for node in self.nodes:
1718
      result = ssh.SSHCall(node, "root", self.op.command)
1719
      data.append((node, result.output, result.exit_code))
1720

    
1721
    return data
1722

    
1723

    
1724
class LUActivateInstanceDisks(NoHooksLU):
1725
  """Bring up an instance's disks.
1726

1727
  """
1728
  _OP_REQP = ["instance_name"]
1729

    
1730
  def CheckPrereq(self):
1731
    """Check prerequisites.
1732

1733
    This checks that the instance is in the cluster.
1734

1735
    """
1736
    instance = self.cfg.GetInstanceInfo(
1737
      self.cfg.ExpandInstanceName(self.op.instance_name))
1738
    if instance is None:
1739
      raise errors.OpPrereqError("Instance '%s' not known" %
1740
                                 self.op.instance_name)
1741
    self.instance = instance
1742

    
1743

    
1744
  def Exec(self, feedback_fn):
1745
    """Activate the disks.
1746

1747
    """
1748
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1749
    if not disks_ok:
1750
      raise errors.OpExecError("Cannot activate block devices")
1751

    
1752
    return disks_info
1753

    
1754

    
1755
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1756
  """Prepare the block devices for an instance.
1757

1758
  This sets up the block devices on all nodes.
1759

1760
  Args:
1761
    instance: a ganeti.objects.Instance object
1762
    ignore_secondaries: if true, errors on secondary nodes won't result
1763
                        in an error return from the function
1764

1765
  Returns:
1766
    false if the operation failed
1767
    list of (host, instance_visible_name, node_visible_name) if the operation
         succeeded with the mapping from node devices to instance devices
1769
  """
1770
  device_info = []
1771
  disks_ok = True
1772
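  # assemble every disk on each node of its node tree; only the result
  # reported by the primary node ends up in the returned device_info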
  for inst_disk in instance.disks:
1773
    master_result = None
1774
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1775
      cfg.SetDiskID(node_disk, node)
1776
      is_primary = node == instance.primary_node
1777
      result = rpc.call_blockdev_assemble(node, node_disk,
1778
                                          instance.name, is_primary)
1779
      if not result:
1780
        logger.Error("could not prepare block device %s on node %s (is_pri"
1781
                     "mary=%s)" % (inst_disk.iv_name, node, is_primary))
1782
        if is_primary or not ignore_secondaries:
1783
          disks_ok = False
1784
      if is_primary:
1785
        master_result = result
1786
    device_info.append((instance.primary_node, inst_disk.iv_name,
1787
                        master_result))
1788

    
1789
  # leave the disks configured for the primary node
1790
  # this is a workaround that would be fixed better by
1791
  # improving the logical/physical id handling
1792
  for disk in instance.disks:
1793
    cfg.SetDiskID(disk, instance.primary_node)
1794

    
1795
  return disks_ok, device_info
1796

    
1797

    
1798
def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")
1810

    
1811

    
1812
class LUDeactivateInstanceDisks(NoHooksLU):
1813
  """Shutdown an instance's disks.
1814

1815
  """
1816
  _OP_REQP = ["instance_name"]
1817

    
1818
  def CheckPrereq(self):
1819
    """Check prerequisites.
1820

1821
    This checks that the instance is in the cluster.
1822

1823
    """
1824
    instance = self.cfg.GetInstanceInfo(
1825
      self.cfg.ExpandInstanceName(self.op.instance_name))
1826
    if instance is None:
1827
      raise errors.OpPrereqError("Instance '%s' not known" %
1828
                                 self.op.instance_name)
1829
    self.instance = instance
1830

    
1831
  def Exec(self, feedback_fn):
1832
    """Deactivate the disks
1833

1834
    """
1835
    instance = self.instance
1836
    ins_l = rpc.call_instance_list([instance.primary_node])
1837
    ins_l = ins_l[instance.primary_node]
1838
    if not type(ins_l) is list:
1839
      raise errors.OpExecError("Can't contact node '%s'" %
1840
                               instance.primary_node)
1841

    
1842
    if self.instance.name in ins_l:
1843
      raise errors.OpExecError("Instance is running, can't shutdown"
1844
                               " block devices.")
1845

    
1846
    _ShutdownInstanceDisks(instance, self.cfg)
1847

    
1848

    
1849
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1850
  """Shutdown block devices of an instance.
1851

1852
  This does the shutdown on all nodes of the instance.
1853

1854
  If ignore_primary is true, errors on the primary node are ignored;
  all other shutdown errors make the function return False.
1856

1857
  """
1858
  result = True
1859
  for disk in instance.disks:
1860
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1861
      cfg.SetDiskID(top_disk, node)
1862
      if not rpc.call_blockdev_shutdown(node, top_disk):
1863
        logger.Error("could not shutdown block device %s on node %s" %
1864
                     (disk.iv_name, node))
1865
        if not ignore_primary or node != instance.primary_node:
1866
          result = False
1867
  return result
1868

    
1869

    
1870
class LUStartupInstance(LogicalUnit):
1871
  """Starts an instance.
1872

1873
  """
1874
  HPATH = "instance-start"
1875
  HTYPE = constants.HTYPE_INSTANCE
1876
  _OP_REQP = ["instance_name", "force"]
1877

    
1878
  def BuildHooksEnv(self):
1879
    """Build hooks env.
1880

1881
    This runs on master, primary and secondary nodes of the instance.
1882

1883
    """
1884
    env = {
1885
      "FORCE": self.op.force,
1886
      }
1887
    env.update(_BuildInstanceHookEnvByObject(self.instance))
1888
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1889
          list(self.instance.secondary_nodes))
1890
    return env, nl, nl
1891

    
1892
  def CheckPrereq(self):
1893
    """Check prerequisites.
1894

1895
    This checks that the instance is in the cluster.
1896

1897
    """
1898
    instance = self.cfg.GetInstanceInfo(
1899
      self.cfg.ExpandInstanceName(self.op.instance_name))
1900
    if instance is None:
1901
      raise errors.OpPrereqError("Instance '%s' not known" %
1902
                                 self.op.instance_name)
1903

    
1904
    # check bridges existence
1905
    _CheckInstanceBridgesExist(instance)
1906

    
1907
    self.instance = instance
1908
    self.op.instance_name = instance.name
1909

    
1910
  def Exec(self, feedback_fn):
1911
    """Start the instance.
1912

1913
    """
1914
    instance = self.instance
1915
    force = self.op.force
1916
    extra_args = getattr(self.op, "extra_args", "")
1917

    
1918
    node_current = instance.primary_node
1919

    
1920
    nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1921
    if not nodeinfo:
1922
      raise errors.OpExecError("Could not contact node %s for infos" %
1923
                               (node_current))
1924

    
1925
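    # refuse to start the instance if its primary node does not report
    # enough free memory to hold it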
    freememory = nodeinfo[node_current]['memory_free']
1926
    memory = instance.memory
1927
    if memory > freememory:
1928
      raise errors.OpExecError("Not enough memory to start instance"
1929
                               " %s on node %s"
1930
                               " needed %s MiB, available %s MiB" %
1931
                               (instance.name, node_current, memory,
1932
                                freememory))
1933

    
1934
    _StartInstanceDisks(self.cfg, instance, force)
1935

    
1936
    if not rpc.call_instance_start(node_current, instance, extra_args):
1937
      _ShutdownInstanceDisks(instance, self.cfg)
1938
      raise errors.OpExecError("Could not start instance")
1939

    
1940
    self.cfg.MarkInstanceUp(instance.name)
1941

    
1942

    
1943
class LURebootInstance(LogicalUnit):
1944
  """Reboot an instance.
1945

1946
  """
1947
  HPATH = "instance-reboot"
1948
  HTYPE = constants.HTYPE_INSTANCE
1949
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
1950

    
1951
  def BuildHooksEnv(self):
1952
    """Build hooks env.
1953

1954
    This runs on master, primary and secondary nodes of the instance.
1955

1956
    """
1957
    env = {
1958
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
1959
      }
1960
    env.update(_BuildInstanceHookEnvByObject(self.instance))
1961
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1962
          list(self.instance.secondary_nodes))
1963
    return env, nl, nl
1964

    
1965
  def CheckPrereq(self):
1966
    """Check prerequisites.
1967

1968
    This checks that the instance is in the cluster.
1969

1970
    """
1971
    instance = self.cfg.GetInstanceInfo(
1972
      self.cfg.ExpandInstanceName(self.op.instance_name))
1973
    if instance is None:
1974
      raise errors.OpPrereqError("Instance '%s' not known" %
1975
                                 self.op.instance_name)
1976

    
1977
    # check bridges existence
1978
    _CheckInstanceBridgesExist(instance)
1979

    
1980
    self.instance = instance
1981
    self.op.instance_name = instance.name
1982

    
1983
  def Exec(self, feedback_fn):
1984
    """Reboot the instance.
1985

1986
    """
1987
    instance = self.instance
1988
    ignore_secondaries = self.op.ignore_secondaries
1989
    reboot_type = self.op.reboot_type
1990
    extra_args = getattr(self.op, "extra_args", "")
1991

    
1992
    node_current = instance.primary_node
1993

    
1994
    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
1995
                           constants.INSTANCE_REBOOT_HARD,
1996
                           constants.INSTANCE_REBOOT_FULL]:
1997
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
1998
                                  (constants.INSTANCE_REBOOT_SOFT,
1999
                                   constants.INSTANCE_REBOOT_HARD,
2000
                                   constants.INSTANCE_REBOOT_FULL))
2001

    
2002
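    # soft and hard reboots are handled by the hypervisor on the primary
    # node; a full reboot is done as shutdown + disk restart + start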
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2003
                       constants.INSTANCE_REBOOT_HARD]:
2004
      if not rpc.call_instance_reboot(node_current, instance,
2005
                                      reboot_type, extra_args):
2006
        raise errors.OpExecError("Could not reboot instance")
2007
    else:
2008
      if not rpc.call_instance_shutdown(node_current, instance):
2009
        raise errors.OpExecError("could not shutdown instance for full reboot")
2010
      _ShutdownInstanceDisks(instance, self.cfg)
2011
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2012
      if not rpc.call_instance_start(node_current, instance, extra_args):
2013
        _ShutdownInstanceDisks(instance, self.cfg)
2014
        raise errors.OpExecError("Could not start instance for full reboot")
2015

    
2016
    self.cfg.MarkInstanceUp(instance.name)
2017

    
2018

    
2019
class LUShutdownInstance(LogicalUnit):
2020
  """Shutdown an instance.
2021

2022
  """
2023
  HPATH = "instance-stop"
2024
  HTYPE = constants.HTYPE_INSTANCE
2025
  _OP_REQP = ["instance_name"]
2026

    
2027
  def BuildHooksEnv(self):
2028
    """Build hooks env.
2029

2030
    This runs on master, primary and secondary nodes of the instance.
2031

2032
    """
2033
    env = _BuildInstanceHookEnvByObject(self.instance)
2034
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2035
          list(self.instance.secondary_nodes))
2036
    return env, nl, nl
2037

    
2038
  def CheckPrereq(self):
2039
    """Check prerequisites.
2040

2041
    This checks that the instance is in the cluster.
2042

2043
    """
2044
    instance = self.cfg.GetInstanceInfo(
2045
      self.cfg.ExpandInstanceName(self.op.instance_name))
2046
    if instance is None:
2047
      raise errors.OpPrereqError("Instance '%s' not known" %
2048
                                 self.op.instance_name)
2049
    self.instance = instance
2050

    
2051
  def Exec(self, feedback_fn):
2052
    """Shutdown the instance.
2053

2054
    """
2055
    instance = self.instance
2056
    node_current = instance.primary_node
2057
    if not rpc.call_instance_shutdown(node_current, instance):
2058
      logger.Error("could not shutdown instance")
2059

    
2060
    self.cfg.MarkInstanceDown(instance.name)
2061
    _ShutdownInstanceDisks(instance, self.cfg)
2062

    
2063

    
2064
class LUReinstallInstance(LogicalUnit):
2065
  """Reinstall an instance.
2066

2067
  """
2068
  HPATH = "instance-reinstall"
2069
  HTYPE = constants.HTYPE_INSTANCE
2070
  _OP_REQP = ["instance_name"]
2071

    
2072
  def BuildHooksEnv(self):
2073
    """Build hooks env.
2074

2075
    This runs on master, primary and secondary nodes of the instance.
2076

2077
    """
2078
    env = _BuildInstanceHookEnvByObject(self.instance)
2079
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2080
          list(self.instance.secondary_nodes))
2081
    return env, nl, nl
2082

    
2083
  def CheckPrereq(self):
2084
    """Check prerequisites.
2085

2086
    This checks that the instance is in the cluster and is not running.
2087

2088
    """
2089
    instance = self.cfg.GetInstanceInfo(
2090
      self.cfg.ExpandInstanceName(self.op.instance_name))
2091
    if instance is None:
2092
      raise errors.OpPrereqError("Instance '%s' not known" %
2093
                                 self.op.instance_name)
2094
    if instance.disk_template == constants.DT_DISKLESS:
2095
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2096
                                 self.op.instance_name)
2097
    if instance.status != "down":
2098
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2099
                                 self.op.instance_name)
2100
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2101
    if remote_info:
2102
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2103
                                 (self.op.instance_name,
2104
                                  instance.primary_node))
2105

    
2106
    self.op.os_type = getattr(self.op, "os_type", None)
2107
    if self.op.os_type is not None:
2108
      # OS verification
2109
      pnode = self.cfg.GetNodeInfo(
2110
        self.cfg.ExpandNodeName(instance.primary_node))
2111
      if pnode is None:
2112
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2113
                                   self.op.pnode)
2114
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2115
      if not os_obj:
2116
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2117
                                   " primary node"  % self.op.os_type)
2118

    
2119
    self.instance = instance
2120

    
2121
  def Exec(self, feedback_fn):
2122
    """Reinstall the instance.
2123

2124
    """
2125
    inst = self.instance
2126

    
2127
    if self.op.os_type is not None:
2128
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2129
      inst.os = self.op.os_type
2130
      self.cfg.AddInstance(inst)
2131

    
2132
    _StartInstanceDisks(self.cfg, inst, None)
2133
    try:
2134
      feedback_fn("Running the instance OS create scripts...")
2135
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2136
        raise errors.OpExecError("Could not install OS for instance %s "
2137
                                 "on node %s" %
2138
                                 (inst.name, inst.primary_node))
2139
    finally:
2140
      _ShutdownInstanceDisks(inst, self.cfg)
2141

    
2142

    
2143
class LURenameInstance(LogicalUnit):
2144
  """Rename an instance.
2145

2146
  """
2147
  HPATH = "instance-rename"
2148
  HTYPE = constants.HTYPE_INSTANCE
2149
  _OP_REQP = ["instance_name", "new_name"]
2150

    
2151
  def BuildHooksEnv(self):
2152
    """Build hooks env.
2153

2154
    This runs on master, primary and secondary nodes of the instance.
2155

2156
    """
2157
    env = _BuildInstanceHookEnvByObject(self.instance)
2158
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2159
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2160
          list(self.instance.secondary_nodes))
2161
    return env, nl, nl
2162

    
2163
  def CheckPrereq(self):
2164
    """Check prerequisites.
2165

2166
    This checks that the instance is in the cluster and is not running.
2167

2168
    """
2169
    instance = self.cfg.GetInstanceInfo(
2170
      self.cfg.ExpandInstanceName(self.op.instance_name))
2171
    if instance is None:
2172
      raise errors.OpPrereqError("Instance '%s' not known" %
2173
                                 self.op.instance_name)
2174
    if instance.status != "down":
2175
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2176
                                 self.op.instance_name)
2177
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2178
    if remote_info:
2179
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2180
                                 (self.op.instance_name,
2181
                                  instance.primary_node))
2182
    self.instance = instance
2183

    
2184
    # new name verification
2185
    name_info = utils.HostInfo(self.op.new_name)
2186

    
2187
    self.op.new_name = new_name = name_info.name
2188
    if not getattr(self.op, "ignore_ip", False):
2189
      command = ["fping", "-q", name_info.ip]
2190
      result = utils.RunCmd(command)
2191
      if not result.failed:
2192
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2193
                                   (name_info.ip, new_name))
2194

    
2195

    
2196
  def Exec(self, feedback_fn):
2197
    """Reinstall the instance.
2198

2199
    """
2200
    inst = self.instance
2201
    old_name = inst.name
2202

    
2203
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2204

    
2205
    # re-read the instance from the configuration after rename
2206
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2207

    
2208
    _StartInstanceDisks(self.cfg, inst, None)
2209
    try:
2210
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2211
                                          "sda", "sdb"):
2212
        msg = ("Could run OS rename script for instance %s\n"
2213
               "on node %s\n"
2214
               "(but the instance has been renamed in Ganeti)" %
2215
               (inst.name, inst.primary_node))
2216
        logger.Error(msg)
2217
    finally:
2218
      _ShutdownInstanceDisks(inst, self.cfg)
2219

    
2220

    
2221
class LURemoveInstance(LogicalUnit):
2222
  """Remove an instance.
2223

2224
  """
2225
  HPATH = "instance-remove"
2226
  HTYPE = constants.HTYPE_INSTANCE
2227
  _OP_REQP = ["instance_name"]
2228

    
2229
  def BuildHooksEnv(self):
2230
    """Build hooks env.
2231

2232
    This runs on master, primary and secondary nodes of the instance.
2233

2234
    """
2235
    env = _BuildInstanceHookEnvByObject(self.instance)
2236
    nl = [self.sstore.GetMasterNode()]
2237
    return env, nl, nl
2238

    
2239
  def CheckPrereq(self):
2240
    """Check prerequisites.
2241

2242
    This checks that the instance is in the cluster.
2243

2244
    """
2245
    instance = self.cfg.GetInstanceInfo(
2246
      self.cfg.ExpandInstanceName(self.op.instance_name))
2247
    if instance is None:
2248
      raise errors.OpPrereqError("Instance '%s' not known" %
2249
                                 self.op.instance_name)
2250
    self.instance = instance
2251

    
2252
  def Exec(self, feedback_fn):
2253
    """Remove the instance.
2254

2255
    """
2256
    instance = self.instance
2257
    logger.Info("shutting down instance %s on node %s" %
2258
                (instance.name, instance.primary_node))
2259

    
2260
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2261
      if self.op.ignore_failures:
2262
        feedback_fn("Warning: can't shutdown instance")
2263
      else:
2264
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2265
                                 (instance.name, instance.primary_node))
2266

    
2267
    logger.Info("removing block devices for instance %s" % instance.name)
2268

    
2269
    if not _RemoveDisks(instance, self.cfg):
2270
      if self.op.ignore_failures:
2271
        feedback_fn("Warning: can't remove instance's disks")
2272
      else:
2273
        raise errors.OpExecError("Can't remove instance's disks")
2274

    
2275
    logger.Info("removing instance %s out of cluster config" % instance.name)
2276

    
2277
    self.cfg.RemoveInstance(instance.name)
2278

    
2279

    
2280
class LUQueryInstances(NoHooksLU):
2281
  """Logical unit for querying instances.
2282

2283
  """
2284
  _OP_REQP = ["output_fields", "names"]
2285

    
2286
  def CheckPrereq(self):
2287
    """Check prerequisites.
2288

2289
    This checks that the fields required are valid output fields.
2290

2291
    """
2292
    self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2293
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2294
                               "admin_state", "admin_ram",
2295
                               "disk_template", "ip", "mac", "bridge",
2296
                               "sda_size", "sdb_size"],
2297
                       dynamic=self.dynamic_fields,
2298
                       selected=self.op.output_fields)
2299

    
2300
    self.wanted = _GetWantedInstances(self, self.op.names)
2301

    
2302
  def Exec(self, feedback_fn):
2303
    """Computes the list of nodes and their attributes.
2304

2305
    """
2306
    instance_names = self.wanted
2307
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2308
                     in instance_names]
2309

    
2310
    # begin data gathering
2311

    
2312
    nodes = frozenset([inst.primary_node for inst in instance_list])
2313

    
2314
    bad_nodes = []
2315
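    # query the nodes for live instance data only if a dynamic field
    # (oper_state or oper_ram) was actually requested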
    if self.dynamic_fields.intersection(self.op.output_fields):
2316
      live_data = {}
2317
      node_data = rpc.call_all_instances_info(nodes)
2318
      for name in nodes:
2319
        result = node_data[name]
2320
        if result:
2321
          live_data.update(result)
2322
        elif result == False:
2323
          bad_nodes.append(name)
2324
        # else no instance is alive
2325
    else:
2326
      live_data = dict([(name, {}) for name in instance_names])
2327

    
2328
    # end data gathering
2329

    
2330
    output = []
2331
    for instance in instance_list:
2332
      iout = []
2333
      for field in self.op.output_fields:
2334
        if field == "name":
2335
          val = instance.name
2336
        elif field == "os":
2337
          val = instance.os
2338
        elif field == "pnode":
2339
          val = instance.primary_node
2340
        elif field == "snodes":
2341
          val = list(instance.secondary_nodes)
2342
        elif field == "admin_state":
2343
          val = (instance.status != "down")
2344
        elif field == "oper_state":
2345
          if instance.primary_node in bad_nodes:
2346
            val = None
2347
          else:
2348
            val = bool(live_data.get(instance.name))
2349
        elif field == "admin_ram":
2350
          val = instance.memory
2351
        elif field == "oper_ram":
2352
          if instance.primary_node in bad_nodes:
2353
            val = None
2354
          elif instance.name in live_data:
2355
            val = live_data[instance.name].get("memory", "?")
2356
          else:
2357
            val = "-"
2358
        elif field == "disk_template":
2359
          val = instance.disk_template
2360
        elif field == "ip":
2361
          val = instance.nics[0].ip
2362
        elif field == "bridge":
2363
          val = instance.nics[0].bridge
2364
        elif field == "mac":
2365
          val = instance.nics[0].mac
2366
        elif field == "sda_size" or field == "sdb_size":
2367
          disk = instance.FindDisk(field[:3])
2368
          if disk is None:
2369
            val = None
2370
          else:
2371
            val = disk.size
2372
        else:
2373
          raise errors.ParameterError(field)
2374
        iout.append(val)
2375
      output.append(iout)
2376

    
2377
    return output
2378

    
2379

    
2380
class LUFailoverInstance(LogicalUnit):
2381
  """Failover an instance.
2382

2383
  """
2384
  HPATH = "instance-failover"
2385
  HTYPE = constants.HTYPE_INSTANCE
2386
  _OP_REQP = ["instance_name", "ignore_consistency"]
2387

    
2388
  def BuildHooksEnv(self):
2389
    """Build hooks env.
2390

2391
    This runs on master, primary and secondary nodes of the instance.
2392

2393
    """
2394
    env = {
2395
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2396
      }
2397
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2398
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2399
    return env, nl, nl
2400

    
2401
  def CheckPrereq(self):
2402
    """Check prerequisites.
2403

2404
    This checks that the instance is in the cluster.
2405

2406
    """
2407
    instance = self.cfg.GetInstanceInfo(
2408
      self.cfg.ExpandInstanceName(self.op.instance_name))
2409
    if instance is None:
2410
      raise errors.OpPrereqError("Instance '%s' not known" %
2411
                                 self.op.instance_name)
2412

    
2413
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2414
      raise errors.OpPrereqError("Instance's disk layout is not"
2415
                                 " network mirrored, cannot failover.")
2416

    
2417
    secondary_nodes = instance.secondary_nodes
2418
    if not secondary_nodes:
2419
      raise errors.ProgrammerError("no secondary node but using "
2420
                                   "DT_REMOTE_RAID1 template")
2421

    
2422
    # check memory requirements on the secondary node
2423
    target_node = secondary_nodes[0]
2424
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2425
    info = nodeinfo.get(target_node, None)
2426
    if not info:
2427
      raise errors.OpPrereqError("Cannot get current information"
2428
                                 " from node '%s'" % nodeinfo)
2429
    if instance.memory > info['memory_free']:
2430
      raise errors.OpPrereqError("Not enough memory on target node %s."
2431
                                 " %d MB available, %d MB required" %
2432
                                 (target_node, info['memory_free'],
2433
                                  instance.memory))
2434

    
2435
    # check bridge existence
2436
    brlist = [nic.bridge for nic in instance.nics]
2437
    if not rpc.call_bridges_exist(target_node, brlist):
2438
      raise errors.OpPrereqError("One or more target bridges %s does not"
2439
                                 " exist on destination node '%s'" %
2440
                                 (brlist, target_node))
2441

    
2442
    self.instance = instance
2443

    
2444
  def Exec(self, feedback_fn):
2445
    """Failover an instance.
2446

2447
    The failover is done by shutting it down on its present node and
2448
    starting it on the secondary.
2449

2450
    """
2451
    instance = self.instance
2452

    
2453
    source_node = instance.primary_node
2454
    target_node = instance.secondary_nodes[0]
2455

    
2456
    feedback_fn("* checking disk consistency between source and target")
2457
    for dev in instance.disks:
2458
      # for remote_raid1, these are md over drbd
2459
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2460
        if not self.op.ignore_consistency:
2461
          raise errors.OpExecError("Disk %s is degraded on target node,"
2462
                                   " aborting failover." % dev.iv_name)
2463

    
2464
    feedback_fn("* checking target node resource availability")
2465
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2466

    
2467
    if not nodeinfo:
2468
      raise errors.OpExecError("Could not contact target node %s." %
2469
                               target_node)
2470

    
2471
    free_memory = int(nodeinfo[target_node]['memory_free'])
2472
    memory = instance.memory
2473
    if memory > free_memory:
2474
      raise errors.OpExecError("Not enough memory to create instance %s on"
2475
                               " node %s. needed %s MiB, available %s MiB" %
2476
                               (instance.name, target_node, memory,
2477
                                free_memory))
2478

    
2479
    feedback_fn("* shutting down instance on source node")
2480
    logger.Info("Shutting down instance %s on node %s" %
2481
                (instance.name, source_node))
2482

    
2483
    if not rpc.call_instance_shutdown(source_node, instance):
2484
      if self.op.ignore_consistency:
2485
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2486
                     " anyway. Please make sure node %s is down"  %
2487
                     (instance.name, source_node, source_node))
2488
      else:
2489
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2490
                                 (instance.name, source_node))
2491

    
2492
    feedback_fn("* deactivating the instance's disks on source node")
2493
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2494
      raise errors.OpExecError("Can't shut down the instance's disks.")
2495

    
2496
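    # this is the point of no return: the old secondary becomes the new
    # primary in the cluster configuration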
    instance.primary_node = target_node
2497
    # distribute new instance config to the other nodes
2498
    self.cfg.AddInstance(instance)
2499

    
2500
    feedback_fn("* activating the instance's disks on target node")
2501
    logger.Info("Starting instance %s on node %s" %
2502
                (instance.name, target_node))
2503

    
2504
    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2505
                                             ignore_secondaries=True)
2506
    if not disks_ok:
2507
      _ShutdownInstanceDisks(instance, self.cfg)
2508
      raise errors.OpExecError("Can't activate the instance's disks")
2509

    
2510
    feedback_fn("* starting the instance on the target node")
2511
    if not rpc.call_instance_start(target_node, instance, None):
2512
      _ShutdownInstanceDisks(instance, self.cfg)
2513
      raise errors.OpExecError("Could not start instance %s on node %s." %
2514
                               (instance.name, target_node))
2515

    
2516

    
2517
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2518
  """Create a tree of block devices on the primary node.
2519

2520
  This always creates all devices.
2521

2522
  """
2523
  if device.children:
2524
    for child in device.children:
2525
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2526
        return False
2527

    
2528
  cfg.SetDiskID(device, node)
2529
  new_id = rpc.call_blockdev_create(node, device, device.size,
2530
                                    instance.name, True, info)
2531
  if not new_id:
2532
    return False
2533
  if device.physical_id is None:
2534
    device.physical_id = new_id
2535
  return True
2536

    
2537

    
2538
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2539
  """Create a tree of block devices on a secondary node.
2540

2541
  If this device type has to be created on secondaries, create it and
2542
  all its children.
2543

2544
  If not, just recurse to children keeping the same 'force' value.
2545

2546
  """
2547
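  # if this device type must exist on the secondary node, create it and
  # all of its children there (force is propagated downwards)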
  if device.CreateOnSecondary():
2548
    force = True
2549
  if device.children:
2550
    for child in device.children:
2551
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
2552
                                        child, force, info):
2553
        return False
2554

    
2555
  if not force:
2556
    return True
2557
  cfg.SetDiskID(device, node)
2558
  new_id = rpc.call_blockdev_create(node, device, device.size,
2559
                                    instance.name, False, info)
2560
  if not new_id:
2561
    return False
2562
  if device.physical_id is None:
2563
    device.physical_id = new_id
2564
  return True
2565

    
2566

    
2567
def _GenerateUniqueNames(cfg, exts):
  """Generate a suitable LV name.

  This will generate unique logical volume names, one for each of the
  given suffixes.

  """
  results = []
  for val in exts:
    new_id = cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results
2578

    
2579

    
2580
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2581
  """Generate a drbd device complete with its children.
2582

2583
  """
2584
  port = cfg.AllocatePort()
2585
  vgname = cfg.GetVGName()
2586
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2587
                          logical_id=(vgname, names[0]))
2588
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2589
                          logical_id=(vgname, names[1]))
2590
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2591
                          logical_id = (primary, secondary, port),
2592
                          children = [dev_data, dev_meta])
2593
  return drbd_dev
2594

    
2595

    
2596
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2597
  """Generate a drbd8 device complete with its children.
2598

2599
  """
2600
  port = cfg.AllocatePort()
2601
  vgname = cfg.GetVGName()
2602
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2603
                          logical_id=(vgname, names[0]))
2604
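  # the fixed-size 128 MiB volume below holds the drbd metadata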
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2605
                          logical_id=(vgname, names[1]))
2606
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2607
                          logical_id = (primary, secondary, port),
2608
                          children = [dev_data, dev_meta],
2609
                          iv_name=iv_name)
2610
  return drbd_dev
2611

    
2612
def _GenerateDiskTemplate(cfg, template_name,
2613
                          instance_name, primary_node,
2614
                          secondary_nodes, disk_sz, swap_sz):
2615
  """Generate the entire disk layout for a given template type.
2616

2617
  """
2618
  #TODO: compute space requirements
2619

    
2620
  vgname = cfg.GetVGName()
2621
  if template_name == "diskless":
2622
    disks = []
2623
  elif template_name == "plain":
2624
    if len(secondary_nodes) != 0:
2625
      raise errors.ProgrammerError("Wrong template configuration")
2626

    
2627
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2628
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2629
                           logical_id=(vgname, names[0]),
2630
                           iv_name = "sda")
2631
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2632
                           logical_id=(vgname, names[1]),
2633
                           iv_name = "sdb")
2634
    disks = [sda_dev, sdb_dev]
2635
  elif template_name == "local_raid1":
2636
    if len(secondary_nodes) != 0:
2637
      raise errors.ProgrammerError("Wrong template configuration")
2638

    
2639

    
2640
    names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2641
                                       ".sdb_m1", ".sdb_m2"])
2642
    sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2643
                              logical_id=(vgname, names[0]))
2644
    sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2645
                              logical_id=(vgname, names[1]))
2646
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2647
                              size=disk_sz,
2648
                              children = [sda_dev_m1, sda_dev_m2])
2649
    sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2650
                              logical_id=(vgname, names[2]))
2651
    sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2652
                              logical_id=(vgname, names[3]))
2653
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2654
                              size=swap_sz,
2655
                              children = [sdb_dev_m1, sdb_dev_m2])
2656
    disks = [md_sda_dev, md_sdb_dev]
2657
  elif template_name == constants.DT_REMOTE_RAID1:
2658
    if len(secondary_nodes) != 1:
2659
      raise errors.ProgrammerError("Wrong template configuration")
2660
    remote_node = secondary_nodes[0]
2661
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2662
                                       ".sdb_data", ".sdb_meta"])
2663
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2664
                                         disk_sz, names[0:2])
2665
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2666
                              children = [drbd_sda_dev], size=disk_sz)
2667
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2668
                                         swap_sz, names[2:4])
2669
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2670
                              children = [drbd_sdb_dev], size=swap_sz)
2671
    disks = [md_sda_dev, md_sdb_dev]
2672
  elif template_name == constants.DT_DRBD8:
2673
    if len(secondary_nodes) != 1:
2674
      raise errors.ProgrammerError("Wrong template configuration")
2675
    remote_node = secondary_nodes[0]
2676
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2677
                                       ".sdb_data", ".sdb_meta"])
2678
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2679
                                         disk_sz, names[0:2], "sda")
2680
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2681
                                         swap_sz, names[2:4], "sdb")
2682
    disks = [drbd_sda_dev, drbd_sdb_dev]
2683
  else:
2684
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2685
  return disks
2686

    
2687

    
2688
def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name
2693

    
2694

    
2695
def _CreateDisks(cfg, instance):
2696
  """Create all disks for an instance.
2697

2698
  This abstracts away some work from AddInstance.
2699

2700
  Args:
2701
    instance: the instance object
2702

2703
  Returns:
2704
    True or False showing the success of the creation process
2705

2706
  """
2707
  info = _GetInstanceInfoText(instance)
2708

    
2709
  for device in instance.disks:
2710
    logger.Info("creating volume %s for instance %s" %
2711
              (device.iv_name, instance.name))
2712
    #HARDCODE
2713
    for secondary_node in instance.secondary_nodes:
2714
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2715
                                        device, False, info):
2716
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2717
                     (device.iv_name, device, secondary_node))
2718
        return False
2719
    #HARDCODE
2720
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2721
                                    instance, device, info):
2722
      logger.Error("failed to create volume %s on primary!" %
2723
                   device.iv_name)
2724
      return False
2725
  return True
2726

    
2727

    
2728
def _RemoveDisks(instance, cfg):
2729
  """Remove all disks for an instance.
2730

2731
  This abstracts away some work from `AddInstance()` and
2732
  `RemoveInstance()`. Note that in case some of the devices couldn't
2733
  be removed, the removal will continue with the other ones (compare
2734
  with `_CreateDisks()`).
2735

2736
  Args:
2737
    instance: the instance object
2738

2739
  Returns:
2740
    True or False showing the success of the removal process
2741

2742
  """
2743
  logger.Info("removing block devices for instance %s" % instance.name)
2744

    
2745
  result = True
2746
  for device in instance.disks:
2747
    for node, disk in device.ComputeNodeTree(instance.primary_node):
2748
      cfg.SetDiskID(disk, node)
2749
      if not rpc.call_blockdev_remove(node, disk):
2750
        logger.Error("could not remove block device %s on node %s,"
2751
                     " continuing anyway" %
2752
                     (device.iv_name, node))
2753
        result = False
2754
  return result
2755

    
2756

    
2757
class LUCreateInstance(LogicalUnit):
2758
  """Create an instance.
2759

2760
  """
2761
  HPATH = "instance-add"
2762
  HTYPE = constants.HTYPE_INSTANCE
2763
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2764
              "disk_template", "swap_size", "mode", "start", "vcpus",
2765
              "wait_for_sync", "ip_check"]
2766

    
2767
  def BuildHooksEnv(self):
2768
    """Build hooks env.
2769

2770
    This runs on master, primary and secondary nodes of the instance.
2771

2772
    """
2773
    env = {
2774
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2775
      "INSTANCE_DISK_SIZE": self.op.disk_size,
2776
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
2777
      "INSTANCE_ADD_MODE": self.op.mode,
2778
      }
2779
    if self.op.mode == constants.INSTANCE_IMPORT:
2780
      env["INSTANCE_SRC_NODE"] = self.op.src_node
2781
      env["INSTANCE_SRC_PATH"] = self.op.src_path
2782
      env["INSTANCE_SRC_IMAGE"] = self.src_image
2783

    
2784
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2785
      primary_node=self.op.pnode,
2786
      secondary_nodes=self.secondaries,
2787
      status=self.instance_status,
2788
      os_type=self.op.os_type,
2789
      memory=self.op.mem_size,
2790
      vcpus=self.op.vcpus,
2791
      nics=[(self.inst_ip, self.op.bridge)],
2792
    ))
2793

    
2794
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2795
          self.secondaries)
2796
    return env, nl, nl
2797

    
2798

    
2799
  def CheckPrereq(self):
2800
    """Check prerequisites.
2801

2802
    """
2803
    if self.op.mode not in (constants.INSTANCE_CREATE,
2804
                            constants.INSTANCE_IMPORT):
2805
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2806
                                 self.op.mode)
2807

    
2808
    if self.op.mode == constants.INSTANCE_IMPORT:
2809
      src_node = getattr(self.op, "src_node", None)
2810
      src_path = getattr(self.op, "src_path", None)
2811
      if src_node is None or src_path is None:
2812
        raise errors.OpPrereqError("Importing an instance requires source"
2813
                                   " node and path options")
2814
      src_node_full = self.cfg.ExpandNodeName(src_node)
2815
      if src_node_full is None:
2816
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2817
      self.op.src_node = src_node = src_node_full
2818

    
2819
      if not os.path.isabs(src_path):
2820
        raise errors.OpPrereqError("The source path must be absolute")
2821

    
2822
      export_info = rpc.call_export_info(src_node, src_path)
2823

    
2824
      if not export_info:
2825
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
2826

    
2827
      if not export_info.has_section(constants.INISECT_EXP):
2828
        raise errors.ProgrammerError("Corrupted export config")
2829

    
2830
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
2831
      if (int(ei_version) != constants.EXPORT_VERSION):
2832
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2833
                                   (ei_version, constants.EXPORT_VERSION))
2834

    
2835
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2836
        raise errors.OpPrereqError("Can't import instance with more than"
2837
                                   " one data disk")
2838

    
2839
      # FIXME: are the old os-es, disk sizes, etc. useful?
2840
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2841
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2842
                                                         'disk0_dump'))
2843
      self.src_image = diskimage
2844
    else: # INSTANCE_CREATE
2845
      if getattr(self.op, "os_type", None) is None:
2846
        raise errors.OpPrereqError("No guest OS specified")
2847

    
2848
    # check primary node
2849
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2850
    if pnode is None:
2851
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
2852
                                 self.op.pnode)
2853
    self.op.pnode = pnode.name
2854
    self.pnode = pnode
2855
    self.secondaries = []
2856
    # disk template and mirror node verification
2857
    if self.op.disk_template not in constants.DISK_TEMPLATES:
2858
      raise errors.OpPrereqError("Invalid disk template name")
2859

    
2860
    if self.op.disk_template in constants.DTS_NET_MIRROR:
2861
      if getattr(self.op, "snode", None) is None:
2862
        raise errors.OpPrereqError("The networked disk templates need"
2863
                                   " a mirror node")
2864

    
2865
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
2866
      if snode_name is None:
2867
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
2868
                                   self.op.snode)
2869
      elif snode_name == pnode.name:
2870
        raise errors.OpPrereqError("The secondary node cannot be"
2871
                                   " the primary node.")
2872
      self.secondaries.append(snode_name)
2873

    
2874
    # Check lv size requirements
2875
    nodenames = [pnode.name] + self.secondaries
2876
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2877

    
2878
    # Required free disk space as a function of disk and swap space
2879
    req_size_dict = {
2880
      constants.DT_DISKLESS: 0,
2881
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2882
      constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2883
      # 256 MB are added for drbd metadata, 128MB for each drbd device
2884
      constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2885
      constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
2886
    }
2887

    
2888
    if self.op.disk_template not in req_size_dict:
2889
      raise errors.ProgrammerError("Disk template '%s' size requirement"
2890
                                   " is unknown" %  self.op.disk_template)
2891

    
2892
    req_size = req_size_dict[self.op.disk_template]
2893

    
2894
    for node in nodenames:
2895
      info = nodeinfo.get(node, None)
2896
      if not info:
2897
        raise errors.OpPrereqError("Cannot get current information"
2898
                                   " from node '%s'" % nodeinfo)
2899
      if req_size > info['vg_free']:
2900
        raise errors.OpPrereqError("Not enough disk space on target node %s."
2901
                                   " %d MB available, %d MB required" %
2902
                                   (node, info['vg_free'], req_size))
2903

    
2904
    # os verification
2905
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2906
    if not os_obj:
2907
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
2908
                                 " primary node"  % self.op.os_type)
2909

    
2910
    # instance verification
2911
    hostname1 = utils.HostInfo(self.op.instance_name)
2912

    
2913
    self.op.instance_name = instance_name = hostname1.name
2914
    instance_list = self.cfg.GetInstanceList()
2915
    if instance_name in instance_list:
2916
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2917
                                 instance_name)
2918

    
2919
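    # instance IP handling: "none" (or no ip at all) means no IP address,
    # "auto" uses the address the instance name resolves to, anything else
    # is validated with utils.IsValidIP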
    ip = getattr(self.op, "ip", None)
2920
    if ip is None or ip.lower() == "none":
2921
      inst_ip = None
2922
    elif ip.lower() == "auto":
2923
      inst_ip = hostname1.ip
2924
    else:
2925
      if not utils.IsValidIP(ip):
2926
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
2927
                                   " like a valid IP" % ip)
2928
      inst_ip = ip
2929
    self.inst_ip = inst_ip
2930

    
2931
    if self.op.start and not self.op.ip_check:
2932
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
2933
                                 " adding an instance in start mode")
2934

    
2935
    if self.op.ip_check:
      if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
                       constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (hostname1.ip, instance_name))

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)

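    # wait for the new disks to be usable: fully synced if requested,
    # otherwise (for mirrored templates) at least not degraded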
    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                                src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance.name)
    # build ssh cmdline
    argv = ["ssh", "-q", "-t"]
    argv.extend(ssh.KNOWN_HOSTS_OPTS)
    argv.extend(ssh.BATCH_MODE_OPTS)
    argv.append(node)
    argv.append(console_cmd)
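    # the result is a (program, argv) pair; the caller is expected to run it
    # on the master node (see the class docstring)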
    return "ssh", argv


class LUAddMDDRBDComponent(LogicalUnit):
  """Add a new mirror member to an instance's disk.

  """
  HPATH = "mirror-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "NEW_SECONDARY": self.op.remote_node,
      "DISK_NAME": self.op.disk_name,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.remote_node,] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
    if remote_node is None:
      raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
    self.remote_node = remote_node

    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
                                 " instance." % self.op.disk_name)
    if len(disk.children) > 1:
      raise errors.OpPrereqError("The device already has two slave"
                                 " devices.\n"
                                 "This would create a 3-disk raid1"
                                 " which we don't allow.")
    self.disk = disk

  def Exec(self, feedback_fn):
    """Add the mirror component

    """
    disk = self.disk
    instance = self.instance

    remote_node = self.remote_node
    lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
    names = _GenerateUniqueNames(self.cfg, lv_names)
    new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
                                     remote_node, disk.size, names)

    logger.Info("adding new mirror component on secondary")
    #HARDCODE
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
                                      new_drbd, False,
                                      _GetInstanceInfoText(instance)):
      raise errors.OpExecError("Failed to create new component on secondary"
                               " node %s" % remote_node)

    logger.Info("adding new mirror component on primary")
    #HARDCODE
    if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
                                    instance, new_drbd,
                                    _GetInstanceInfoText(instance)):
      # remove secondary dev
      self.cfg.SetDiskID(new_drbd, remote_node)
      rpc.call_blockdev_remove(remote_node, new_drbd)
      raise errors.OpExecError("Failed to create volume on primary")

    # the device exists now
    # call the primary node to add the mirror to md
    logger.Info("adding new mirror component to md")
    if not rpc.call_blockdev_addchildren(instance.primary_node,
                                         disk, [new_drbd]):
      logger.Error("Can't add mirror component to md!")
      self.cfg.SetDiskID(new_drbd, remote_node)
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
        logger.Error("Can't rollback on secondary")
      self.cfg.SetDiskID(new_drbd, instance.primary_node)
      if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
        logger.Error("Can't rollback on primary")
      raise errors.OpExecError("Can't add mirror component to md array")

    disk.children.append(new_drbd)

    self.cfg.AddInstance(instance)

    _WaitForSync(self.cfg, instance, self.proc)

    return 0


class LURemoveMDDRBDComponent(LogicalUnit):
  """Remove a component from a remote_raid1 disk.

  """
  HPATH = "mirror-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK_NAME": self.op.disk_name,
      "DISK_ID": self.op.disk_id,
      "OLD_SECONDARY": self.old_secondary,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
                                 " instance." % self.op.disk_name)
    for child in disk.children:
      if (child.dev_type == constants.LD_DRBD7 and
          child.logical_id[2] == self.op.disk_id):
        break
    else:
      raise errors.OpPrereqError("Can't find the device with this port.")

    if len(disk.children) < 2:
      raise errors.OpPrereqError("Cannot remove the last component from"
                                 " a mirror.")
    self.disk = disk
    self.child = child
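    # the child's logical_id is (node_a, node_b, port); the old secondary is
    # whichever of the two nodes is not the instance's primary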
    if self.child.logical_id[0] == instance.primary_node:
      oid = 1
    else:
      oid = 0
    self.old_secondary = self.child.logical_id[oid]

  def Exec(self, feedback_fn):
    """Remove the mirror component

    """
    instance = self.instance
    disk = self.disk
    child = self.child
    logger.Info("remove mirror component")
    self.cfg.SetDiskID(disk, instance.primary_node)
    if not rpc.call_blockdev_removechildren(instance.primary_node,
                                            disk, [child]):
      raise errors.OpExecError("Can't remove child from mirror.")

    for node in child.logical_id[:2]:
      self.cfg.SetDiskID(child, node)
      if not rpc.call_blockdev_remove(node, child):
        logger.Error("Warning: failed to remove device from node %s,"
                     " continuing operation." % node)

    disk.children.remove(child)
    self.cfg.AddInstance(instance)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance
    self.op.instance_name = instance.name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    remote_node = getattr(self.op, "remote_node", None)
    if remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
      # the user gave the current secondary, switch to
      # 'no-replace-secondary' mode for drbd7
      remote_node = None
    if (instance.disk_template == constants.DT_REMOTE_RAID1 and
        self.op.mode != constants.REPLACE_DISK_ALL):
      raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
                                 " disks replacement, not individual ones")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd8' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd8' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))
    self.op.remote_node = remote_node

  def _ExecRR1(self, feedback_fn):
    """Replace the disks of a remote_raid1 instance.

    """
    instance = self.instance
    iv_names = {}
    # start of work
    if self.op.remote_node is None:
      remote_node = self.sec_node
    else:
      remote_node = self.op.remote_node
    cfg = self.cfg
    for dev in instance.disks:
      size = dev.size
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
                                       remote_node, size, names)
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
      logger.Info("adding new mirror component on secondary for %s" %
                  dev.iv_name)
      #HARDCODE
      if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new component on"
                                 " secondary node %s\n"
                                 "Full abort, cleanup manually!" %
                                 remote_node)

      logger.Info("adding new mirror component on primary")
      #HARDCODE
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                      instance, new_drbd,
                                      _GetInstanceInfoText(instance)):
        # remove secondary dev
        cfg.SetDiskID(new_drbd, remote_node)
        rpc.call_blockdev_remove(remote_node, new_drbd)
        raise errors.OpExecError("Failed to create volume on primary!\n"
                                 "Full abort, cleanup manually!!")

      # the device exists now
      # call the primary node to add the mirror to md
      logger.Info("adding new mirror component to md")
      if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
                                           [new_drbd]):
        logger.Error("Can't add mirror component to md!")
        cfg.SetDiskID(new_drbd, remote_node)
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
          logger.Error("Can't rollback on secondary")
        cfg.SetDiskID(new_drbd, instance.primary_node)
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
          logger.Error("Can't rollback on primary")
        raise errors.OpExecError("Full abort, cleanup manually!!")

      dev.children.append(new_drbd)
      cfg.AddInstance(instance)

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      cfg.SetDiskID(dev, instance.primary_node)
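      # (the code assumes field 5 of the blockdev_find result is the
      # 'degraded' flag)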
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("MD device %s is degraded!" % name)
      cfg.SetDiskID(new_drbd, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
      if is_degr:
        raise errors.OpExecError("New drbd device %s is degraded!" % name)

    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      logger.Info("remove mirror %s component" % name)
      cfg.SetDiskID(dev, instance.primary_node)
      if not rpc.call_blockdev_removechildren(instance.primary_node,
                                              dev, [child]):
        logger.Error("Can't remove child from mirror, aborting"
                     " *this device cleanup*.\nYou need to cleanup manually!!")
        continue

      for node in child.logical_id[:2]:
        logger.Info("remove child device on %s" % node)
        cfg.SetDiskID(child, node)
        if not rpc.call_blockdev_remove(node, child):
          logger.Error("Warning: failed to remove device from node %s,"
                       " continuing operation." % node)

      dev.children.remove(child)

      cfg.AddInstance(instance)

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s" % new_lv.logical_id[1],
                    "manually cleanup unused logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", "manually remove unused LVs")
          continue

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([pri_node, new_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in pri_node, new_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s on %s" % (dev.iv_name, pri_node))
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        raise errors.OpExecError("Can't find device %s on node %s" %
                                 (dev.iv_name, pri_node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      size = dev.size
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in dev.children:
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], new_node))

      iv_names[dev.iv_name] = (dev, dev.children)

    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for dev in instance.disks:
      size = dev.size
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
      # create new devices on new_node
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=(pri_node, new_node,
                                          dev.logical_id[2]),
                              children=dev.children)
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
                                        new_drbd, False,
                                      _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new DRBD on"
                                 " node '%s'" % new_node)

    for dev in instance.disks:
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for %s on old node" % dev.iv_name)
      cfg.SetDiskID(dev, old_node)
      if not rpc.call_blockdev_shutdown(old_node, dev):
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
                "Please cleanup this device manually as soon as possible")

      # we have new storage, we 'rename' the network on the primary
      info("switching primary drbd for %s to new secondary node" % dev.iv_name)
      cfg.SetDiskID(dev, pri_node)
      # rename to the ip of the new node
      new_uid = list(dev.physical_id)
      new_uid[2] = self.remote_node_info.secondary_ip
      rlist = [(dev, tuple(new_uid))]
      if not rpc.call_blockdev_rename(pri_node, rlist):
        raise errors.OpExecError("Can't detach & re-attach drbd %s on node"
                                 " %s from %s to %s" %
                                 (dev.iv_name, pri_node, old_node, new_node))
      dev.logical_id = (pri_node, new_node, dev.logical_id[2])
      cfg.SetDiskID(dev, pri_node)
      cfg.Update(instance)


    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        if not rpc.call_blockdev_remove(old_node, lv):
          warning("Can't remove LV on old secondary",
                  "Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance
    if instance.disk_template == constants.DT_REMOTE_RAID1:
      fn = self._ExecRR1
    elif instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")
    return fn(feedback_fn)


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]
    return


  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                                instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "vcpus": instance.vcpus,
        }

      result[instance.name] = idict

    return result


class LUSetInstanceParms(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      args['nics'] = [(ip, bridge)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.bridge = getattr(self.op, "bridge", None)
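    # at least one of mem/vcpus/ip/bridge must be given, otherwise there is
    # nothing to change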
    if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("No such instance name '%s'" %
                                 self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance
    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.
    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus",  self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))

    self.cfg.AddInstance(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance name is a valid one.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    # shutdown the instance, unless requested not to do so
    if self.op.shutdown:
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
      self.proc.ChainOpCode(op)

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
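        # only the "sda" disk is snapshotted and exported; any other disks
        # (e.g. "sdb") are skipped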
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                      logical_id=(vgname, new_dev_name),
                                      physical_id=(vgname, new_dev_name),
                                      iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown:
        op = opcodes.OpStartupInstance(instance_name=instance.name,
                                       force=False)
        self.proc.ChainOpCode(op)

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                           instance):
        logger.Error("could not export block device %s from node"
                     " %s to node %s" %
                     (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from"
                     " node %s" % (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the matching tags, as a list of (path, tag) tuples.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets one or more tags on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of each submitted tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tags.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
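    # every requested tag must currently be set on the target object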
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tags from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")