root / lib / cmdlib.py @ 9440aeab
#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import ssconf

class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements (REQ_CLUSTER,
      REQ_MASTER); note that all commands require root permissions

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_CLUSTER = True
  REQ_MASTER = True

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = cfg
    self.sstore = sstore
    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    if self.REQ_CLUSTER:
      if not cfg.IsCluster():
        raise errors.OpPrereqError("Cluster not initialized yet,"
                                   " use 'gnt-cluster init' first.")
      if self.REQ_MASTER:
        master = sstore.GetMasterNode()
        if master != utils.HostInfo().name:
          raise errors.OpPrereqError("Commands must be run on the master"
                                     " node %s" % master)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    As for the node lists, the master should not be included in
    them, as it will be added by the hooks runner in case this LU
    requires a cluster to run on (otherwise we don't have a node
    list). If there are no nodes, an empty list (and not None)
    should be returned.

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Build hooks env.

    This is a no-op, since we don't run hooks.

    """
    return {}, [], []


def _AddHostToEtcHosts(hostname):
  """Wrapper around utils.SetEtcHostsEntry.

  """
  hi = utils.HostInfo(name=hostname)
  utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])


def _RemoveHostFromEtcHosts(hostname):
  """Wrapper around utils.RemoveEtcHostsEntry.

  """
  hi = utils.HostInfo(name=hostname)
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if nodes:
    wanted = []

    for name in nodes:
      node = lu.cfg.ExpandNodeName(name)
      if node is None:
        raise errors.OpPrereqError("No such node name '%s'" % name)
      wanted.append(node)

  else:
    wanted = lu.cfg.GetNodeList()
  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings) or None for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields
    selected: Fields requested by the caller

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings
  """
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env


def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override
  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _UpdateKnownHosts(fullnode, ip, pubkey):
  """Ensure a node has a correct known_hosts entry.

  Args:
    fullnode - Fully qualified domain name of host. (str)
    ip       - IPv4 address of host (str)
    pubkey   - the public key of the cluster

  """
  if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
  else:
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')

  inthere = False

  save_lines = []
  add_lines = []
  removed = False

  for rawline in f:
    logger.Debug('read %s' % (repr(rawline),))

    parts = rawline.rstrip('\r\n').split()

    # Ignore unwanted lines
    if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
      fields = parts[0].split(',')
      key = parts[2]

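      # figure out whether this line covers all of the names we care
      # about (ip and fqdn) or only some of them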
      haveall = True
      havesome = False
      for spec in [ ip, fullnode ]:
        if spec not in fields:
          haveall = False
        if spec in fields:
          havesome = True

      logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
      if haveall and key == pubkey:
        inthere = True
        save_lines.append(rawline)
        logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
        continue

      if havesome and (not haveall or key != pubkey):
        removed = True
        logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
        continue

    save_lines.append(rawline)

  if not inthere:
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))

  if removed:
    save_lines = save_lines + add_lines

    # Write a new file and replace old.
    fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
                                   constants.DATA_DIR)
    newfile = os.fdopen(fd, 'w')
    try:
      newfile.write(''.join(save_lines))
    finally:
      newfile.close()
    logger.Debug("Wrote new known_hosts.")
    os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)

  elif add_lines:
    # Simply appending a new line will do the trick.
    f.seek(0, 2)
    for add in add_lines:
      f.write(add)

  f.close()


def _HasValidVG(vglist, vgname):
  """Checks if the volume group list is valid.

  A non-None return value means there's an error, and the return value
  is the error message.

  """
  vgsize = vglist.get(vgname, None)
  if vgsize is None:
    return "volume group '%s' missing" % vgname
  elif vgsize < 20480:
    return ("volume group '%s' too small (20480MiB required, %dMiB found)" %
            (vgname, vgsize))
  return None


def _InitSSHSetup(node):
  """Setup the SSH configuration for the cluster.


  This generates a dsa keypair for root, adds the pub key to the
  permitted hosts and adds the hostkey to its own known hosts.

  Args:
    node: the name of this host as a fqdn

  """
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

  for name in priv_key, pub_key:
    if os.path.exists(name):
      utils.CreateBackup(name)
    utils.RemoveFile(name)

  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
                         "-f", priv_key,
                         "-q", "-N", ""])
  if result.failed:
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
                             result.output)

  f = open(pub_key, 'r')
  try:
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
  finally:
    f.close()


def _InitGanetiServerSetup(ss):
  """Setup the necessary configuration for the initial node daemon.

  This creates the nodepass file containing the shared password for
  the cluster and also generates the SSL certificate.

  """
  # Create pseudo random password
  randpass = sha.new(os.urandom(64)).hexdigest()
  # and write it into sstore
  ss.SetKey(ss.SS_NODED_PASS, randpass)

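  # generate a self-signed certificate valid for five years, with the
  # key and the certificate stored in the same file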
  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
445
                         "-days", str(365*5), "-nodes", "-x509",
446
                         "-keyout", constants.SSL_CERT_FILE,
447
                         "-out", constants.SSL_CERT_FILE, "-batch"])
448
  if result.failed:
449
    raise errors.OpExecError("could not generate server ssl cert, command"
450
                             " %s had exitcode %s and error message %s" %
451
                             (result.cmd, result.exit_code, result.output))
452

    
453
  os.chmod(constants.SSL_CERT_FILE, 0400)
454

    
455
  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
456

    
457
  if result.failed:
458
    raise errors.OpExecError("Could not start the node daemon, command %s"
459
                             " had exitcode %s and error %s" %
460
                             (result.cmd, result.exit_code, result.output))
461

    
462

    
463
def _CheckInstanceBridgesExist(instance):
464
  """Check that the brigdes needed by an instance exist.
465

466
  """
467
  # check bridges existance
468
  brlist = [nic.bridge for nic in instance.nics]
469
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
470
    raise errors.OpPrereqError("one or more target bridges %s does not"
471
                               " exist on destination node '%s'" %
472
                               (brlist, instance.primary_node))
473

    
474

    
475
class LUInitCluster(LogicalUnit):
  """Initialise the cluster.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
              "def_bridge", "master_netdev"]
  REQ_CLUSTER = False

  def BuildHooksEnv(self):
    """Build hooks env.

    Notes: Since we don't require a cluster, we must manually add
    ourselves in the post-run node list.

    """
    env = {"OP_TARGET": self.op.cluster_name}
    return env, [], [self.hostname.name]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    if config.ConfigWriter.IsCluster():
      raise errors.OpPrereqError("Cluster is already initialised")

    self.hostname = hostname = utils.HostInfo()

    if hostname.ip.startswith("127."):
      raise errors.OpPrereqError("This host's IP resolves to the private"
                                 " range (%s). Please fix DNS or /etc/hosts." %
                                 (hostname.ip,))

    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)

    if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
                         constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
                                 " to %s,\nbut this ip address does not"
                                 " belong to this host."
                                 " Aborting." % hostname.ip)

    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip and not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary ip given")
    if (secondary_ip and
        secondary_ip != hostname.ip and
        (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
                           constants.DEFAULT_NODED_PORT))):
      raise errors.OpPrereqError("You gave %s as secondary IP,\n"
                                 "but it does not belong to this host." %
                                 secondary_ip)
    self.secondary_ip = secondary_ip

    # checks presence of the volume group given
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)

    if vgstatus:
      raise errors.OpPrereqError("Error: %s" % vgstatus)

    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
                    self.op.mac_prefix):
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
                                 self.op.mac_prefix)

    if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
                                 self.op.hypervisor_type)

    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
    if result.failed:
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
                                 (self.op.master_netdev,
                                  result.output.strip()))

    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
      raise errors.OpPrereqError("Init.d script '%s' missing or not "
                                 "executable." % constants.NODE_INITD_SCRIPT)

  def Exec(self, feedback_fn):
    """Initialize the cluster.

    """
    clustername = self.clustername
    hostname = self.hostname

    # set up the simple store
    self.sstore = ss = ssconf.SimpleStore()
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)

    # set up the inter-node password and certificate
    _InitGanetiServerSetup(ss)

    # start the master ip
    rpc.call_node_start_master(hostname.name)

    # set up ssh config and /etc/hosts
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
    try:
      sshline = f.read()
    finally:
      f.close()
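    # the public key file has the form "ssh-rsa <base64 key> <comment>";
    # keep only the key itself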
    sshkey = sshline.split(" ")[1]

    _AddHostToEtcHosts(hostname.name)

    _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)

    _InitSSHSetup(hostname.name)

    # init of cluster config file
    self.cfg = cfgw = config.ConfigWriter()
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
                    sshkey, self.op.mac_prefix,
                    self.op.vg_name, self.op.def_bridge)


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.sstore.GetMasterNode()
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    rpc.call_node_leave_cluster(master)
    _RemoveHostFromEtcHosts(master)


class LUVerifyCluster(NoHooksLU):
  """Verifies the cluster status.

  """
  _OP_REQP = []

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn(" - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    instancelist = self.cfg.GetInstanceList()
    if not instance in instancelist:
      feedback_fn("  - ERROR: instance %s not in instance list %s" %
                      (instance, instancelist))
      bad = True

    instanceconfig = self.cfg.GetInstanceInfo(instance)
    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if not instanceconfig.status == 'down':
      if not instance in node_instance[node_current]:
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    self.cfg.VerifyConfig()

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    node_volume = {}
    node_instance = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if type(volumeinfo) != dict:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      result =  self._VerifyInstance(instance, node_volume, node_instance,
                                     feedback_fn)
      bad = bad or result

      inst_config = self.cfg.GetInstanceInfo(instance)

      inst_config.MapLVsByNode(node_vol_should)

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    return int(bad)


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      result = utils.RunCmd(["fping", "-q", new_ip])
      if not result.failed:
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      if not rpc.call_node_start_master(master):
        logger.Error("Could not re-enable the master role on the master,\n"
                     "please restart manually.")


def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    if unlock:
      utils.Unlock('cmd')
    try:
      time.sleep(min(60, max_time))
    finally:
      if unlock:
        utils.Lock('cmd')

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  cfgw.SetDiskID(dev, node)
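  # select which field of the remote blockdev_find result is checked:
  # index 5 is the overall is_degraded flag, index 6 the local-disk
  # (ldisk) status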
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Can't get any data from node %s" % node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    return

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    return node_data


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would not allow itself to run.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    rpc.call_node_leave_cluster(node.name)

    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)

    _RemoveHostFromEtcHosts(node.name)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["dtotal", "dfree",
                                     "mtotal", "mnode", "mfree",
                                     "bootid"])

    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                               "pip", "sip"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedNodes(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

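    # build reverse mappings from each node to the instances that use it
    # as primary or secondary node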
    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)


  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip
    node_list = cfg.GetNodeList()
    if node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration"
                                 % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)
      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachability
    if not utils.TcpPing(utils.HostInfo().name,
                         primary_ip,
                         constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(myself.secondary_ip,
                           secondary_ip,
                           constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError(
          "Node secondary ip not reachable by TCP based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # set up inter-node password and certificate and restart the node daemon
    gntpass = self.sstore.GetNodeDaemonPassword()
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
      raise errors.OpExecError("ganeti password corruption detected")
    f = open(constants.SSL_CERT_FILE)
    try:
      gntpem = f.read(8192)
    finally:
      f.close()
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
    # so we use this to detect an invalid certificate; as long as the
    # cert doesn't contain this, the here-document will be correctly
    # parsed by the shell sequence below
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
    if not gntpem.endswith("\n"):
      raise errors.OpExecError("PEM must end with newline")
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)

    # and then connect with ssh to set password and start ganeti-noded
    # note that all the below variables are sanitized at this point,
    # either by being constants or by the checks above
    ss = self.sstore
    mycommand = ("umask 077 && "
                 "echo '%s' > '%s' && "
                 "cat > '%s' << '!EOF.' && \n"
                 "%s!EOF.\n%s restart" %
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
                  constants.SSL_CERT_FILE, gntpem,
                  constants.NODE_INITD_SCRIPT))

    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
    if result.failed:
      raise errors.OpExecError("Remote command on node %s, error: %s,"
                               " output: %s" %
                               (node, result.fail_reason, result.output))

    # check connectivity
    time.sleep(4)

    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    _AddHostToEtcHosts(new_node.name)

    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
                      self.cfg.GetHostKey())

    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the"
                                 " secondary ip you gave (%s).\n"
                                 "Please fix and re-run this command." %
                                 new_node.secondary_ip)

    success, msg = ssh.VerifyNodeHostname(node)
    if not success:
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
                               " than the one the resolver gives: %s.\n"
                               "Please fix and re-run this command." %
                               (node, msg))

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList() + [node]
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = ss.GetFileList()
    for fname in to_copy:
      if not ssh.CopyFileToNode(node, fname):
        logger.Error("could not copy file %s to node %s" % (fname, node))

    logger.Info("adding node %s to cluster.conf" % node)
    self.cfg.AddNode(new_node)


class LUMasterFailover(LogicalUnit):
  """Failover the master node to the current node.

  This is a special LU in that it must run on a non-master node.

  """
  HPATH = "master-failover"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_MASTER = False
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the new master only in the pre phase, and on all
    the nodes in the post phase.

    """
    env = {
      "OP_TARGET": self.new_master,
      "NEW_MASTER": self.new_master,
      "OLD_MASTER": self.old_master,
      }
    return env, [self.new_master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we are not already the master.

    """
    self.new_master = utils.HostInfo().name
    self.old_master = self.sstore.GetMasterNode()

    if self.old_master == self.new_master:
      raise errors.OpPrereqError("This command must be run on the node"
                                 " where you want the new master to be.\n"
                                 "%s is already the master" %
                                 self.old_master)

  def Exec(self, feedback_fn):
    """Failover the master node.

    This command, when run on a non-master node, will cause the current
    master to cease being master, and the non-master to become new
    master.

    """
    #TODO: do not rely on gethostname returning the FQDN
    logger.Info("setting master to %s, old master: %s" %
                (self.new_master, self.old_master))

    if not rpc.call_node_stop_master(self.old_master):
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)

    ss = self.sstore
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
      logger.Error("could not distribute the new simple store master file"
                   " to the other nodes, please check.")

    if not rpc.call_node_start_master(self.new_master):
      logger.Error("could not start the master role on the new master"
                   " %s, please check" % self.new_master)
      feedback_fn("Error in activating the master IP on the new master,\n"
                  "please fix manually.")



class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      }

    return result


class LUClusterCopyFile(NoHooksLU):
  """Copy file to cluster.

  """
  _OP_REQP = ["nodes", "filename"]

  def CheckPrereq(self):
    """Check prerequisites.

    It should check that the named file exists and that the given list
    of nodes is valid.

    """
    if not os.path.exists(self.op.filename):
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)

    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Copy a file from the master to some nodes.

    The file given by self.op.filename is copied via ssh to every node
    in self.nodes (all nodes if the 'nodes' parameter was empty),
    skipping the master node itself.

    """
    filename = self.op.filename

    myname = utils.HostInfo().name

    for node in self.nodes:
      if node == myname:
        continue
      if not ssh.CopyFileToNode(node, filename):
        logger.Error("Copy of file %s to node %s failed" % (filename, node))


class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()


class LURunClusterCommand(NoHooksLU):
  """Run a command on some nodes.

  """
  _OP_REQP = ["command", "nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    It checks that the given list of nodes is valid.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Run a command on some nodes.

    """
    data = []
    for node in self.nodes:
      result = ssh.SSHCall(node, "root", self.op.command)
      data.append((node, result.output, result.exit_code))

    return data


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance


  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


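# Helpers for bringing instance disks up and down.  Assembly walks each
# disk's node tree and issues a blockdev_assemble RPC per (node, device)
# pair; only the result from the primary node is kept and returned as the
# instance-visible device mapping.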
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
         succeeded with the mapping from node devices to instance devices
  """
  device_info = []
  disks_ok = True
  for inst_disk in instance.disks:
    master_result = None
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      is_primary = node == instance.primary_node
      result = rpc.call_blockdev_assemble(node, node_disk,
                                          instance.name, is_primary)
      if not result:
        logger.Error("could not prepare block device %s on node %s (is_pri"
                     "mary=%s)" % (inst_disk.iv_name, node, is_primary))
        if is_primary or not ignore_secondaries:
          disks_ok = False
      if is_primary:
        master_result = result
    device_info.append((instance.primary_node, inst_disk.iv_name,
                        master_result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info


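# _StartInstanceDisks is a convenience wrapper around
# _AssembleInstanceDisks: if assembly fails it shuts the disks down again
# and raises, optionally hinting at the --force option when the problem
# was on a secondary node.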
def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    if not type(ins_l) is list:
      raise errors.OpExecError("Can't contact node '%s'" %
                               instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError("Instance is running, can't shutdown"
                               " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)


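# Disk shutdown mirrors the assembly above: a blockdev_shutdown RPC is
# sent for every (node, device) pair in the disk tree, and a failure is
# only tolerated when ignore_primary is set and it happened on the
# primary node.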
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are
  ignored.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result


class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
    if not nodeinfo:
      raise errors.OpExecError("Could not contact node %s for info" %
                               (node_current))

    freememory = nodeinfo[node_current]['memory_free']
    memory = instance.memory
    if memory > freememory:
      raise errors.OpExecError("Not enough memory to start instance"
                               " %s on node %s"
                               " needed %s MiB, available %s MiB" %
                               (instance.name, node_current, memory,
                                freememory))

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")

    self.cfg.MarkInstanceUp(instance.name)


class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                           constants.INSTANCE_REBOOT_HARD,
                           constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      if not rpc.call_instance_reboot(node_current, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      if not rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
      if not rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    if not rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    self.cfg.MarkInstanceDown(instance.name)
    _ShutdownInstanceDisks(instance, self.cfg)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   self.op.pnode)
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node" % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.AddInstance(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s "
                                 "on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    if not getattr(self.op, "ignore_ip", False):
      command = ["fping", "-q", name_info.ip]
      result = utils.RunCmd(command)
      if not result.failed:
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))


  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    self.cfg.RenameInstance(inst.name, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
                                          "sda", "sdb"):
        msg = ("Could not run OS rename script for instance %s\n"
               "on node %s\n"
               "(but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.sstore.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logger.Info("removing block devices for instance %s" % instance.name)

    if not _RemoveDisks(instance, self.cfg):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
                               "admin_state", "admin_ram",
                               "disk_template", "ip", "mac", "bridge",
                               "sda_size", "sdb_size"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedInstances(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    instance_names = self.wanted
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
                     in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_all_instances_info(nodes)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = []
    for instance in instance_list:
      iout = []
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          disk = instance.FindDisk(field[:3])
          if disk is None:
            val = None
          else:
            val = disk.size
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output


class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "DT_REMOTE_RAID1 template")

    # check memory requirements on the secondary node
    target_node = secondary_nodes[0]
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
    info = nodeinfo.get(target_node, None)
    if not info:
      raise errors.OpPrereqError("Cannot get current information"
                                 " from node '%s'" % nodeinfo)
    if instance.memory > info['memory_free']:
      raise errors.OpPrereqError("Not enough memory on target node %s."
                                 " %d MB available, %d MB required" %
                                 (target_node, info['memory_free'],
                                  instance.memory))

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s do not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

    self.instance = instance

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for remote_raid1, these are md over drbd
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        if not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* checking target node resource availability")
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())

    if not nodeinfo:
      raise errors.OpExecError("Could not contact target node %s." %
                               target_node)

    free_memory = int(nodeinfo[target_node]['memory_free'])
    memory = instance.memory
    if memory > free_memory:
      raise errors.OpExecError("Not enough memory to create instance %s on"
                               " node %s. needed %s MiB, available %s MiB" %
                               (instance.name, target_node, memory,
                                free_memory))

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down" %
                     (instance.name, source_node, source_node))
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.AddInstance(instance)

    feedback_fn("* activating the instance's disks on target node")
    logger.Info("Starting instance %s on node %s" %
                (instance.name, target_node))

    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                             ignore_secondaries=True)
    if not disks_ok:
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Can't activate the instance's disks")

    feedback_fn("* starting the instance on the target node")
    if not rpc.call_instance_start(target_node, instance, None):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance %s on node %s." %
                               (instance.name, target_node))


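# Block device creation helpers: the primary-node variant always creates
# the full device tree, while the secondary-node variant only creates
# devices (and, via the propagated 'force' flag, their children) that
# report CreateOnSecondary().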
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
        return False

  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, True, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, False, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _GenerateUniqueNames(cfg, exts):
  """Generate a suitable LV name.

  This will generate a logical volume name for the given instance.

  """
  results = []
  for val in exts:
    new_id = cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results


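# The two branch generators below build the same structure (a data LV plus
# a 128 MB metadata LV, wrapped in a DRBD device) and differ only in the
# DRBD version used (LD_DRBD7 vs LD_DRBD8) and in whether an iv_name is
# set on the DRBD device itself.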
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
  """Generate a drbd device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
                          logical_id = (primary, secondary, port),
                          children = [dev_data, dev_meta])
  return drbd_dev


def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
  """Generate a drbd8 device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id = (primary, secondary, port),
                          children = [dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev

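# _GenerateDiskTemplate maps a template name to a concrete disk tree; every
# template yields at most two instance-visible devices, "sda" (data,
# disk_sz) and "sdb" (swap, swap_sz), with the diskless template yielding
# none at all.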
def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = cfg.GetVGName()
  if template_name == "diskless":
    disks = []
  elif template_name == "plain":
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name = "sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name = "sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == "local_raid1":
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
                                       ".sdb_m1", ".sdb_m2"])
    sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                              logical_id=(vgname, names[0]))
    sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                              logical_id=(vgname, names[1]))
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
                              size=disk_sz,
                              children = [sda_dev_m1, sda_dev_m2])
    sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                              logical_id=(vgname, names[2]))
    sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                              logical_id=(vgname, names[3]))
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
                              size=swap_sz,
                              children = [sdb_dev_m1, sdb_dev_m2])
    disks = [md_sda_dev, md_sdb_dev]
  elif template_name == constants.DT_REMOTE_RAID1:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
                                         disk_sz, names[0:2])
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
                              children = [drbd_sda_dev], size=disk_sz)
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
                                         swap_sz, names[2:4])
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
                              children = [drbd_sdb_dev], size=swap_sz)
    disks = [md_sda_dev, md_sdb_dev]
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                         disk_sz, names[0:2], "sda")
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                         swap_sz, names[2:4], "sdb")
    disks = [drbd_sda_dev, drbd_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks


def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
              (device.iv_name, instance.name))
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False
  return True


def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False
  return result


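# Instance creation supports two modes: INSTANCE_CREATE, which runs the OS
# create scripts on freshly made disks, and INSTANCE_IMPORT, which restores
# the data from a previously exported instance; CheckPrereq validates the
# mode-specific parameters and Exec creates the disks, the configuration
# entry and finally the OS.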
class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
              "disk_template", "swap_size", "mode", "start", "vcpus",
              "wait_for_sync", "ip_check"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.op.mem_size,
      vcpus=self.op.vcpus,
      nics=[(self.inst_ip, self.op.bridge)],
    ))

    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl


  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)
      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")
      src_node_full = self.cfg.ExpandNodeName(src_node)
      if src_node_full is None:
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
      self.op.src_node = src_node = src_node_full

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage
    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

    # check primary node
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
    if pnode is None:
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                 self.op.pnode)
    self.op.pnode = pnode.name
    self.pnode = pnode
    self.secondaries = []
    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if getattr(self.op, "snode", None) is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")

      snode_name = self.cfg.ExpandNodeName(self.op.snode)
      if snode_name is None:
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
                                   self.op.snode)
      elif snode_name == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(snode_name)

    # Check lv size requirements
    nodenames = [pnode.name] + self.secondaries
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())

    # Required free disk space as a function of disk and swap space
    req_size_dict = {
      constants.DT_DISKLESS: 0,
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
      constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
      # 256 MB are added for drbd metadata, 128MB for each drbd device
      constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
      constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
    }

    if self.op.disk_template not in req_size_dict:
      raise errors.ProgrammerError("Disk template '%s' size requirement"
                                   " is unknown" % self.op.disk_template)

    req_size = req_size_dict[self.op.disk_template]

    for node in nodenames:
      info = nodeinfo.get(node, None)
      if not info:
        raise errors.OpPrereqError("Cannot get current information"
                                   " from node '%s'" % nodeinfo)
      if req_size > info['vg_free']:
        raise errors.OpPrereqError("Not enough disk space on target node %s."
                                   " %d MB available, %d MB required" %
                                   (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node" % self.op.os_type)

    # instance verification
    hostname1 = utils.HostInfo(self.op.instance_name)

    self.op.instance_name = instance_name = hostname1.name
    instance_list = self.cfg.GetInstanceList()
    if instance_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = inst_ip

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
                       constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (hostname1.ip, instance_name))

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                                src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")


class LUConnectConsole(NoHooksLU):
3043
  """Connect to an instance's console.
3044

3045
  This is somewhat special in that it returns the command line that
3046
  you need to run on the master node in order to connect to the
3047
  console.
3048

3049
  """
3050
  _OP_REQP = ["instance_name"]
3051

    
3052
  def CheckPrereq(self):
3053
    """Check prerequisites.
3054

3055
    This checks that the instance is in the cluster.
3056

3057
    """
3058
    instance = self.cfg.GetInstanceInfo(
3059
      self.cfg.ExpandInstanceName(self.op.instance_name))
3060
    if instance is None:
3061
      raise errors.OpPrereqError("Instance '%s' not known" %
3062
                                 self.op.instance_name)
3063
    self.instance = instance
3064

    
3065
  def Exec(self, feedback_fn):
3066
    """Connect to the console of an instance
3067

3068
    """
3069
    instance = self.instance
3070
    node = instance.primary_node
3071

    
3072
    node_insts = rpc.call_instance_list([node])[node]
3073
    if node_insts is False:
3074
      raise errors.OpExecError("Can't connect to node %s." % node)
3075

    
3076
    if instance.name not in node_insts:
3077
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3078

    
3079
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3080

    
3081
    hyper = hypervisor.GetHypervisor()
3082
    console_cmd = hyper.GetShellCommandForConsole(instance.name)
3083
    # build ssh cmdline
3084
    argv = ["ssh", "-q", "-t"]
3085
    argv.extend(ssh.KNOWN_HOSTS_OPTS)
3086
    argv.extend(ssh.BATCH_MODE_OPTS)
3087
    argv.append(node)
3088
    argv.append(console_cmd)
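    # For illustration only: the tuple returned below is handed back to the
    # caller, which ends up running something along the lines of
    #   ssh -q -t <known-hosts/batch-mode opts> <primary node> '<console cmd>'
    # (the exact options come from the ssh module constants used above).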
    return "ssh", argv


class LUAddMDDRBDComponent(LogicalUnit):
  """Add a new mirror member to an instance's disk.

  """
  HPATH = "mirror-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "NEW_SECONDARY": self.op.remote_node,
      "DISK_NAME": self.op.disk_name,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.remote_node,] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
    if remote_node is None:
      raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
    self.remote_node = remote_node

    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
                                 " instance." % self.op.disk_name)
    if len(disk.children) > 1:
      raise errors.OpPrereqError("The device already has two slave"
                                 " devices.\n"
                                 "This would create a 3-disk raid1"
                                 " which we don't allow.")
    self.disk = disk

  def Exec(self, feedback_fn):
    """Add the mirror component

    """
    disk = self.disk
    instance = self.instance

    remote_node = self.remote_node
    lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
    names = _GenerateUniqueNames(self.cfg, lv_names)
    new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
                                     remote_node, disk.size, names)
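    # lv_names above are only suffixes (e.g. ".sda_data"/".sda_meta" for a
    # disk named 'sda'); _GenerateUniqueNames is presumably what makes the
    # final LV names unique so they cannot clash with the existing volumes.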

    logger.Info("adding new mirror component on secondary")
    #HARDCODE
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
                                      new_drbd, False,
                                      _GetInstanceInfoText(instance)):
      raise errors.OpExecError("Failed to create new component on secondary"
                               " node %s" % remote_node)

    logger.Info("adding new mirror component on primary")
    #HARDCODE
    if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
                                    instance, new_drbd,
                                    _GetInstanceInfoText(instance)):
      # remove secondary dev
      self.cfg.SetDiskID(new_drbd, remote_node)
      rpc.call_blockdev_remove(remote_node, new_drbd)
      raise errors.OpExecError("Failed to create volume on primary")

    # the device exists now
    # call the primary node to add the mirror to md
    logger.Info("adding new mirror component to md")
    if not rpc.call_blockdev_addchildren(instance.primary_node,
                                         disk, [new_drbd]):
      logger.Error("Can't add mirror component to md!")
      self.cfg.SetDiskID(new_drbd, remote_node)
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
        logger.Error("Can't rollback on secondary")
      self.cfg.SetDiskID(new_drbd, instance.primary_node)
      if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
        logger.Error("Can't rollback on primary")
      raise errors.OpExecError("Can't add mirror component to md array")

    disk.children.append(new_drbd)

    self.cfg.AddInstance(instance)

    _WaitForSync(self.cfg, instance, self.proc)

    return 0


class LURemoveMDDRBDComponent(LogicalUnit):
  """Remove a component from a remote_raid1 disk.

  """
  HPATH = "mirror-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK_NAME": self.op.disk_name,
      "DISK_ID": self.op.disk_id,
      "OLD_SECONDARY": self.old_secondary,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
                                 " instance." % self.op.disk_name)
    for child in disk.children:
      if (child.dev_type == constants.LD_DRBD7 and
          child.logical_id[2] == self.op.disk_id):
        break
    else:
      raise errors.OpPrereqError("Can't find the device with this port.")

    if len(disk.children) < 2:
      raise errors.OpPrereqError("Cannot remove the last component from"
                                 " a mirror.")
    self.disk = disk
    self.child = child
    if self.child.logical_id[0] == instance.primary_node:
      oid = 1
    else:
      oid = 0
    self.old_secondary = self.child.logical_id[oid]

  def Exec(self, feedback_fn):
    """Remove the mirror component

    """
    instance = self.instance
    disk = self.disk
    child = self.child
    logger.Info("remove mirror component")
    self.cfg.SetDiskID(disk, instance.primary_node)
    if not rpc.call_blockdev_removechildren(instance.primary_node,
                                            disk, [child]):
      raise errors.OpExecError("Can't remove child from mirror.")

    for node in child.logical_id[:2]:
      self.cfg.SetDiskID(child, node)
      if not rpc.call_blockdev_remove(node, child):
        logger.Error("Warning: failed to remove device from node %s,"
                     " continuing operation." % node)

    disk.children.remove(child)
    self.cfg.AddInstance(instance)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance
    self.op.instance_name = instance.name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    remote_node = getattr(self.op, "remote_node", None)
    if remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
      # the user gave the current secondary, switch to
      # 'no-replace-secondary' mode for drbd7
      remote_node = None
    if (instance.disk_template == constants.DT_REMOTE_RAID1 and
        self.op.mode != constants.REPLACE_DISK_ALL):
      raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
                                 " disks replacement, not individual ones")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd8' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd8' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))
    self.op.remote_node = remote_node
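    # Summary of the state set up above for the drbd8 case: tgt_node is the
    # node whose storage gets replaced, oth_node is its peer, and new_node
    # (secondary replacement only) is the node taking over as secondary.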

  def _ExecRR1(self, feedback_fn):
    """Replace the disks of an instance.

    """
    instance = self.instance
    iv_names = {}
    # start of work
    if self.op.remote_node is None:
      remote_node = self.sec_node
    else:
      remote_node = self.op.remote_node
    cfg = self.cfg
    for dev in instance.disks:
      size = dev.size
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
                                       remote_node, size, names)
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
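      # For reference: iv_names maps a disk name like 'sda' to the tuple
      # (md device, old drbd child, new drbd child), so that the old
      # component can be dropped once the new one has synced (name only
      # given as an example).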
      logger.Info("adding new mirror component on secondary for %s" %
                  dev.iv_name)
      #HARDCODE
      if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new component on"
                                 " secondary node %s\n"
                                 "Full abort, cleanup manually!" %
                                 remote_node)

      logger.Info("adding new mirror component on primary")
      #HARDCODE
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                      instance, new_drbd,
                                      _GetInstanceInfoText(instance)):
        # remove secondary dev
        cfg.SetDiskID(new_drbd, remote_node)
        rpc.call_blockdev_remove(remote_node, new_drbd)
        raise errors.OpExecError("Failed to create volume on primary!\n"
                                 "Full abort, cleanup manually!!")

      # the device exists now
      # call the primary node to add the mirror to md
      logger.Info("adding new mirror component to md")
      if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
                                           [new_drbd]):
        logger.Error("Can't add mirror component to md!")
        cfg.SetDiskID(new_drbd, remote_node)
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
          logger.Error("Can't rollback on secondary")
        cfg.SetDiskID(new_drbd, instance.primary_node)
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
          logger.Error("Can't rollback on primary")
        raise errors.OpExecError("Full abort, cleanup manually!!")

      dev.children.append(new_drbd)
      cfg.AddInstance(instance)

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("MD device %s is degraded!" % name)
      cfg.SetDiskID(new_drbd, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
      if is_degr:
        raise errors.OpExecError("New drbd device %s is degraded!" % name)

    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      logger.Info("remove mirror %s component" % name)
      cfg.SetDiskID(dev, instance.primary_node)
      if not rpc.call_blockdev_removechildren(instance.primary_node,
                                              dev, [child]):
        logger.Error("Can't remove child from mirror, aborting"
                     " *this device cleanup*.\nYou need to cleanup manually!!")
        continue

      for node in child.logical_id[:2]:
        logger.Info("remove child device on %s" % node)
        cfg.SetDiskID(child, node)
        if not rpc.call_blockdev_remove(node, child):
          logger.Error("Warning: failed to remove device from node %s,"
                       " continuing operation." % node)

      dev.children.remove(child)

      cfg.AddInstance(instance)

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
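      # The second LV is the DRBD metadata volume; the fixed 128 (MB) size
      # is presumably the same size the disk template creation code uses for
      # drbd8-based instances (stated here for orientation only).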
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
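      # Illustrative example of the scheme: an LV currently named
      # 'xyz.sda_data' would be renamed to 'xyz.sda_data_replaced-<time>'
      # (names invented; the suffix is the int(time.time()) value above).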
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s", "manually cleanup unused"
                    " logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", "manually remove unused LVs")
          continue

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([pri_node, new_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in pri_node, new_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s on %s" % (dev.iv_name, pri_node))
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        raise errors.OpExecError("Can't find device %s on node %s" %
                                 (dev.iv_name, pri_node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      size = dev.size
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in dev.children:
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], new_node))

      iv_names[dev.iv_name] = (dev, dev.children)

    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for dev in instance.disks:
      size = dev.size
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
      # create new devices on new_node
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=(pri_node, new_node,
                                          dev.logical_id[2]),
                              children=dev.children)
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
                                        new_drbd, False,
                                      _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new DRBD on"
                                 " node '%s'" % new_node)

    for dev in instance.disks:
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for %s on old node" % dev.iv_name)
      cfg.SetDiskID(dev, old_node)
      if not rpc.call_blockdev_shutdown(old_node, dev):
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
                "Please cleanup this device manually as soon as possible")

      # we have new storage, we 'rename' the network on the primary
      info("switching primary drbd for %s to new secondary node" % dev.iv_name)
      cfg.SetDiskID(dev, pri_node)
      # rename to the ip of the new node
      new_uid = list(dev.physical_id)
      new_uid[2] = self.remote_node_info.secondary_ip
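      # As the comment above suggests, the element at index 2 of the
      # physical_id tuple carries the peer's address; pointing it at the new
      # secondary's secondary_ip and renaming below re-attaches the device to
      # the new peer (a reading of this code, not verified against bdev).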
      rlist = [(dev, tuple(new_uid))]
      if not rpc.call_blockdev_rename(pri_node, rlist):
        raise errors.OpExecError("Can't detach & re-attach drbd %s on node"
                                 " %s from %s to %s" %
                                 (dev.iv_name, pri_node, old_node, new_node))
      dev.logical_id = (pri_node, new_node, dev.logical_id[2])
      cfg.SetDiskID(dev, pri_node)
      cfg.Update(instance)


    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        if not rpc.call_blockdev_remove(old_node, lv):
          warning("Can't remove LV on old secondary",
                  "Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance
    if instance.disk_template == constants.DT_REMOTE_RAID1:
      fn = self._ExecRR1
    elif instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")
    return fn(feedback_fn)


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]
    return


  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }
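    # pstatus/sstatus above hold the raw blockdev_find results from the
    # primary and (if any) secondary node; "children" repeats this structure
    # recursively for the underlying devices.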

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                                instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "vcpus": instance.vcpus,
        }
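      # Shape of one entry, with invented values for illustration:
      #   {"name": "inst1.example.com", "config_state": "up",
      #    "run_state": "down", "pnode": "node1", "snodes": [...], ...}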

      result[instance.name] = idict

    return result


class LUSetInstanceParms(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      args['nics'] = [(ip, bridge)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.bridge = getattr(self.op, "bridge", None)
    if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("No such instance name '%s'" %
                                 self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance
    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.
    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus",  self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
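    # result is a list of (parameter, new value) pairs, e.g.
    # [("mem", 512), ("bridge", "xen-br0")]; the values shown here are
    # purely illustrative.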

    self.cfg.AddInstance(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.
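
    Example of the returned structure (node and instance names below are
    purely illustrative): {"node1.example.com": ["instance1", ...]}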

    """
    return rpc.call_export_list(self.nodes)


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance name is a valid one.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    # shutdown the instance, unless requested not to do so
    if self.op.shutdown:
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
      self.proc.ChainOpCode(op)

    vgname = self.cfg.GetVGName()

    snap_disks = []
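    # snap_disks will collect LVM snapshots of the instance's disks (only
    # 'sda' is snapshotted below); they are copied to the target node and
    # then removed from the source node once the export is done.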

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                      logical_id=(vgname, new_dev_name),
                                      physical_id=(vgname, new_dev_name),
                                      iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown:
        op = opcodes.OpStartupInstance(instance_name=instance.name,
                                       force=False)
        self.proc.ChainOpCode(op)

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                           instance):
        logger.Error("could not export block device %s from node"
                     " %s to node %s" %
                     (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from"
                     " node %s" % (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
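    # tgts now holds (path, taggable object) pairs for the cluster, every
    # instance and every node; each match below yields e.g.
    # ("/instances/inst1.example.com", "web") -- example values only.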
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")