lib/cmdlib.py @ revision 16abfbc2


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import socket
30
import time
31
import tempfile
32
import re
33
import platform
34

    
35
from ganeti import rpc
36
from ganeti import ssh
37
from ganeti import logger
38
from ganeti import utils
39
from ganeti import errors
40
from ganeti import hypervisor
41
from ganeti import config
42
from ganeti import constants
43
from ganeti import objects
44
from ganeti import opcodes
45
from ganeti import ssconf
46

    
47
class LogicalUnit(object):
48
  """Logical Unit base class.
49

50
  Subclasses must follow these rules:
51
    - implement CheckPrereq which also fills in the opcode instance
52
      with all the fields (even if as None)
53
    - implement Exec
54
    - implement BuildHooksEnv
55
    - redefine HPATH and HTYPE
56
    - optionally redefine their run requirements (REQ_CLUSTER,
57
      REQ_MASTER); note that all commands require root permissions
58

59
  """
60
  HPATH = None
61
  HTYPE = None
62
  _OP_REQP = []
63
  REQ_CLUSTER = True
64
  REQ_MASTER = True
65

    
66
  def __init__(self, processor, op, cfg, sstore):
67
    """Constructor for LogicalUnit.
68

69
    This needs to be overridden in derived classes in order to check op
70
    validity.
71

72
    """
73
    self.processor = processor
74
    self.op = op
75
    self.cfg = cfg
76
    self.sstore = sstore
77
    for attr_name in self._OP_REQP:
78
      attr_val = getattr(op, attr_name, None)
79
      if attr_val is None:
80
        raise errors.OpPrereqError("Required parameter '%s' missing" %
81
                                   attr_name)
82
    if self.REQ_CLUSTER:
83
      if not cfg.IsCluster():
84
        raise errors.OpPrereqError("Cluster not initialized yet,"
85
                                   " use 'gnt-cluster init' first.")
86
      if self.REQ_MASTER:
87
        master = sstore.GetMasterNode()
88
        if master != utils.HostInfo().name:
89
          raise errors.OpPrereqError("Commands must be run on the master"
90
                                     " node %s" % master)
91

    
92
  def CheckPrereq(self):
93
    """Check prerequisites for this LU.
94

95
    This method should check that the prerequisites for the execution
96
    of this LU are fulfilled. It can do internode communication, but
97
    it should be idempotent - no cluster or system changes are
98
    allowed.
99

100
    The method should raise errors.OpPrereqError in case something is
101
    not fulfilled. Its return value is ignored.
102

103
    This method should also update all the parameters of the opcode to
104
    their canonical form; e.g. a short node name must be fully
105
    expanded after this method has successfully completed (so that
106
    hooks, logging, etc. work correctly).
107

108
    """
109
    raise NotImplementedError
110

    
111
  def Exec(self, feedback_fn):
112
    """Execute the LU.
113

114
    This method should implement the actual work. It should raise
115
    errors.OpExecError for failures that are somewhat dealt with in
116
    code, or expected.
117

118
    """
119
    raise NotImplementedError
120

    
121
  def BuildHooksEnv(self):
122
    """Build hooks environment for this LU.
123

124
    This method should return a three-element tuple consisting of: a dict
125
    containing the environment that will be used for running the
126
    specific hook for this LU, a list of node names on which the hook
127
    should run before the execution, and a list of node names on which
128
    the hook should run after the execution.
129

130
    The keys of the dict must not have 'GANETI_' prefixed as this will
131
    be handled in the hooks runner. Also note additional keys will be
132
    added by the hooks runner. If the LU doesn't define any
133
    environment, an empty dict (and not None) should be returned.
134

135
    As for the node lists, the master should not be included in
136
    them, as it will be added by the hooks runner in case this LU
137
    requires a cluster to run on (otherwise we don't have a node
138
    list). If there are no nodes, an empty list should be returned (and not
139
    None).
140

141
    Note that if the HPATH for a LU class is None, this function will
142
    not be called.
143

144
    """
145
    raise NotImplementedError
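# A minimal sketch of how a LogicalUnit subclass satisfies the contract
# described above. The class name, the hook path "example-node" and the
# opcode field "node_name" are purely illustrative and not part of this
# module:
#
#   class LUExampleNode(LogicalUnit):
#     HPATH = "example-node"
#     HTYPE = constants.HTYPE_NODE
#     _OP_REQP = ["node_name"]
#
#     def CheckPrereq(self):
#       # canonicalize the opcode fields, raising OpPrereqError on problems
#       node = self.cfg.ExpandNodeName(self.op.node_name)
#       if node is None:
#         raise errors.OpPrereqError("No such node name '%s'" %
#                                    self.op.node_name)
#       self.op.node_name = node
#
#     def BuildHooksEnv(self):
#       env = {"NODE_NAME": self.op.node_name}
#       return env, [self.op.node_name], [self.op.node_name]
#
#     def Exec(self, feedback_fn):
#       feedback_fn("* nothing to do for node %s" % self.op.node_name)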
146

    
147

    
148
class NoHooksLU(LogicalUnit):
149
  """Simple LU which runs no hooks.
150

151
  This LU is intended as a parent for other LogicalUnits which will
152
  run no hooks, in order to reduce duplicate code.
153

154
  """
155
  HPATH = None
156
  HTYPE = None
157

    
158
  def BuildHooksEnv(self):
159
    """Build hooks env.
160

161
    This is a no-op, since we don't run hooks.
162

163
    """
164
    return
165

    
166

    
167
def _GetWantedNodes(lu, nodes):
168
  """Returns list of checked and expanded node names.
169

170
  Args:
171
    nodes: List of nodes (strings) or None for all
172

173
  """
174
  if not isinstance(nodes, list):
175
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
176

    
177
  if nodes:
178
    wanted = []
179

    
180
    for name in nodes:
181
      node = lu.cfg.ExpandNodeName(name)
182
      if node is None:
183
        raise errors.OpPrereqError("No such node name '%s'" % name)
184
      wanted.append(node)
185

    
186
  else:
187
    wanted = lu.cfg.GetNodeList()
188
  return utils.NiceSort(wanted)
189

    
190

    
191
def _GetWantedInstances(lu, instances):
192
  """Returns list of checked and expanded instance names.
193

194
  Args:
195
    instances: List of instances (strings) or None for all
196

197
  """
198
  if not isinstance(instances, list):
199
    raise errors.OpPrereqError("Invalid argument type 'instances'")
200

    
201
  if instances:
202
    wanted = []
203

    
204
    for name in instances:
205
      instance = lu.cfg.ExpandInstanceName(name)
206
      if instance is None:
207
        raise errors.OpPrereqError("No such instance name '%s'" % name)
208
      wanted.append(instance)
209

    
210
  else:
211
    wanted = lu.cfg.GetInstanceList()
212
  return utils.NiceSort(wanted)
213

    
214

    
215
def _CheckOutputFields(static, dynamic, selected):
216
  """Checks whether all selected fields are valid.
217

218
  Args:
219
    static: Static fields
220
    dynamic: Dynamic fields
221

222
  """
223
  static_fields = frozenset(static)
224
  dynamic_fields = frozenset(dynamic)
225

    
226
  all_fields = static_fields | dynamic_fields
227

    
228
  if not all_fields.issuperset(selected):
229
    raise errors.OpPrereqError("Unknown output fields selected: %s"
230
                               % ",".join(frozenset(selected).
231
                                          difference(all_fields)))
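# As an illustration (field names hypothetical), a query LU's CheckPrereq
# would validate its requested columns like this; an unknown field such as
# "bogus" makes the call raise errors.OpPrereqError naming that field:
#
#   _CheckOutputFields(static=["name", "pip"],
#                      dynamic=["mfree", "dtotal"],
#                      selected=self.op.output_fields)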
232

    
233

    
234
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
235
                          memory, vcpus, nics):
236
  """Builds instance related env variables for hooks from single variables.
237

238
  Args:
239
    secondary_nodes: List of secondary nodes as strings
240
  """
241
  env = {
242
    "INSTANCE_NAME": name,
243
    "INSTANCE_PRIMARY": primary_node,
244
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
245
    "INSTANCE_OS_TYPE": os_type,
246
    "INSTANCE_STATUS": status,
247
    "INSTANCE_MEMORY": memory,
248
    "INSTANCE_VCPUS": vcpus,
249
  }
250

    
251
  if nics:
252
    nic_count = len(nics)
253
    for idx, (ip, bridge) in enumerate(nics):
254
      if ip is None:
255
        ip = ""
256
      env["INSTANCE_NIC%d_IP" % idx] = ip
257
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
258
  else:
259
    nic_count = 0
260

    
261
  env["INSTANCE_NIC_COUNT"] = nic_count
262

    
263
  return env
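# As an illustration (all values hypothetical), a call such as
#   _BuildInstanceHookEnv("inst1.example.com", "node1", ["node2"],
#                         "debian-etch", "up", 512, 1, [(None, "xen-br0")])
# returns:
#   {"INSTANCE_NAME": "inst1.example.com", "INSTANCE_PRIMARY": "node1",
#    "INSTANCE_SECONDARIES": "node2", "INSTANCE_OS_TYPE": "debian-etch",
#    "INSTANCE_STATUS": "up", "INSTANCE_MEMORY": 512, "INSTANCE_VCPUS": 1,
#    "INSTANCE_NIC0_IP": "", "INSTANCE_NIC0_BRIDGE": "xen-br0",
#    "INSTANCE_NIC_COUNT": 1}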
264

    
265

    
266
def _BuildInstanceHookEnvByObject(instance, override=None):
267
  """Builds instance related env variables for hooks from an object.
268

269
  Args:
270
    instance: objects.Instance object of instance
271
    override: dict of values to override
272
  """
273
  args = {
274
    'name': instance.name,
275
    'primary_node': instance.primary_node,
276
    'secondary_nodes': instance.secondary_nodes,
277
    'os_type': instance.os,
278
    'status': instance.status,
279
    'memory': instance.memory,
280
    'vcpus': instance.vcpus,
281
    'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
282
  }
283
  if override:
284
    args.update(override)
285
  return _BuildInstanceHookEnv(**args)
286

    
287

    
288
def _UpdateEtcHosts(fullnode, ip):
289
  """Ensure a node has a correct entry in /etc/hosts.
290

291
  Args:
292
    fullnode - Fully qualified domain name of host. (str)
293
    ip       - IPv4 address of host (str)
294

295
  """
296
  node = fullnode.split(".", 1)[0]
297

    
298
  f = open('/etc/hosts', 'r+')
299

    
300
  inthere = False
301

    
302
  save_lines = []
303
  add_lines = []
304
  removed = False
305

    
306
  while True:
307
    rawline = f.readline()
308

    
309
    if not rawline:
310
      # End of file
311
      break
312

    
313
    line = rawline.split('\n')[0]
314

    
315
    # Strip off comments
316
    line = line.split('#')[0]
317

    
318
    if not line:
319
      # Entire line was comment, skip
320
      save_lines.append(rawline)
321
      continue
322

    
323
    fields = line.split()
324

    
325
    haveall = True
326
    havesome = False
327
    for spec in [ ip, fullnode, node ]:
328
      if spec not in fields:
329
        haveall = False
330
      if spec in fields:
331
        havesome = True
332

    
333
    if haveall:
334
      inthere = True
335
      save_lines.append(rawline)
336
      continue
337

    
338
    if havesome and not haveall:
339
      # Line (old, or manual?) which is missing some.  Remove.
340
      removed = True
341
      continue
342

    
343
    save_lines.append(rawline)
344

    
345
  if not inthere:
346
    add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))
347

    
348
  if removed:
349
    if add_lines:
350
      save_lines = save_lines + add_lines
351

    
352
    # We removed a line, write a new file and replace old.
353
    fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
354
    newfile = os.fdopen(fd, 'w')
355
    newfile.write(''.join(save_lines))
356
    newfile.close()
357
    os.rename(tmpname, '/etc/hosts')
358

    
359
  elif add_lines:
360
    # Simply appending a new line will do the trick.
361
    f.seek(0, 2)
362
    for add in add_lines:
363
      f.write(add)
364

    
365
  f.close()
366

    
367

    
368
def _UpdateKnownHosts(fullnode, ip, pubkey):
369
  """Ensure a node has a correct known_hosts entry.
370

371
  Args:
372
    fullnode - Fully qualified domain name of host. (str)
373
    ip       - IPv4 address of host (str)
374
    pubkey   - the public key of the cluster
375

376
  """
377
  if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
378
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
379
  else:
380
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
381

    
382
  inthere = False
383

    
384
  save_lines = []
385
  add_lines = []
386
  removed = False
387

    
388
  while True:
389
    rawline = f.readline()
390
    logger.Debug('read %s' % (repr(rawline),))
391

    
392
    if not rawline:
393
      # End of file
394
      break
395

    
396
    line = rawline.split('\n')[0]
397

    
398
    parts = line.split(' ')
399
    fields = parts[0].split(',')
400
    key = parts[2]
401

    
402
    haveall = True
403
    havesome = False
404
    for spec in [ ip, fullnode ]:
405
      if spec not in fields:
406
        haveall = False
407
      if spec in fields:
408
        havesome = True
409

    
410
    logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
411
    if haveall and key == pubkey:
412
      inthere = True
413
      save_lines.append(rawline)
414
      logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
415
      continue
416

    
417
    if havesome and (not haveall or key != pubkey):
418
      removed = True
419
      logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
420
      continue
421

    
422
    save_lines.append(rawline)
423

    
424
  if not inthere:
425
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
426
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
427

    
428
  if removed:
429
    save_lines = save_lines + add_lines
430

    
431
    # Write a new file and replace old.
432
    fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
433
                                   constants.DATA_DIR)
434
    newfile = os.fdopen(fd, 'w')
435
    try:
436
      newfile.write(''.join(save_lines))
437
    finally:
438
      newfile.close()
439
    logger.Debug("Wrote new known_hosts.")
440
    os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
441

    
442
  elif add_lines:
443
    # Simply appending a new line will do the trick.
444
    f.seek(0, 2)
445
    for add in add_lines:
446
      f.write(add)
447

    
448
  f.close()
449

    
450

    
451
def _HasValidVG(vglist, vgname):
452
  """Checks if the volume group list is valid.
453

454
  A non-None return value means there's an error, and the return value
455
  is the error message.
456

457
  """
458
  vgsize = vglist.get(vgname, None)
459
  if vgsize is None:
460
    return "volume group '%s' missing" % vgname
461
  elif vgsize < 20480:
462
    return ("volume group '%s' too small (20480MiB required, %dMib found)" %
463
            (vgname, vgsize))
464
  return None
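# A few illustrative calls (volume group name and sizes hypothetical):
#   _HasValidVG({"xenvg": 409600}, "xenvg")  -> None (valid)
#   _HasValidVG({"xenvg": 10240}, "xenvg")   -> "volume group 'xenvg' too small ..."
#   _HasValidVG({}, "xenvg")                 -> "volume group 'xenvg' missing"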
465

    
466

    
467
def _InitSSHSetup(node):
468
  """Setup the SSH configuration for the cluster.
469

470

471
  This generates a dsa keypair for root, adds the pub key to the
472
  permitted hosts and adds the hostkey to its own known hosts.
473

474
  Args:
475
    node: the name of this host as a fqdn
476

477
  """
478
  if os.path.exists('/root/.ssh/id_dsa'):
479
    utils.CreateBackup('/root/.ssh/id_dsa')
480
  if os.path.exists('/root/.ssh/id_dsa.pub'):
481
    utils.CreateBackup('/root/.ssh/id_dsa.pub')
482

    
483
  utils.RemoveFile('/root/.ssh/id_dsa')
484
  utils.RemoveFile('/root/.ssh/id_dsa.pub')
485

    
486
  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
487
                         "-f", "/root/.ssh/id_dsa",
488
                         "-q", "-N", ""])
489
  if result.failed:
490
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
491
                             result.output)
492

    
493
  f = open('/root/.ssh/id_dsa.pub', 'r')
494
  try:
495
    utils.AddAuthorizedKey('/root/.ssh/authorized_keys', f.read(8192))
496
  finally:
497
    f.close()
498

    
499

    
500
def _InitGanetiServerSetup(ss):
501
  """Setup the necessary configuration for the initial node daemon.
502

503
  This creates the nodepass file containing the shared password for
504
  the cluster and also generates the SSL certificate.
505

506
  """
507
  # Create pseudo random password
508
  randpass = sha.new(os.urandom(64)).hexdigest()
509
  # and write it into sstore
510
  ss.SetKey(ss.SS_NODED_PASS, randpass)
511

    
512
  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
513
                         "-days", str(365*5), "-nodes", "-x509",
514
                         "-keyout", constants.SSL_CERT_FILE,
515
                         "-out", constants.SSL_CERT_FILE, "-batch"])
516
  if result.failed:
517
    raise errors.OpExecError("could not generate server ssl cert, command"
518
                             " %s had exitcode %s and error message %s" %
519
                             (result.cmd, result.exit_code, result.output))
520

    
521
  os.chmod(constants.SSL_CERT_FILE, 0400)
522

    
523
  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
524

    
525
  if result.failed:
526
    raise errors.OpExecError("Could not start the node daemon, command %s"
527
                             " had exitcode %s and error %s" %
528
                             (result.cmd, result.exit_code, result.output))
529

    
530

    
531
class LUInitCluster(LogicalUnit):
532
  """Initialise the cluster.
533

534
  """
535
  HPATH = "cluster-init"
536
  HTYPE = constants.HTYPE_CLUSTER
537
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
538
              "def_bridge", "master_netdev"]
539
  REQ_CLUSTER = False
540

    
541
  def BuildHooksEnv(self):
542
    """Build hooks env.
543

544
    Notes: Since we don't require a cluster, we must manually add
545
    ourselves to the post-run node list.
546

547
    """
548
    env = {
549
      "CLUSTER": self.op.cluster_name,
550
      "MASTER": self.hostname.name,
551
      }
552
    return env, [], [self.hostname.name]
553

    
554
  def CheckPrereq(self):
555
    """Verify that the passed name is a valid one.
556

557
    """
558
    if config.ConfigWriter.IsCluster():
559
      raise errors.OpPrereqError("Cluster is already initialised")
560

    
561
    self.hostname = hostname = utils.HostInfo()
562

    
563
    if hostname.ip.startswith("127."):
564
      raise errors.OpPrereqError("This host's IP resolves to the private"
565
                                 " range (%s). Please fix DNS or /etc/hosts." %
566
                                 (hostname.ip,))
567

    
568
    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
569

    
570
    if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
571
                         constants.DEFAULT_NODED_PORT):
572
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
573
                                 " to %s,\nbut this ip address does not"
574
                                 " belong to this host."
575
                                 " Aborting." % hostname.ip)
576

    
577
    secondary_ip = getattr(self.op, "secondary_ip", None)
578
    if secondary_ip and not utils.IsValidIP(secondary_ip):
579
      raise errors.OpPrereqError("Invalid secondary ip given")
580
    if (secondary_ip and
581
        secondary_ip != hostname.ip and
582
        (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
583
                           constants.DEFAULT_NODED_PORT))):
584
      raise errors.OpPrereqError("You gave %s as secondary IP,\n"
585
                                 "but it does not belong to this host." %
586
                                 secondary_ip)
587
    self.secondary_ip = secondary_ip
588

    
589
    # checks presence of the volume group given
590
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
591

    
592
    if vgstatus:
593
      raise errors.OpPrereqError("Error: %s" % vgstatus)
594

    
595
    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
596
                    self.op.mac_prefix):
597
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
598
                                 self.op.mac_prefix)
599

    
600
    if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
601
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
602
                                 self.op.hypervisor_type)
603

    
604
    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
605
    if result.failed:
606
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
607
                                 (self.op.master_netdev,
608
                                  result.output.strip()))
609

    
610
  def Exec(self, feedback_fn):
611
    """Initialize the cluster.
612

613
    """
614
    clustername = self.clustername
615
    hostname = self.hostname
616

    
617
    # set up the simple store
618
    ss = ssconf.SimpleStore()
619
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
620
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
621
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
622
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
623
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
624

    
625
    # set up the inter-node password and certificate
626
    _InitGanetiServerSetup(ss)
627

    
628
    # start the master ip
629
    rpc.call_node_start_master(hostname.name)
630

    
631
    # set up ssh config and /etc/hosts
632
    f = open('/etc/ssh/ssh_host_rsa_key.pub', 'r')
633
    try:
634
      sshline = f.read()
635
    finally:
636
      f.close()
637
    sshkey = sshline.split(" ")[1]
638

    
639
    _UpdateEtcHosts(hostname.name, hostname.ip)
640

    
641
    _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
642

    
643
    _InitSSHSetup(hostname.name)
644

    
645
    # init of cluster config file
646
    cfgw = config.ConfigWriter()
647
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
648
                    sshkey, self.op.mac_prefix,
649
                    self.op.vg_name, self.op.def_bridge)
650

    
651

    
652
class LUDestroyCluster(NoHooksLU):
653
  """Logical unit for destroying the cluster.
654

655
  """
656
  _OP_REQP = []
657

    
658
  def CheckPrereq(self):
659
    """Check prerequisites.
660

661
    This checks whether the cluster is empty.
662

663
    Any errors are signalled by raising errors.OpPrereqError.
664

665
    """
666
    master = self.sstore.GetMasterNode()
667

    
668
    nodelist = self.cfg.GetNodeList()
669
    if len(nodelist) != 1 or nodelist[0] != master:
670
      raise errors.OpPrereqError("There are still %d node(s) in"
671
                                 " this cluster." % (len(nodelist) - 1))
672
    instancelist = self.cfg.GetInstanceList()
673
    if instancelist:
674
      raise errors.OpPrereqError("There are still %d instance(s) in"
675
                                 " this cluster." % len(instancelist))
676

    
677
  def Exec(self, feedback_fn):
678
    """Destroys the cluster.
679

680
    """
681
    utils.CreateBackup('/root/.ssh/id_dsa')
682
    utils.CreateBackup('/root/.ssh/id_dsa.pub')
683
    rpc.call_node_leave_cluster(self.sstore.GetMasterNode())
684

    
685

    
686
class LUVerifyCluster(NoHooksLU):
687
  """Verifies the cluster status.
688

689
  """
690
  _OP_REQP = []
691

    
692
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
693
                  remote_version, feedback_fn):
694
    """Run multiple tests against a node.
695

696
    Test list:
697
      - compares ganeti version
698
      - checks vg existence and size > 20G
699
      - checks config file checksum
700
      - checks ssh to other nodes
701

702
    Args:
703
      node: name of the node to check
704
      file_list: required list of files
705
      local_cksum: dictionary of local files and their checksums
706

707
    """
708
    # compares ganeti version
709
    local_version = constants.PROTOCOL_VERSION
710
    if not remote_version:
711
      feedback_fn(" - ERROR: connection to %s failed" % (node))
712
      return True
713

    
714
    if local_version != remote_version:
715
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
716
                      (local_version, node, remote_version))
717
      return True
718

    
719
    # checks vg existence and size > 20G
720

    
721
    bad = False
722
    if not vglist:
723
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
724
                      (node,))
725
      bad = True
726
    else:
727
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
728
      if vgstatus:
729
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
730
        bad = True
731

    
732
    # checks config file checksum
733
    # checks ssh to any
734

    
735
    if 'filelist' not in node_result:
736
      bad = True
737
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
738
    else:
739
      remote_cksum = node_result['filelist']
740
      for file_name in file_list:
741
        if file_name not in remote_cksum:
742
          bad = True
743
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
744
        elif remote_cksum[file_name] != local_cksum[file_name]:
745
          bad = True
746
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
747

    
748
    if 'nodelist' not in node_result:
749
      bad = True
750
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
751
    else:
752
      if node_result['nodelist']:
753
        bad = True
754
        for node in node_result['nodelist']:
755
          feedback_fn("  - ERROR: communication with node '%s': %s" %
756
                          (node, node_result['nodelist'][node]))
757
    hyp_result = node_result.get('hypervisor', None)
758
    if hyp_result is not None:
759
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
760
    return bad
761

    
762
  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
763
    """Verify an instance.
764

765
    This function checks to see if the required block devices are
766
    available on the instance's node.
767

768
    """
769
    bad = False
770

    
771
    instancelist = self.cfg.GetInstanceList()
772
    if not instance in instancelist:
773
      feedback_fn("  - ERROR: instance %s not in instance list %s" %
774
                      (instance, instancelist))
775
      bad = True
776

    
777
    instanceconfig = self.cfg.GetInstanceInfo(instance)
778
    node_current = instanceconfig.primary_node
779

    
780
    node_vol_should = {}
781
    instanceconfig.MapLVsByNode(node_vol_should)
782

    
783
    for node in node_vol_should:
784
      for volume in node_vol_should[node]:
785
        if node not in node_vol_is or volume not in node_vol_is[node]:
786
          feedback_fn("  - ERROR: volume %s missing on node %s" %
787
                          (volume, node))
788
          bad = True
789

    
790
    if not instanceconfig.status == 'down':
791
      if not instance in node_instance[node_current]:
792
        feedback_fn("  - ERROR: instance %s not running on node %s" %
793
                        (instance, node_current))
794
        bad = True
795

    
796
    for node in node_instance:
797
      if (not node == node_current):
798
        if instance in node_instance[node]:
799
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
800
                          (instance, node))
801
          bad = True
802

    
803
    return bad
804

    
805
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
806
    """Verify if there are any unknown volumes in the cluster.
807

808
    The .os, .swap and backup volumes are ignored. All other volumes are
809
    reported as unknown.
810

811
    """
812
    bad = False
813

    
814
    for node in node_vol_is:
815
      for volume in node_vol_is[node]:
816
        if node not in node_vol_should or volume not in node_vol_should[node]:
817
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
818
                      (volume, node))
819
          bad = True
820
    return bad
821

    
822
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
823
    """Verify the list of running instances.
824

825
    This checks what instances are running but unknown to the cluster.
826

827
    """
828
    bad = False
829
    for node in node_instance:
830
      for runninginstance in node_instance[node]:
831
        if runninginstance not in instancelist:
832
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
833
                          (runninginstance, node))
834
          bad = True
835
    return bad
836

    
837
  def CheckPrereq(self):
838
    """Check prerequisites.
839

840
    This has no prerequisites.
841

842
    """
843
    pass
844

    
845
  def Exec(self, feedback_fn):
846
    """Verify integrity of cluster, performing various test on nodes.
847

848
    """
849
    bad = False
850
    feedback_fn("* Verifying global settings")
851
    self.cfg.VerifyConfig()
852

    
853
    master = self.sstore.GetMasterNode()
854
    vg_name = self.cfg.GetVGName()
855
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
856
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
857
    node_volume = {}
858
    node_instance = {}
859

    
860
    # FIXME: verify OS list
861
    # do local checksums
862
    file_names = list(self.sstore.GetFileList())
863
    file_names.append(constants.SSL_CERT_FILE)
864
    file_names.append(constants.CLUSTER_CONF_FILE)
865
    local_checksums = utils.FingerprintFiles(file_names)
866

    
867
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
868
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
869
    all_instanceinfo = rpc.call_instance_list(nodelist)
870
    all_vglist = rpc.call_vg_list(nodelist)
871
    node_verify_param = {
872
      'filelist': file_names,
873
      'nodelist': nodelist,
874
      'hypervisor': None,
875
      }
876
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
877
    all_rversion = rpc.call_version(nodelist)
878

    
879
    for node in nodelist:
880
      feedback_fn("* Verifying node %s" % node)
881
      result = self._VerifyNode(node, file_names, local_checksums,
882
                                all_vglist[node], all_nvinfo[node],
883
                                all_rversion[node], feedback_fn)
884
      bad = bad or result
885

    
886
      # node_volume
887
      volumeinfo = all_volumeinfo[node]
888

    
889
      if type(volumeinfo) != dict:
890
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
891
        bad = True
892
        continue
893

    
894
      node_volume[node] = volumeinfo
895

    
896
      # node_instance
897
      nodeinstance = all_instanceinfo[node]
898
      if type(nodeinstance) != list:
899
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
900
        bad = True
901
        continue
902

    
903
      node_instance[node] = nodeinstance
904

    
905
    node_vol_should = {}
906

    
907
    for instance in instancelist:
908
      feedback_fn("* Verifying instance %s" % instance)
909
      result =  self._VerifyInstance(instance, node_volume, node_instance,
910
                                     feedback_fn)
911
      bad = bad or result
912

    
913
      inst_config = self.cfg.GetInstanceInfo(instance)
914

    
915
      inst_config.MapLVsByNode(node_vol_should)
916

    
917
    feedback_fn("* Verifying orphan volumes")
918
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
919
                                       feedback_fn)
920
    bad = bad or result
921

    
922
    feedback_fn("* Verifying remaining instances")
923
    result = self._VerifyOrphanInstances(instancelist, node_instance,
924
                                         feedback_fn)
925
    bad = bad or result
926

    
927
    return int(bad)
928

    
929

    
930
class LURenameCluster(LogicalUnit):
931
  """Rename the cluster.
932

933
  """
934
  HPATH = "cluster-rename"
935
  HTYPE = constants.HTYPE_CLUSTER
936
  _OP_REQP = ["name"]
937

    
938
  def BuildHooksEnv(self):
939
    """Build hooks env.
940

941
    """
942
    env = {
943
      "NEW_NAME": self.op.name,
944
      }
945
    mn = self.sstore.GetMasterNode()
946
    return env, [mn], [mn]
947

    
948
  def CheckPrereq(self):
949
    """Verify that the passed name is a valid one.
950

951
    """
952
    hostname = utils.HostInfo(self.op.name)
953

    
954
    new_name = hostname.name
955
    self.ip = new_ip = hostname.ip
956
    old_name = self.sstore.GetClusterName()
957
    old_ip = self.sstore.GetMasterIP()
958
    if new_name == old_name and new_ip == old_ip:
959
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
960
                                 " cluster has changed")
961
    if new_ip != old_ip:
962
      result = utils.RunCmd(["fping", "-q", new_ip])
963
      if not result.failed:
964
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
965
                                   " reachable on the network. Aborting." %
966
                                   new_ip)
967

    
968
    self.op.name = new_name
969

    
970
  def Exec(self, feedback_fn):
971
    """Rename the cluster.
972

973
    """
974
    clustername = self.op.name
975
    ip = self.ip
976
    ss = self.sstore
977

    
978
    # shutdown the master IP
979
    master = ss.GetMasterNode()
980
    if not rpc.call_node_stop_master(master):
981
      raise errors.OpExecError("Could not disable the master role")
982

    
983
    try:
984
      # modify the sstore
985
      ss.SetKey(ss.SS_MASTER_IP, ip)
986
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
987

    
988
      # Distribute updated ss config to all nodes
989
      myself = self.cfg.GetNodeInfo(master)
990
      dist_nodes = self.cfg.GetNodeList()
991
      if myself.name in dist_nodes:
992
        dist_nodes.remove(myself.name)
993

    
994
      logger.Debug("Copying updated ssconf data to all nodes")
995
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
996
        fname = ss.KeyToFilename(keyname)
997
        result = rpc.call_upload_file(dist_nodes, fname)
998
        for to_node in dist_nodes:
999
          if not result[to_node]:
1000
            logger.Error("copy of file %s to node %s failed" %
1001
                         (fname, to_node))
1002
    finally:
1003
      if not rpc.call_node_start_master(master):
1004
        logger.Error("Could not re-enable the master role on the master,\n"
1005
                     "please restart manually.")
1006

    
1007

    
1008
def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
1009
  """Sleep and poll for an instance's disk to sync.
1010

1011
  """
1012
  if not instance.disks:
1013
    return True
1014

    
1015
  if not oneshot:
1016
    logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)
1017

    
1018
  node = instance.primary_node
1019

    
1020
  for dev in instance.disks:
1021
    cfgw.SetDiskID(dev, node)
1022

    
1023
  retries = 0
1024
  while True:
1025
    max_time = 0
1026
    done = True
1027
    cumul_degraded = False
1028
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1029
    if not rstats:
1030
      logger.ToStderr("Can't get any data from node %s" % node)
1031
      retries += 1
1032
      if retries >= 10:
1033
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1034
                                 " aborting." % node)
1035
      time.sleep(6)
1036
      continue
1037
    retries = 0
1038
    for i in range(len(rstats)):
1039
      mstat = rstats[i]
1040
      if mstat is None:
1041
        logger.ToStderr("Can't compute data for node %s/%s" %
1042
                        (node, instance.disks[i].iv_name))
1043
        continue
1044
      perc_done, est_time, is_degraded = mstat
1045
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1046
      if perc_done is not None:
1047
        done = False
1048
        if est_time is not None:
1049
          rem_time = "%d estimated seconds remaining" % est_time
1050
          max_time = est_time
1051
        else:
1052
          rem_time = "no time estimate"
1053
        logger.ToStdout("- device %s: %5.2f%% done, %s" %
1054
                        (instance.disks[i].iv_name, perc_done, rem_time))
1055
    if done or oneshot:
1056
      break
1057

    
1058
    if unlock:
1059
      utils.Unlock('cmd')
1060
    try:
1061
      time.sleep(min(60, max_time))
1062
    finally:
1063
      if unlock:
1064
        utils.Lock('cmd')
1065

    
1066
  if done:
1067
    logger.ToStdout("Instance %s's disks are in sync." % instance.name)
1068
  return not cumul_degraded
1069

    
1070

    
1071
def _CheckDiskConsistency(cfgw, dev, node, on_primary):
1072
  """Check that mirrors are not degraded.
1073

1074
  """
1075
  cfgw.SetDiskID(dev, node)
1076

    
1077
  result = True
1078
  if on_primary or dev.AssembleOnSecondary():
1079
    rstats = rpc.call_blockdev_find(node, dev)
1080
    if not rstats:
1081
      logger.ToStderr("Can't get any data from node %s" % node)
1082
      result = False
1083
    else:
1084
      result = result and (not rstats[5])
1085
  if dev.children:
1086
    for child in dev.children:
1087
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1088

    
1089
  return result
1090

    
1091

    
1092
class LUDiagnoseOS(NoHooksLU):
1093
  """Logical unit for OS diagnose/query.
1094

1095
  """
1096
  _OP_REQP = []
1097

    
1098
  def CheckPrereq(self):
1099
    """Check prerequisites.
1100

1101
    This always succeeds, since this is a pure query LU.
1102

1103
    """
1104
    return
1105

    
1106
  def Exec(self, feedback_fn):
1107
    """Compute the list of OSes.
1108

1109
    """
1110
    node_list = self.cfg.GetNodeList()
1111
    node_data = rpc.call_os_diagnose(node_list)
1112
    if node_data == False:
1113
      raise errors.OpExecError("Can't gather the list of OSes")
1114
    return node_data
1115

    
1116

    
1117
class LURemoveNode(LogicalUnit):
1118
  """Logical unit for removing a node.
1119

1120
  """
1121
  HPATH = "node-remove"
1122
  HTYPE = constants.HTYPE_NODE
1123
  _OP_REQP = ["node_name"]
1124

    
1125
  def BuildHooksEnv(self):
1126
    """Build hooks env.
1127

1128
    This doesn't run on the target node in the pre phase as a failed
1129
    node would not allow itself to run.
1130

1131
    """
1132
    env = {
1133
      "NODE_NAME": self.op.node_name,
1134
      }
1135
    all_nodes = self.cfg.GetNodeList()
1136
    all_nodes.remove(self.op.node_name)
1137
    return env, all_nodes, all_nodes
1138

    
1139
  def CheckPrereq(self):
1140
    """Check prerequisites.
1141

1142
    This checks:
1143
     - the node exists in the configuration
1144
     - it does not have primary or secondary instances
1145
     - it's not the master
1146

1147
    Any errors are signalled by raising errors.OpPrereqError.
1148

1149
    """
1150
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1151
    if node is None:
1152
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1153

    
1154
    instance_list = self.cfg.GetInstanceList()
1155

    
1156
    masternode = self.sstore.GetMasterNode()
1157
    if node.name == masternode:
1158
      raise errors.OpPrereqError("Node is the master node,"
1159
                                 " you need to failover first.")
1160

    
1161
    for instance_name in instance_list:
1162
      instance = self.cfg.GetInstanceInfo(instance_name)
1163
      if node.name == instance.primary_node:
1164
        raise errors.OpPrereqError("Instance %s still running on the node,"
1165
                                   " please remove first." % instance_name)
1166
      if node.name in instance.secondary_nodes:
1167
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1168
                                   " please remove first." % instance_name)
1169
    self.op.node_name = node.name
1170
    self.node = node
1171

    
1172
  def Exec(self, feedback_fn):
1173
    """Removes the node from the cluster.
1174

1175
    """
1176
    node = self.node
1177
    logger.Info("stopping the node daemon and removing configs from node %s" %
1178
                node.name)
1179

    
1180
    rpc.call_node_leave_cluster(node.name)
1181

    
1182
    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1183

    
1184
    logger.Info("Removing node %s from config" % node.name)
1185

    
1186
    self.cfg.RemoveNode(node.name)
1187

    
1188

    
1189
class LUQueryNodes(NoHooksLU):
1190
  """Logical unit for querying nodes.
1191

1192
  """
1193
  _OP_REQP = ["output_fields", "names"]
1194

    
1195
  def CheckPrereq(self):
1196
    """Check prerequisites.
1197

1198
    This checks that the fields required are valid output fields.
1199

1200
    """
1201
    self.dynamic_fields = frozenset(["dtotal", "dfree",
1202
                                     "mtotal", "mnode", "mfree",
1203
                                     "bootid"])
1204

    
1205
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1206
                               "pinst_list", "sinst_list",
1207
                               "pip", "sip"],
1208
                       dynamic=self.dynamic_fields,
1209
                       selected=self.op.output_fields)
1210

    
1211
    self.wanted = _GetWantedNodes(self, self.op.names)
1212

    
1213
  def Exec(self, feedback_fn):
1214
    """Computes the list of nodes and their attributes.
1215

1216
    """
1217
    nodenames = self.wanted
1218
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1219

    
1220
    # begin data gathering
1221

    
1222
    if self.dynamic_fields.intersection(self.op.output_fields):
1223
      live_data = {}
1224
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1225
      for name in nodenames:
1226
        nodeinfo = node_data.get(name, None)
1227
        if nodeinfo:
1228
          live_data[name] = {
1229
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1230
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1231
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1232
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1233
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1234
            "bootid": nodeinfo['bootid'],
1235
            }
1236
        else:
1237
          live_data[name] = {}
1238
    else:
1239
      live_data = dict.fromkeys(nodenames, {})
1240

    
1241
    node_to_primary = dict([(name, set()) for name in nodenames])
1242
    node_to_secondary = dict([(name, set()) for name in nodenames])
1243

    
1244
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1245
                             "sinst_cnt", "sinst_list"))
1246
    if inst_fields & frozenset(self.op.output_fields):
1247
      instancelist = self.cfg.GetInstanceList()
1248

    
1249
      for instance_name in instancelist:
1250
        inst = self.cfg.GetInstanceInfo(instance_name)
1251
        if inst.primary_node in node_to_primary:
1252
          node_to_primary[inst.primary_node].add(inst.name)
1253
        for secnode in inst.secondary_nodes:
1254
          if secnode in node_to_secondary:
1255
            node_to_secondary[secnode].add(inst.name)
1256

    
1257
    # end data gathering
1258

    
1259
    output = []
1260
    for node in nodelist:
1261
      node_output = []
1262
      for field in self.op.output_fields:
1263
        if field == "name":
1264
          val = node.name
1265
        elif field == "pinst_list":
1266
          val = list(node_to_primary[node.name])
1267
        elif field == "sinst_list":
1268
          val = list(node_to_secondary[node.name])
1269
        elif field == "pinst_cnt":
1270
          val = len(node_to_primary[node.name])
1271
        elif field == "sinst_cnt":
1272
          val = len(node_to_secondary[node.name])
1273
        elif field == "pip":
1274
          val = node.primary_ip
1275
        elif field == "sip":
1276
          val = node.secondary_ip
1277
        elif field in self.dynamic_fields:
1278
          val = live_data[node.name].get(field, None)
1279
        else:
1280
          raise errors.ParameterError(field)
1281
        node_output.append(val)
1282
      output.append(node_output)
1283

    
1284
    return output
1285

    
1286

    
1287
class LUQueryNodeVolumes(NoHooksLU):
1288
  """Logical unit for getting volumes on node(s).
1289

1290
  """
1291
  _OP_REQP = ["nodes", "output_fields"]
1292

    
1293
  def CheckPrereq(self):
1294
    """Check prerequisites.
1295

1296
    This checks that the fields required are valid output fields.
1297

1298
    """
1299
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1300

    
1301
    _CheckOutputFields(static=["node"],
1302
                       dynamic=["phys", "vg", "name", "size", "instance"],
1303
                       selected=self.op.output_fields)
1304

    
1305

    
1306
  def Exec(self, feedback_fn):
1307
    """Computes the list of nodes and their attributes.
1308

1309
    """
1310
    nodenames = self.nodes
1311
    volumes = rpc.call_node_volumes(nodenames)
1312

    
1313
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1314
             in self.cfg.GetInstanceList()]
1315

    
1316
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1317

    
1318
    output = []
1319
    for node in nodenames:
1320
      if node not in volumes or not volumes[node]:
1321
        continue
1322

    
1323
      node_vols = volumes[node][:]
1324
      node_vols.sort(key=lambda vol: vol['dev'])
1325

    
1326
      for vol in node_vols:
1327
        node_output = []
1328
        for field in self.op.output_fields:
1329
          if field == "node":
1330
            val = node
1331
          elif field == "phys":
1332
            val = vol['dev']
1333
          elif field == "vg":
1334
            val = vol['vg']
1335
          elif field == "name":
1336
            val = vol['name']
1337
          elif field == "size":
1338
            val = int(float(vol['size']))
1339
          elif field == "instance":
1340
            for inst in ilist:
1341
              if node not in lv_by_node[inst]:
1342
                continue
1343
              if vol['name'] in lv_by_node[inst][node]:
1344
                val = inst.name
1345
                break
1346
            else:
1347
              val = '-'
1348
          else:
1349
            raise errors.ParameterError(field)
1350
          node_output.append(str(val))
1351

    
1352
        output.append(node_output)
1353

    
1354
    return output
1355

    
1356

    
1357
class LUAddNode(LogicalUnit):
1358
  """Logical unit for adding node to the cluster.
1359

1360
  """
1361
  HPATH = "node-add"
1362
  HTYPE = constants.HTYPE_NODE
1363
  _OP_REQP = ["node_name"]
1364

    
1365
  def BuildHooksEnv(self):
1366
    """Build hooks env.
1367

1368
    This will run on all nodes before, and on all nodes + the new node after.
1369

1370
    """
1371
    env = {
1372
      "NODE_NAME": self.op.node_name,
1373
      "NODE_PIP": self.op.primary_ip,
1374
      "NODE_SIP": self.op.secondary_ip,
1375
      }
1376
    nodes_0 = self.cfg.GetNodeList()
1377
    nodes_1 = nodes_0 + [self.op.node_name, ]
1378
    return env, nodes_0, nodes_1
1379

    
1380
  def CheckPrereq(self):
1381
    """Check prerequisites.
1382

1383
    This checks:
1384
     - the new node is not already in the config
1385
     - it is resolvable
1386
     - its parameters (single/dual homed) matches the cluster
1387

1388
    Any errors are signalled by raising errors.OpPrereqError.
1389

1390
    """
1391
    node_name = self.op.node_name
1392
    cfg = self.cfg
1393

    
1394
    dns_data = utils.HostInfo(node_name)
1395

    
1396
    node = dns_data.name
1397
    primary_ip = self.op.primary_ip = dns_data.ip
1398
    secondary_ip = getattr(self.op, "secondary_ip", None)
1399
    if secondary_ip is None:
1400
      secondary_ip = primary_ip
1401
    if not utils.IsValidIP(secondary_ip):
1402
      raise errors.OpPrereqError("Invalid secondary IP given")
1403
    self.op.secondary_ip = secondary_ip
1404
    node_list = cfg.GetNodeList()
1405
    if node in node_list:
1406
      raise errors.OpPrereqError("Node %s is already in the configuration"
1407
                                 % node)
1408

    
1409
    for existing_node_name in node_list:
1410
      existing_node = cfg.GetNodeInfo(existing_node_name)
1411
      if (existing_node.primary_ip == primary_ip or
1412
          existing_node.secondary_ip == primary_ip or
1413
          existing_node.primary_ip == secondary_ip or
1414
          existing_node.secondary_ip == secondary_ip):
1415
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1416
                                   " existing node %s" % existing_node.name)
1417

    
1418
    # check that the type of the node (single versus dual homed) is the
1419
    # same as for the master
1420
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1421
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1422
    newbie_singlehomed = secondary_ip == primary_ip
1423
    if master_singlehomed != newbie_singlehomed:
1424
      if master_singlehomed:
1425
        raise errors.OpPrereqError("The master has no private ip but the"
1426
                                   " new node has one")
1427
      else:
1428
        raise errors.OpPrereqError("The master has a private ip but the"
1429
                                   " new node doesn't have one")
1430

    
1431
    # checks reachability
1432
    if not utils.TcpPing(utils.HostInfo().name,
1433
                         primary_ip,
1434
                         constants.DEFAULT_NODED_PORT):
1435
      raise errors.OpPrereqError("Node not reachable by ping")
1436

    
1437
    if not newbie_singlehomed:
1438
      # check reachability from my secondary ip to newbie's secondary ip
1439
      if not utils.TcpPing(myself.secondary_ip,
1440
                           secondary_ip,
1441
                           constants.DEFAULT_NODED_PORT):
1442
        raise errors.OpPrereqError(
1443
          "Node secondary ip not reachable by TCP based ping to noded port")
1444

    
1445
    self.new_node = objects.Node(name=node,
1446
                                 primary_ip=primary_ip,
1447
                                 secondary_ip=secondary_ip)
1448

    
1449
  def Exec(self, feedback_fn):
1450
    """Adds the new node to the cluster.
1451

1452
    """
1453
    new_node = self.new_node
1454
    node = new_node.name
1455

    
1456
    # set up inter-node password and certificate and restarts the node daemon
1457
    gntpass = self.sstore.GetNodeDaemonPassword()
1458
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1459
      raise errors.OpExecError("ganeti password corruption detected")
1460
    f = open(constants.SSL_CERT_FILE)
1461
    try:
1462
      gntpem = f.read(8192)
1463
    finally:
1464
      f.close()
1465
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1466
    # so we use this to detect an invalid certificate; as long as the
1467
    # cert doesn't contain this, the here-document will be correctly
1468
    # parsed by the shell sequence below
1469
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
1470
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1471
    if not gntpem.endswith("\n"):
1472
      raise errors.OpExecError("PEM must end with newline")
1473
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1474

    
1475
    # and then connect with ssh to set password and start ganeti-noded
1476
    # note that all the below variables are sanitized at this point,
1477
    # either by being constants or by the checks above
1478
    ss = self.sstore
1479
    mycommand = ("umask 077 && "
1480
                 "echo '%s' > '%s' && "
1481
                 "cat > '%s' << '!EOF.' && \n"
1482
                 "%s!EOF.\n%s restart" %
1483
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1484
                  constants.SSL_CERT_FILE, gntpem,
1485
                  constants.NODE_INITD_SCRIPT))
1486

    
1487
    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1488
    if result.failed:
1489
      raise errors.OpExecError("Remote command on node %s, error: %s,"
1490
                               " output: %s" %
1491
                               (node, result.fail_reason, result.output))
1492

    
1493
    # check connectivity
1494
    time.sleep(4)
1495

    
1496
    result = rpc.call_version([node])[node]
1497
    if result:
1498
      if constants.PROTOCOL_VERSION == result:
1499
        logger.Info("communication to node %s fine, sw version %s match" %
1500
                    (node, result))
1501
      else:
1502
        raise errors.OpExecError("Version mismatch master version %s,"
1503
                                 " node version %s" %
1504
                                 (constants.PROTOCOL_VERSION, result))
1505
    else:
1506
      raise errors.OpExecError("Cannot get version from the new node")
1507

    
1508
    # setup ssh on node
1509
    logger.Info("copy ssh key to node %s" % node)
1510
    keyarray = []
1511
    keyfiles = ["/etc/ssh/ssh_host_dsa_key", "/etc/ssh/ssh_host_dsa_key.pub",
1512
                "/etc/ssh/ssh_host_rsa_key", "/etc/ssh/ssh_host_rsa_key.pub",
1513
                "/root/.ssh/id_dsa", "/root/.ssh/id_dsa.pub"]
1514

    
1515
    for i in keyfiles:
1516
      f = open(i, 'r')
1517
      try:
1518
        keyarray.append(f.read())
1519
      finally:
1520
        f.close()
1521

    
1522
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1523
                               keyarray[3], keyarray[4], keyarray[5])
1524

    
1525
    if not result:
1526
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1527

    
1528
    # Add node to our /etc/hosts, and add key to known_hosts
1529
    _UpdateEtcHosts(new_node.name, new_node.primary_ip)
1530
    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1531
                      self.cfg.GetHostKey())
1532

    
1533
    if new_node.secondary_ip != new_node.primary_ip:
1534
      if not rpc.call_node_tcp_ping(new_node.name,
1535
                                    constants.LOCALHOST_IP_ADDRESS,
1536
                                    new_node.secondary_ip,
1537
                                    constants.DEFAULT_NODED_PORT,
1538
                                    10, False):
1539
        raise errors.OpExecError("Node claims it doesn't have the"
1540
                                 " secondary ip you gave (%s).\n"
1541
                                 "Please fix and re-run this command." %
1542
                                 new_node.secondary_ip)
1543

    
1544
    success, msg = ssh.VerifyNodeHostname(node)
1545
    if not success:
1546
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
1547
                               " than the one the resolver gives: %s.\n"
1548
                               "Please fix and re-run this command." %
1549
                               (node, msg))
1550

    
1551
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1552
    # including the node just added
1553
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1554
    dist_nodes = self.cfg.GetNodeList() + [node]
1555
    if myself.name in dist_nodes:
1556
      dist_nodes.remove(myself.name)
1557

    
1558
    logger.Debug("Copying hosts and known_hosts to all nodes")
1559
    for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
1560
      result = rpc.call_upload_file(dist_nodes, fname)
1561
      for to_node in dist_nodes:
1562
        if not result[to_node]:
1563
          logger.Error("copy of file %s to node %s failed" %
1564
                       (fname, to_node))
1565

    
1566
    to_copy = ss.GetFileList()
1567
    for fname in to_copy:
1568
      if not ssh.CopyFileToNode(node, fname):
1569
        logger.Error("could not copy file %s to node %s" % (fname, node))
1570

    
1571
    logger.Info("adding node %s to cluster.conf" % node)
1572
    self.cfg.AddNode(new_node)
1573

    
1574

    
1575
class LUMasterFailover(LogicalUnit):
1576
  """Failover the master node to the current node.
1577

1578
  This is a special LU in that it must run on a non-master node.
1579

1580
  """
1581
  HPATH = "master-failover"
1582
  HTYPE = constants.HTYPE_CLUSTER
1583
  REQ_MASTER = False
1584
  _OP_REQP = []
1585

    
1586
  def BuildHooksEnv(self):
1587
    """Build hooks env.
1588

1589
    This will run on the new master only in the pre phase, and on all
1590
    the nodes in the post phase.
1591

1592
    """
1593
    env = {
1594
      "NEW_MASTER": self.new_master,
1595
      "OLD_MASTER": self.old_master,
1596
      }
1597
    return env, [self.new_master], self.cfg.GetNodeList()
1598

    
1599
  def CheckPrereq(self):
1600
    """Check prerequisites.
1601

1602
    This checks that we are not already the master.
1603

1604
    """
1605
    self.new_master = utils.HostInfo().name
1606
    self.old_master = self.sstore.GetMasterNode()
1607

    
1608
    if self.old_master == self.new_master:
1609
      raise errors.OpPrereqError("This commands must be run on the node"
1610
                                 " where you want the new master to be.\n"
1611
                                 "%s is already the master" %
1612
                                 self.old_master)
1613

    
1614
  def Exec(self, feedback_fn):
1615
    """Failover the master node.
1616

1617
    This command, when run on a non-master node, will cause the current
1618
    master to cease being master, and the non-master to become new
1619
    master.
1620

1621
    """
1622
    #TODO: do not rely on gethostname returning the FQDN
1623
    logger.Info("setting master to %s, old master: %s" %
1624
                (self.new_master, self.old_master))
1625

    
1626
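    # first tell the old master to stop serving; this is best effort, so on
    # failure we only log an error and ask the admin to disable it manually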
    if not rpc.call_node_stop_master(self.old_master):
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)
1629

    
1630
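    # point the simple store at the new master and push the updated file to
    # all nodes, so the whole cluster agrees on who the master is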
    ss = self.sstore
1631
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1632
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
1633
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
1634
      logger.Error("could not distribute the new simple store master file"
1635
                   " to the other nodes, please check.")
1636

    
1637
    if not rpc.call_node_start_master(self.new_master):
1638
      logger.Error("could not start the master role on the new master"
1639
                   " %s, please check" % self.new_master)
1640
      feedback_fn("Error in activating the master IP on the new master,\n"
1641
                  "please fix manually.")


class LUQueryClusterInfo(NoHooksLU):
1646
  """Query cluster configuration.
1647

1648
  """
1649
  _OP_REQP = []
1650
  REQ_MASTER = False
1651

    
1652
  def CheckPrereq(self):
1653
    """No prerequsites needed for this LU.
1654

1655
    """
1656
    pass
1657

    
1658
  def Exec(self, feedback_fn):
1659
    """Return cluster config.
1660

1661
    """
1662
    result = {
1663
      "name": self.sstore.GetClusterName(),
1664
      "software_version": constants.RELEASE_VERSION,
1665
      "protocol_version": constants.PROTOCOL_VERSION,
1666
      "config_version": constants.CONFIG_VERSION,
1667
      "os_api_version": constants.OS_API_VERSION,
1668
      "export_version": constants.EXPORT_VERSION,
1669
      "master": self.sstore.GetMasterNode(),
1670
      "architecture": (platform.architecture()[0], platform.machine()),
1671
      }
1672

    
1673
    return result
1674

    
1675

    
1676
class LUClusterCopyFile(NoHooksLU):
1677
  """Copy file to cluster.
1678

1679
  """
1680
  _OP_REQP = ["nodes", "filename"]
1681

    
1682
  def CheckPrereq(self):
1683
    """Check prerequisites.
1684

1685
    It should check that the named file exists and that the given list
1686
    of nodes is valid.
1687

1688
    """
1689
    if not os.path.exists(self.op.filename):
1690
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1691

    
1692
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1693

    
1694
  def Exec(self, feedback_fn):
1695
    """Copy a file from master to some nodes.
1696

1697
    The file to copy is self.op.filename; it is copied via ssh to every
    node in self.nodes (as computed in CheckPrereq), skipping the master
    node itself.
1702

1703
    """
1704
    filename = self.op.filename
1705

    
1706
    myname = utils.HostInfo().name
1707

    
1708
    for node in self.nodes:
1709
      if node == myname:
1710
        continue
1711
      if not ssh.CopyFileToNode(node, filename):
1712
        logger.Error("Copy of file %s to node %s failed" % (filename, node))
1713

    
1714

    
1715
class LUDumpClusterConfig(NoHooksLU):
1716
  """Return a text-representation of the cluster-config.
1717

1718
  """
1719
  _OP_REQP = []
1720

    
1721
  def CheckPrereq(self):
1722
    """No prerequisites.
1723

1724
    """
1725
    pass
1726

    
1727
  def Exec(self, feedback_fn):
1728
    """Dump a representation of the cluster config to the standard output.
1729

1730
    """
1731
    return self.cfg.DumpConfig()
1732

    
1733

    
1734
class LURunClusterCommand(NoHooksLU):
1735
  """Run a command on some nodes.
1736

1737
  """
1738
  _OP_REQP = ["command", "nodes"]
1739

    
1740
  def CheckPrereq(self):
1741
    """Check prerequisites.
1742

1743
    It checks that the given list of nodes is valid.
1744

1745
    """
1746
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1747

    
1748
  def Exec(self, feedback_fn):
1749
    """Run a command on some nodes.
1750

1751
    """
1752
    data = []
1753
    for node in self.nodes:
1754
      result = ssh.SSHCall(node, "root", self.op.command)
1755
      data.append((node, result.output, result.exit_code))
1756

    
1757
    return data
1758

    
1759

    
1760
class LUActivateInstanceDisks(NoHooksLU):
1761
  """Bring up an instance's disks.
1762

1763
  """
1764
  _OP_REQP = ["instance_name"]
1765

    
1766
  def CheckPrereq(self):
1767
    """Check prerequisites.
1768

1769
    This checks that the instance is in the cluster.
1770

1771
    """
1772
    instance = self.cfg.GetInstanceInfo(
1773
      self.cfg.ExpandInstanceName(self.op.instance_name))
1774
    if instance is None:
1775
      raise errors.OpPrereqError("Instance '%s' not known" %
1776
                                 self.op.instance_name)
1777
    self.instance = instance
1778

    
1779

    
1780
  def Exec(self, feedback_fn):
1781
    """Activate the disks.
1782

1783
    """
1784
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1785
    if not disks_ok:
1786
      raise errors.OpExecError("Cannot activate block devices")
1787

    
1788
    return disks_info
1789

    
1790

    
1791
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1792
  """Prepare the block devices for an instance.
1793

1794
  This sets up the block devices on all nodes.
1795

1796
  Args:
1797
    instance: a ganeti.objects.Instance object
1798
    ignore_secondaries: if true, errors on secondary nodes won't result
1799
                        in an error return from the function
1800

1801
  Returns:
1802
    false if the operation failed
1803
    list of (host, instance_visible_name, node_visible_name) if the operation
         succeeded with the mapping from node devices to instance devices
1805
  """
1806
  device_info = []
1807
  disks_ok = True
1808
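  # assemble every disk on each node it lives on; only the assembly result
  # from the primary node is returned, as that is where the instance will
  # actually access the device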
  for inst_disk in instance.disks:
1809
    master_result = None
1810
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1811
      cfg.SetDiskID(node_disk, node)
1812
      is_primary = node == instance.primary_node
1813
      result = rpc.call_blockdev_assemble(node, node_disk, is_primary)
1814
      if not result:
1815
        logger.Error("could not prepare block device %s on node %s (is_pri"
1816
                     "mary=%s)" % (inst_disk.iv_name, node, is_primary))
1817
        if is_primary or not ignore_secondaries:
1818
          disks_ok = False
1819
      if is_primary:
1820
        master_result = result
1821
    device_info.append((instance.primary_node, inst_disk.iv_name,
1822
                        master_result))
1823

    
1824
  return disks_ok, device_info
1825

    
1826

    
1827
def _StartInstanceDisks(cfg, instance, force):
1828
  """Start the disks of an instance.
1829

1830
  """
1831
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1832
                                           ignore_secondaries=force)
1833
  if not disks_ok:
1834
    _ShutdownInstanceDisks(instance, cfg)
1835
    if force is not None and not force:
1836
      logger.Error("If the message above refers to a secondary node,"
1837
                   " you can retry the operation using '--force'.")
1838
    raise errors.OpExecError("Disk consistency error")
1839

    
1840

    
1841
class LUDeactivateInstanceDisks(NoHooksLU):
1842
  """Shutdown an instance's disks.
1843

1844
  """
1845
  _OP_REQP = ["instance_name"]
1846

    
1847
  def CheckPrereq(self):
1848
    """Check prerequisites.
1849

1850
    This checks that the instance is in the cluster.
1851

1852
    """
1853
    instance = self.cfg.GetInstanceInfo(
1854
      self.cfg.ExpandInstanceName(self.op.instance_name))
1855
    if instance is None:
1856
      raise errors.OpPrereqError("Instance '%s' not known" %
1857
                                 self.op.instance_name)
1858
    self.instance = instance
1859

    
1860
  def Exec(self, feedback_fn):
1861
    """Deactivate the disks
1862

1863
    """
1864
    instance = self.instance
1865
    ins_l = rpc.call_instance_list([instance.primary_node])
1866
    ins_l = ins_l[instance.primary_node]
1867
    if not isinstance(ins_l, list):
1868
      raise errors.OpExecError("Can't contact node '%s'" %
1869
                               instance.primary_node)
1870

    
1871
    if self.instance.name in ins_l:
1872
      raise errors.OpExecError("Instance is running, can't shutdown"
1873
                               " block devices.")
1874

    
1875
    _ShutdownInstanceDisks(instance, self.cfg)
1876

    
1877

    
1878
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1879
  """Shutdown block devices of an instance.
1880

1881
  This does the shutdown on all nodes of the instance.
1882

1883
  If ignore_primary is true, errors on the primary node are
  ignored.
1885

1886
  """
1887
  result = True
1888
  for disk in instance.disks:
1889
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1890
      cfg.SetDiskID(top_disk, node)
1891
      if not rpc.call_blockdev_shutdown(node, top_disk):
1892
        logger.Error("could not shutdown block device %s on node %s" %
1893
                     (disk.iv_name, node))
1894
        if not ignore_primary or node != instance.primary_node:
1895
          result = False
1896
  return result
1897

    
1898

    
1899
class LUStartupInstance(LogicalUnit):
1900
  """Starts an instance.
1901

1902
  """
1903
  HPATH = "instance-start"
1904
  HTYPE = constants.HTYPE_INSTANCE
1905
  _OP_REQP = ["instance_name", "force"]
1906

    
1907
  def BuildHooksEnv(self):
1908
    """Build hooks env.
1909

1910
    This runs on master, primary and secondary nodes of the instance.
1911

1912
    """
1913
    env = {
1914
      "FORCE": self.op.force,
1915
      }
1916
    env.update(_BuildInstanceHookEnvByObject(self.instance))
1917
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1918
          list(self.instance.secondary_nodes))
1919
    return env, nl, nl
1920

    
1921
  def CheckPrereq(self):
1922
    """Check prerequisites.
1923

1924
    This checks that the instance is in the cluster.
1925

1926
    """
1927
    instance = self.cfg.GetInstanceInfo(
1928
      self.cfg.ExpandInstanceName(self.op.instance_name))
1929
    if instance is None:
1930
      raise errors.OpPrereqError("Instance '%s' not known" %
1931
                                 self.op.instance_name)
1932

    
1933
    # check bridges existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(instance.primary_node, brlist):
      raise errors.OpPrereqError("one or more target bridges %s do not"
                                 " exist on destination node '%s'" %
                                 (brlist, instance.primary_node))
1939

    
1940
    self.instance = instance
1941
    self.op.instance_name = instance.name
1942

    
1943
  def Exec(self, feedback_fn):
1944
    """Start the instance.
1945

1946
    """
1947
    instance = self.instance
1948
    force = self.op.force
1949
    extra_args = getattr(self.op, "extra_args", "")
1950

    
1951
    node_current = instance.primary_node
1952

    
1953
    nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1954
    if not nodeinfo:
1955
      raise errors.OpExecError("Could not contact node %s for infos" %
1956
                               (node_current))
1957

    
1958
    freememory = nodeinfo[node_current]['memory_free']
1959
    memory = instance.memory
1960
    if memory > freememory:
1961
      raise errors.OpExecError("Not enough memory to start instance"
1962
                               " %s on node %s"
1963
                               " needed %s MiB, available %s MiB" %
1964
                               (instance.name, node_current, memory,
1965
                                freememory))
1966

    
1967
    _StartInstanceDisks(self.cfg, instance, force)
1968

    
1969
    if not rpc.call_instance_start(node_current, instance, extra_args):
1970
      _ShutdownInstanceDisks(instance, self.cfg)
1971
      raise errors.OpExecError("Could not start instance")
1972

    
1973
    self.cfg.MarkInstanceUp(instance.name)
1974

    
1975

    
1976
class LUShutdownInstance(LogicalUnit):
1977
  """Shutdown an instance.
1978

1979
  """
1980
  HPATH = "instance-stop"
1981
  HTYPE = constants.HTYPE_INSTANCE
1982
  _OP_REQP = ["instance_name"]
1983

    
1984
  def BuildHooksEnv(self):
1985
    """Build hooks env.
1986

1987
    This runs on master, primary and secondary nodes of the instance.
1988

1989
    """
1990
    env = _BuildInstanceHookEnvByObject(self.instance)
1991
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1992
          list(self.instance.secondary_nodes))
1993
    return env, nl, nl
1994

    
1995
  def CheckPrereq(self):
1996
    """Check prerequisites.
1997

1998
    This checks that the instance is in the cluster.
1999

2000
    """
2001
    instance = self.cfg.GetInstanceInfo(
2002
      self.cfg.ExpandInstanceName(self.op.instance_name))
2003
    if instance is None:
2004
      raise errors.OpPrereqError("Instance '%s' not known" %
2005
                                 self.op.instance_name)
2006
    self.instance = instance
2007

    
2008
  def Exec(self, feedback_fn):
2009
    """Shutdown the instance.
2010

2011
    """
2012
    instance = self.instance
2013
    node_current = instance.primary_node
2014
    if not rpc.call_instance_shutdown(node_current, instance):
2015
      logger.Error("could not shutdown instance")
2016

    
2017
    self.cfg.MarkInstanceDown(instance.name)
2018
    _ShutdownInstanceDisks(instance, self.cfg)
2019

    
2020

    
2021
class LUReinstallInstance(LogicalUnit):
2022
  """Reinstall an instance.
2023

2024
  """
2025
  HPATH = "instance-reinstall"
2026
  HTYPE = constants.HTYPE_INSTANCE
2027
  _OP_REQP = ["instance_name"]
2028

    
2029
  def BuildHooksEnv(self):
2030
    """Build hooks env.
2031

2032
    This runs on master, primary and secondary nodes of the instance.
2033

2034
    """
2035
    env = _BuildInstanceHookEnvByObject(self.instance)
2036
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2037
          list(self.instance.secondary_nodes))
2038
    return env, nl, nl
2039

    
2040
  def CheckPrereq(self):
2041
    """Check prerequisites.
2042

2043
    This checks that the instance is in the cluster and is not running.
2044

2045
    """
2046
    instance = self.cfg.GetInstanceInfo(
2047
      self.cfg.ExpandInstanceName(self.op.instance_name))
2048
    if instance is None:
2049
      raise errors.OpPrereqError("Instance '%s' not known" %
2050
                                 self.op.instance_name)
2051
    if instance.disk_template == constants.DT_DISKLESS:
2052
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2053
                                 self.op.instance_name)
2054
    if instance.status != "down":
2055
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2056
                                 self.op.instance_name)
2057
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2058
    if remote_info:
2059
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2060
                                 (self.op.instance_name,
2061
                                  instance.primary_node))
2062

    
2063
    self.op.os_type = getattr(self.op, "os_type", None)
2064
    if self.op.os_type is not None:
2065
      # OS verification
2066
      pnode = self.cfg.GetNodeInfo(
2067
        self.cfg.ExpandNodeName(instance.primary_node))
2068
      if pnode is None:
2069
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2070
                                   self.op.pnode)
2071
      os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2072
      if not isinstance(os_obj, objects.OS):
2073
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2074
                                   " primary node"  % self.op.os_type)
2075

    
2076
    self.instance = instance
2077

    
2078
  def Exec(self, feedback_fn):
2079
    """Reinstall the instance.
2080

2081
    """
2082
    inst = self.instance
2083

    
2084
    if self.op.os_type is not None:
2085
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2086
      inst.os = self.op.os_type
2087
      self.cfg.AddInstance(inst)
2088

    
2089
    _StartInstanceDisks(self.cfg, inst, None)
2090
    try:
2091
      feedback_fn("Running the instance OS create scripts...")
2092
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2093
        raise errors.OpExecError("Could not install OS for instance %s "
2094
                                 "on node %s" %
2095
                                 (inst.name, inst.primary_node))
2096
    finally:
2097
      _ShutdownInstanceDisks(inst, self.cfg)
2098

    
2099

    
2100
class LURenameInstance(LogicalUnit):
2101
  """Rename an instance.
2102

2103
  """
2104
  HPATH = "instance-rename"
2105
  HTYPE = constants.HTYPE_INSTANCE
2106
  _OP_REQP = ["instance_name", "new_name"]
2107

    
2108
  def BuildHooksEnv(self):
2109
    """Build hooks env.
2110

2111
    This runs on master, primary and secondary nodes of the instance.
2112

2113
    """
2114
    env = _BuildInstanceHookEnvByObject(self.instance)
2115
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2116
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2117
          list(self.instance.secondary_nodes))
2118
    return env, nl, nl
2119

    
2120
  def CheckPrereq(self):
2121
    """Check prerequisites.
2122

2123
    This checks that the instance is in the cluster and is not running.
2124

2125
    """
2126
    instance = self.cfg.GetInstanceInfo(
2127
      self.cfg.ExpandInstanceName(self.op.instance_name))
2128
    if instance is None:
2129
      raise errors.OpPrereqError("Instance '%s' not known" %
2130
                                 self.op.instance_name)
2131
    if instance.status != "down":
2132
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2133
                                 self.op.instance_name)
2134
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2135
    if remote_info:
2136
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2137
                                 (self.op.instance_name,
2138
                                  instance.primary_node))
2139
    self.instance = instance
2140

    
2141
    # new name verification
2142
    name_info = utils.HostInfo(self.op.new_name)
2143

    
2144
    self.op.new_name = new_name = name_info.name
2145
    if not getattr(self.op, "ignore_ip", False):
2146
      command = ["fping", "-q", name_info.ip]
2147
      result = utils.RunCmd(command)
2148
      if not result.failed:
2149
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2150
                                   (name_info.ip, new_name))
2151

    
2152

    
2153
  def Exec(self, feedback_fn):
2154
    """Reinstall the instance.
2155

2156
    """
2157
    inst = self.instance
2158
    old_name = inst.name
2159

    
2160
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2161

    
2162
    # re-read the instance from the configuration after rename
2163
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2164

    
2165
    _StartInstanceDisks(self.cfg, inst, None)
2166
    try:
2167
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2168
                                          "sda", "sdb"):
2169
        msg = ("Could run OS rename script for instance %s\n"
2170
               "on node %s\n"
2171
               "(but the instance has been renamed in Ganeti)" %
2172
               (inst.name, inst.primary_node))
2173
        logger.Error(msg)
2174
    finally:
2175
      _ShutdownInstanceDisks(inst, self.cfg)
2176

    
2177

    
2178
class LURemoveInstance(LogicalUnit):
2179
  """Remove an instance.
2180

2181
  """
2182
  HPATH = "instance-remove"
2183
  HTYPE = constants.HTYPE_INSTANCE
2184
  _OP_REQP = ["instance_name"]
2185

    
2186
  def BuildHooksEnv(self):
2187
    """Build hooks env.
2188

2189
    This runs on master, primary and secondary nodes of the instance.
2190

2191
    """
2192
    env = _BuildInstanceHookEnvByObject(self.instance)
2193
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2194
          list(self.instance.secondary_nodes))
2195
    return env, nl, nl
2196

    
2197
  def CheckPrereq(self):
2198
    """Check prerequisites.
2199

2200
    This checks that the instance is in the cluster.
2201

2202
    """
2203
    instance = self.cfg.GetInstanceInfo(
2204
      self.cfg.ExpandInstanceName(self.op.instance_name))
2205
    if instance is None:
2206
      raise errors.OpPrereqError("Instance '%s' not known" %
2207
                                 self.op.instance_name)
2208
    self.instance = instance
2209

    
2210
  def Exec(self, feedback_fn):
2211
    """Remove the instance.
2212

2213
    """
2214
    instance = self.instance
2215
    logger.Info("shutting down instance %s on node %s" %
2216
                (instance.name, instance.primary_node))
2217

    
2218
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2219
      raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2220
                               (instance.name, instance.primary_node))
2221

    
2222
    logger.Info("removing block devices for instance %s" % instance.name)
2223

    
2224
    _RemoveDisks(instance, self.cfg)
2225

    
2226
    logger.Info("removing instance %s out of cluster config" % instance.name)
2227

    
2228
    self.cfg.RemoveInstance(instance.name)
2229

    
2230

    
2231
class LUQueryInstances(NoHooksLU):
2232
  """Logical unit for querying instances.
2233

2234
  """
2235
  _OP_REQP = ["output_fields", "names"]
2236

    
2237
  def CheckPrereq(self):
2238
    """Check prerequisites.
2239

2240
    This checks that the fields required are valid output fields.
2241

2242
    """
2243
    self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2244
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2245
                               "admin_state", "admin_ram",
2246
                               "disk_template", "ip", "mac", "bridge",
2247
                               "sda_size", "sdb_size"],
2248
                       dynamic=self.dynamic_fields,
2249
                       selected=self.op.output_fields)
2250

    
2251
    self.wanted = _GetWantedInstances(self, self.op.names)
2252

    
2253
  def Exec(self, feedback_fn):
2254
    """Computes the list of nodes and their attributes.
2255

2256
    """
2257
    instance_names = self.wanted
2258
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2259
                     in instance_names]
2260

    
2261
    # begin data gathering
2262

    
2263
    nodes = frozenset([inst.primary_node for inst in instance_list])
2264

    
2265
    bad_nodes = []
2266
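    # runtime data is only gathered from the nodes if a dynamic field
    # (oper_state/oper_ram) was requested; nodes that fail to answer end up
    # in bad_nodes so those fields can be reported as unknown (None)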
    if self.dynamic_fields.intersection(self.op.output_fields):
2267
      live_data = {}
2268
      node_data = rpc.call_all_instances_info(nodes)
2269
      for name in nodes:
2270
        result = node_data[name]
2271
        if result:
2272
          live_data.update(result)
2273
        elif result == False:
2274
          bad_nodes.append(name)
2275
        # else no instance is alive
2276
    else:
2277
      live_data = dict([(name, {}) for name in instance_names])
2278

    
2279
    # end data gathering
2280

    
2281
    output = []
2282
    for instance in instance_list:
2283
      iout = []
2284
      for field in self.op.output_fields:
2285
        if field == "name":
2286
          val = instance.name
2287
        elif field == "os":
2288
          val = instance.os
2289
        elif field == "pnode":
2290
          val = instance.primary_node
2291
        elif field == "snodes":
2292
          val = list(instance.secondary_nodes)
2293
        elif field == "admin_state":
2294
          val = (instance.status != "down")
2295
        elif field == "oper_state":
2296
          if instance.primary_node in bad_nodes:
2297
            val = None
2298
          else:
2299
            val = bool(live_data.get(instance.name))
2300
        elif field == "admin_ram":
2301
          val = instance.memory
2302
        elif field == "oper_ram":
2303
          if instance.primary_node in bad_nodes:
2304
            val = None
2305
          elif instance.name in live_data:
2306
            val = live_data[instance.name].get("memory", "?")
2307
          else:
2308
            val = "-"
2309
        elif field == "disk_template":
2310
          val = instance.disk_template
2311
        elif field == "ip":
2312
          val = instance.nics[0].ip
2313
        elif field == "bridge":
2314
          val = instance.nics[0].bridge
2315
        elif field == "mac":
2316
          val = instance.nics[0].mac
2317
        elif field == "sda_size" or field == "sdb_size":
2318
          disk = instance.FindDisk(field[:3])
2319
          if disk is None:
2320
            val = None
2321
          else:
2322
            val = disk.size
2323
        else:
2324
          raise errors.ParameterError(field)
2325
        iout.append(val)
2326
      output.append(iout)
2327

    
2328
    return output
2329

    
2330

    
2331
class LUFailoverInstance(LogicalUnit):
2332
  """Failover an instance.
2333

2334
  """
2335
  HPATH = "instance-failover"
2336
  HTYPE = constants.HTYPE_INSTANCE
2337
  _OP_REQP = ["instance_name", "ignore_consistency"]
2338

    
2339
  def BuildHooksEnv(self):
2340
    """Build hooks env.
2341

2342
    This runs on master, primary and secondary nodes of the instance.
2343

2344
    """
2345
    env = {
2346
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2347
      }
2348
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2349
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2350
    return env, nl, nl
2351

    
2352
  def CheckPrereq(self):
2353
    """Check prerequisites.
2354

2355
    This checks that the instance is in the cluster.
2356

2357
    """
2358
    instance = self.cfg.GetInstanceInfo(
2359
      self.cfg.ExpandInstanceName(self.op.instance_name))
2360
    if instance is None:
2361
      raise errors.OpPrereqError("Instance '%s' not known" %
2362
                                 self.op.instance_name)
2363

    
2364
    if instance.disk_template != constants.DT_REMOTE_RAID1:
2365
      raise errors.OpPrereqError("Instance's disk layout is not"
2366
                                 " remote_raid1.")
2367

    
2368
    secondary_nodes = instance.secondary_nodes
2369
    if not secondary_nodes:
2370
      raise errors.ProgrammerError("no secondary node but using "
2371
                                   "DT_REMOTE_RAID1 template")
2372

    
2373
    # check memory requirements on the secondary node
2374
    target_node = secondary_nodes[0]
2375
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2376
    info = nodeinfo.get(target_node, None)
2377
    if not info:
2378
      raise errors.OpPrereqError("Cannot get current information"
2379
                                 " from node '%s'" % nodeinfo)
2380
    if instance.memory > info['memory_free']:
2381
      raise errors.OpPrereqError("Not enough memory on target node %s."
2382
                                 " %d MB available, %d MB required" %
2383
                                 (target_node, info['memory_free'],
2384
                                  instance.memory))
2385

    
2386
    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(instance.primary_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s do not"
                                 " exist on destination node '%s'" %
                                 (brlist, instance.primary_node))
2392

    
2393
    self.instance = instance
2394

    
2395
  def Exec(self, feedback_fn):
2396
    """Failover an instance.
2397

2398
    The failover is done by shutting it down on its present node and
2399
    starting it on the secondary.
2400

2401
    """
2402
    instance = self.instance
2403

    
2404
    source_node = instance.primary_node
2405
    target_node = instance.secondary_nodes[0]
2406

    
2407
    feedback_fn("* checking disk consistency between source and target")
2408
    for dev in instance.disks:
2409
      # for remote_raid1, these are md over drbd
2410
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2411
        if not self.op.ignore_consistency:
2412
          raise errors.OpExecError("Disk %s is degraded on target node,"
2413
                                   " aborting failover." % dev.iv_name)
2414

    
2415
    feedback_fn("* checking target node resource availability")
2416
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2417

    
2418
    if not nodeinfo:
2419
      raise errors.OpExecError("Could not contact target node %s." %
2420
                               target_node)
2421

    
2422
    free_memory = int(nodeinfo[target_node]['memory_free'])
2423
    memory = instance.memory
2424
    if memory > free_memory:
2425
      raise errors.OpExecError("Not enough memory to create instance %s on"
2426
                               " node %s. needed %s MiB, available %s MiB" %
2427
                               (instance.name, target_node, memory,
2428
                                free_memory))
2429

    
2430
    feedback_fn("* shutting down instance on source node")
2431
    logger.Info("Shutting down instance %s on node %s" %
2432
                (instance.name, source_node))
2433

    
2434
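    # the shutdown on the source node is best effort: if it fails we log the
    # problem and proceed with the failover, trusting the admin to make sure
    # the source node is really down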
    if not rpc.call_instance_shutdown(source_node, instance):
2435
      logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2436
                   " anyway. Please make sure node %s is down"  %
2437
                   (instance.name, source_node, source_node))
2438

    
2439
    feedback_fn("* deactivating the instance's disks on source node")
2440
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2441
      raise errors.OpExecError("Can't shut down the instance's disks.")
2442

    
2443
    instance.primary_node = target_node
2444
    # distribute new instance config to the other nodes
2445
    self.cfg.AddInstance(instance)
2446

    
2447
    feedback_fn("* activating the instance's disks on target node")
2448
    logger.Info("Starting instance %s on node %s" %
2449
                (instance.name, target_node))
2450

    
2451
    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2452
                                             ignore_secondaries=True)
2453
    if not disks_ok:
2454
      _ShutdownInstanceDisks(instance, self.cfg)
2455
      raise errors.OpExecError("Can't activate the instance's disks")
2456

    
2457
    feedback_fn("* starting the instance on the target node")
2458
    if not rpc.call_instance_start(target_node, instance, None):
2459
      _ShutdownInstanceDisks(instance, self.cfg)
2460
      raise errors.OpExecError("Could not start instance %s on node %s." %
2461
                               (instance.name, target_node))
2462

    
2463

    
2464
def _CreateBlockDevOnPrimary(cfg, node, device, info):
2465
  """Create a tree of block devices on the primary node.
2466

2467
  This always creates all devices.
2468

2469
  """
2470
  if device.children:
2471
    for child in device.children:
2472
      if not _CreateBlockDevOnPrimary(cfg, node, child, info):
2473
        return False
2474

    
2475
  cfg.SetDiskID(device, node)
2476
  new_id = rpc.call_blockdev_create(node, device, device.size, True, info)
2477
  if not new_id:
2478
    return False
2479
  if device.physical_id is None:
2480
    device.physical_id = new_id
2481
  return True
2482

    
2483

    
2484
def _CreateBlockDevOnSecondary(cfg, node, device, force, info):
2485
  """Create a tree of block devices on a secondary node.
2486

2487
  If this device type has to be created on secondaries, create it and
2488
  all its children.
2489

2490
  If not, just recurse to children keeping the same 'force' value.
2491

2492
  """
2493
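  # as soon as one device in the tree has to exist on the secondary node,
  # all of its children must be created there too, hence force is turned on
  # and propagated down the recursion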
  if device.CreateOnSecondary():
2494
    force = True
2495
  if device.children:
2496
    for child in device.children:
2497
      if not _CreateBlockDevOnSecondary(cfg, node, child, force, info):
2498
        return False
2499

    
2500
  if not force:
2501
    return True
2502
  cfg.SetDiskID(device, node)
2503
  new_id = rpc.call_blockdev_create(node, device, device.size, False, info)
2504
  if not new_id:
2505
    return False
2506
  if device.physical_id is None:
2507
    device.physical_id = new_id
2508
  return True
2509

    
2510

    
2511
def _GenerateUniqueNames(cfg, exts):
2512
  """Generate a suitable LV name.
2513

2514
  This will generate a logical volume name for the given instance.
2515

2516
  """
2517
  results = []
2518
  for val in exts:
2519
    new_id = cfg.GenerateUniqueID()
2520
    results.append("%s%s" % (new_id, val))
2521
  return results
2522

    
2523

    
2524
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2525
  """Generate a drbd device complete with its children.
2526

2527
  """
2528
  port = cfg.AllocatePort()
2529
  vgname = cfg.GetVGName()
2530
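  # the drbd metadata volume has a fixed size of 128 MiB; this corresponds
  # to the per-device overhead accounted for in LUCreateInstance's disk
  # space computation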
  dev_data = objects.Disk(dev_type="lvm", size=size,
2531
                          logical_id=(vgname, names[0]))
2532
  dev_meta = objects.Disk(dev_type="lvm", size=128,
2533
                          logical_id=(vgname, names[1]))
2534
  drbd_dev = objects.Disk(dev_type="drbd", size=size,
2535
                          logical_id = (primary, secondary, port),
2536
                          children = [dev_data, dev_meta])
2537
  return drbd_dev
2538

    
2539

    
2540
def _GenerateDiskTemplate(cfg, template_name,
2541
                          instance_name, primary_node,
2542
                          secondary_nodes, disk_sz, swap_sz):
2543
  """Generate the entire disk layout for a given template type.
2544

2545
  """
2546
  #TODO: compute space requirements
2547

    
2548
  vgname = cfg.GetVGName()
2549
  if template_name == "diskless":
2550
    disks = []
2551
  elif template_name == "plain":
2552
    if len(secondary_nodes) != 0:
2553
      raise errors.ProgrammerError("Wrong template configuration")
2554

    
2555
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2556
    sda_dev = objects.Disk(dev_type="lvm", size=disk_sz,
2557
                           logical_id=(vgname, names[0]),
2558
                           iv_name = "sda")
2559
    sdb_dev = objects.Disk(dev_type="lvm", size=swap_sz,
2560
                           logical_id=(vgname, names[1]),
2561
                           iv_name = "sdb")
2562
    disks = [sda_dev, sdb_dev]
2563
  elif template_name == "local_raid1":
2564
    if len(secondary_nodes) != 0:
2565
      raise errors.ProgrammerError("Wrong template configuration")
2566

    
2567

    
2568
    names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2569
                                       ".sdb_m1", ".sdb_m2"])
2570
    sda_dev_m1 = objects.Disk(dev_type="lvm", size=disk_sz,
2571
                              logical_id=(vgname, names[0]))
2572
    sda_dev_m2 = objects.Disk(dev_type="lvm", size=disk_sz,
2573
                              logical_id=(vgname, names[1]))
2574
    md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name = "sda",
2575
                              size=disk_sz,
2576
                              children = [sda_dev_m1, sda_dev_m2])
2577
    sdb_dev_m1 = objects.Disk(dev_type="lvm", size=swap_sz,
2578
                              logical_id=(vgname, names[2]))
2579
    sdb_dev_m2 = objects.Disk(dev_type="lvm", size=swap_sz,
2580
                              logical_id=(vgname, names[3]))
2581
    md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name = "sdb",
2582
                              size=swap_sz,
2583
                              children = [sdb_dev_m1, sdb_dev_m2])
2584
    disks = [md_sda_dev, md_sdb_dev]
2585
  elif template_name == constants.DT_REMOTE_RAID1:
2586
    if len(secondary_nodes) != 1:
2587
      raise errors.ProgrammerError("Wrong template configuration")
2588
    remote_node = secondary_nodes[0]
2589
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2590
                                       ".sdb_data", ".sdb_meta"])
2591
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2592
                                         disk_sz, names[0:2])
2593
    md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name="sda",
2594
                              children = [drbd_sda_dev], size=disk_sz)
2595
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2596
                                         swap_sz, names[2:4])
2597
    md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name="sdb",
2598
                              children = [drbd_sdb_dev], size=swap_sz)
2599
    disks = [md_sda_dev, md_sdb_dev]
2600
  else:
2601
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2602
  return disks
2603

    
2604

    
2605
def _GetInstanceInfoText(instance):
2606
  """Compute that text that should be added to the disk's metadata.
2607

2608
  """
2609
  return "originstname+%s" % instance.name
2610

    
2611

    
2612
def _CreateDisks(cfg, instance):
2613
  """Create all disks for an instance.
2614

2615
  This abstracts away some work from AddInstance.
2616

2617
  Args:
2618
    instance: the instance object
2619

2620
  Returns:
2621
    True or False showing the success of the creation process
2622

2623
  """
2624
  info = _GetInstanceInfoText(instance)
2625

    
2626
  for device in instance.disks:
2627
    logger.Info("creating volume %s for instance %s" %
2628
              (device.iv_name, instance.name))
2629
    #HARDCODE
2630
    for secondary_node in instance.secondary_nodes:
2631
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, device, False,
2632
                                        info):
2633
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2634
                     (device.iv_name, device, secondary_node))
2635
        return False
2636
    #HARDCODE
2637
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, device, info):
2638
      logger.Error("failed to create volume %s on primary!" %
2639
                   device.iv_name)
2640
      return False
2641
  return True
2642

    
2643

    
2644
def _RemoveDisks(instance, cfg):
2645
  """Remove all disks for an instance.
2646

2647
  This abstracts away some work from `AddInstance()` and
2648
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
2650
  with `_CreateDisks()`).
2651

2652
  Args:
2653
    instance: the instance object
2654

2655
  Returns:
2656
    True or False showing the success of the removal process
2657

2658
  """
2659
  logger.Info("removing block devices for instance %s" % instance.name)
2660

    
2661
  result = True
2662
  for device in instance.disks:
2663
    for node, disk in device.ComputeNodeTree(instance.primary_node):
2664
      cfg.SetDiskID(disk, node)
2665
      if not rpc.call_blockdev_remove(node, disk):
2666
        logger.Error("could not remove block device %s on node %s,"
2667
                     " continuing anyway" %
2668
                     (device.iv_name, node))
2669
        result = False
2670
  return result
2671

    
2672

    
2673
class LUCreateInstance(LogicalUnit):
2674
  """Create an instance.
2675

2676
  """
2677
  HPATH = "instance-add"
2678
  HTYPE = constants.HTYPE_INSTANCE
2679
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2680
              "disk_template", "swap_size", "mode", "start", "vcpus",
2681
              "wait_for_sync", "ip_check"]
2682

    
2683
  def BuildHooksEnv(self):
2684
    """Build hooks env.
2685

2686
    This runs on master, primary and secondary nodes of the instance.
2687

2688
    """
2689
    env = {
2690
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2691
      "INSTANCE_DISK_SIZE": self.op.disk_size,
2692
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
2693
      "INSTANCE_ADD_MODE": self.op.mode,
2694
      }
2695
    if self.op.mode == constants.INSTANCE_IMPORT:
2696
      env["INSTANCE_SRC_NODE"] = self.op.src_node
2697
      env["INSTANCE_SRC_PATH"] = self.op.src_path
2698
      env["INSTANCE_SRC_IMAGE"] = self.src_image
2699

    
2700
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2701
      primary_node=self.op.pnode,
2702
      secondary_nodes=self.secondaries,
2703
      status=self.instance_status,
2704
      os_type=self.op.os_type,
2705
      memory=self.op.mem_size,
2706
      vcpus=self.op.vcpus,
2707
      nics=[(self.inst_ip, self.op.bridge)],
2708
    ))
2709

    
2710
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2711
          self.secondaries)
2712
    return env, nl, nl
2713

    
2714

    
2715
  def CheckPrereq(self):
2716
    """Check prerequisites.
2717

2718
    """
2719
    if self.op.mode not in (constants.INSTANCE_CREATE,
2720
                            constants.INSTANCE_IMPORT):
2721
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2722
                                 self.op.mode)
2723

    
2724
    if self.op.mode == constants.INSTANCE_IMPORT:
2725
      src_node = getattr(self.op, "src_node", None)
2726
      src_path = getattr(self.op, "src_path", None)
2727
      if src_node is None or src_path is None:
2728
        raise errors.OpPrereqError("Importing an instance requires source"
2729
                                   " node and path options")
2730
      src_node_full = self.cfg.ExpandNodeName(src_node)
2731
      if src_node_full is None:
2732
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2733
      self.op.src_node = src_node = src_node_full
2734

    
2735
      if not os.path.isabs(src_path):
2736
        raise errors.OpPrereqError("The source path must be absolute")
2737

    
2738
      export_info = rpc.call_export_info(src_node, src_path)
2739

    
2740
      if not export_info:
2741
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
2742

    
2743
      if not export_info.has_section(constants.INISECT_EXP):
2744
        raise errors.ProgrammerError("Corrupted export config")
2745

    
2746
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
2747
      if (int(ei_version) != constants.EXPORT_VERSION):
2748
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2749
                                   (ei_version, constants.EXPORT_VERSION))
2750

    
2751
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2752
        raise errors.OpPrereqError("Can't import instance with more than"
2753
                                   " one data disk")
2754

    
2755
      # FIXME: are the old os-es, disk sizes, etc. useful?
2756
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2757
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2758
                                                         'disk0_dump'))
2759
      self.src_image = diskimage
2760
    else: # INSTANCE_CREATE
2761
      if getattr(self.op, "os_type", None) is None:
2762
        raise errors.OpPrereqError("No guest OS specified")
2763

    
2764
    # check primary node
2765
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2766
    if pnode is None:
2767
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
2768
                                 self.op.pnode)
2769
    self.op.pnode = pnode.name
2770
    self.pnode = pnode
2771
    self.secondaries = []
2772
    # disk template and mirror node verification
2773
    if self.op.disk_template not in constants.DISK_TEMPLATES:
2774
      raise errors.OpPrereqError("Invalid disk template name")
2775

    
2776
    if self.op.disk_template == constants.DT_REMOTE_RAID1:
2777
      if getattr(self.op, "snode", None) is None:
2778
        raise errors.OpPrereqError("The 'remote_raid1' disk template needs"
2779
                                   " a mirror node")
2780

    
2781
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
2782
      if snode_name is None:
2783
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
2784
                                   self.op.snode)
2785
      elif snode_name == pnode.name:
2786
        raise errors.OpPrereqError("The secondary node cannot be"
2787
                                   " the primary node.")
2788
      self.secondaries.append(snode_name)
2789

    
2790
    # Check lv size requirements
2791
    nodenames = [pnode.name] + self.secondaries
2792
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2793

    
2794
    # Required free disk space as a function of disk and swap space
2795
    req_size_dict = {
2796
      constants.DT_DISKLESS: 0,
2797
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2798
      constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2799
      # 256 MB are added for drbd metadata, 128MB for each drbd device
2800
      constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2801
    }
2802

    
2803
    if self.op.disk_template not in req_size_dict:
2804
      raise errors.ProgrammerError("Disk template '%s' size requirement"
2805
                                   " is unknown" %  self.op.disk_template)
2806

    
2807
    req_size = req_size_dict[self.op.disk_template]
2808

    
2809
    for node in nodenames:
2810
      info = nodeinfo.get(node, None)
2811
      if not info:
2812
        raise errors.OpPrereqError("Cannot get current information"
2813
                                   " from node '%s'" % nodeinfo)
2814
      if req_size > info['vg_free']:
2815
        raise errors.OpPrereqError("Not enough disk space on target node %s."
2816
                                   " %d MB available, %d MB required" %
2817
                                   (node, info['vg_free'], req_size))
2818

    
2819
    # os verification
2820
    os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2821
    if not isinstance(os_obj, objects.OS):
2822
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
2823
                                 " primary node"  % self.op.os_type)
2824

    
2825
    # instance verification
2826
    hostname1 = utils.HostInfo(self.op.instance_name)
2827

    
2828
    self.op.instance_name = instance_name = hostname1.name
2829
    instance_list = self.cfg.GetInstanceList()
2830
    if instance_name in instance_list:
2831
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2832
                                 instance_name)
2833

    
2834
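    # the instance IP can be given explicitly, omitted ("none", meaning no
    # IP), or resolved from the instance name ("auto")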
    ip = getattr(self.op, "ip", None)
2835
    if ip is None or ip.lower() == "none":
2836
      inst_ip = None
2837
    elif ip.lower() == "auto":
2838
      inst_ip = hostname1.ip
2839
    else:
2840
      if not utils.IsValidIP(ip):
2841
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
2842
                                   " like a valid IP" % ip)
2843
      inst_ip = ip
2844
    self.inst_ip = inst_ip
2845

    
2846
    if self.op.start and not self.op.ip_check:
2847
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
2848
                                 " adding an instance in start mode")
2849

    
2850
    if self.op.ip_check:
2851
      if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
2852
                       constants.DEFAULT_NODED_PORT):
2853
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2854
                                   (hostname1.ip, instance_name))
2855

    
2856
    # bridge verification
2857
    bridge = getattr(self.op, "bridge", None)
2858
    if bridge is None:
2859
      self.op.bridge = self.cfg.GetDefBridge()
2860
    else:
2861
      self.op.bridge = bridge
2862

    
2863
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
2864
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
2865
                                 " destination node '%s'" %
2866
                                 (self.op.bridge, pnode.name))
2867

    
2868
    if self.op.start:
2869
      self.instance_status = 'up'
2870
    else:
2871
      self.instance_status = 'down'
2872

    
2873
  def Exec(self, feedback_fn):
2874
    """Create and add the instance to the cluster.
2875

2876
    """
2877
    instance = self.op.instance_name
2878
    pnode_name = self.pnode.name
2879

    
2880
    nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
2881
    if self.inst_ip is not None:
2882
      nic.ip = self.inst_ip
2883

    
2884
    disks = _GenerateDiskTemplate(self.cfg,
2885
                                  self.op.disk_template,
2886
                                  instance, pnode_name,
2887
                                  self.secondaries, self.op.disk_size,
2888
                                  self.op.swap_size)
2889

    
2890
    iobj = objects.Instance(name=instance, os=self.op.os_type,
2891
                            primary_node=pnode_name,
2892
                            memory=self.op.mem_size,
2893
                            vcpus=self.op.vcpus,
2894
                            nics=[nic], disks=disks,
2895
                            disk_template=self.op.disk_template,
2896
                            status=self.instance_status,
2897
                            )
2898

    
2899
    feedback_fn("* creating instance disks...")
2900
    if not _CreateDisks(self.cfg, iobj):
2901
      _RemoveDisks(iobj, self.cfg)
2902
      raise errors.OpExecError("Device creation failed, reverting...")
2903

    
2904
    feedback_fn("adding instance %s to cluster config" % instance)
2905

    
2906
    self.cfg.AddInstance(iobj)
2907

    
2908
    if self.op.wait_for_sync:
2909
      disk_abort = not _WaitForSync(self.cfg, iobj)
2910
    elif iobj.disk_template == constants.DT_REMOTE_RAID1:
2911
      # make sure the disks are not degraded (still sync-ing is ok)
2912
      time.sleep(15)
2913
      feedback_fn("* checking mirrors status")
2914
      disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
2915
    else:
2916
      disk_abort = False
2917

    
2918
    if disk_abort:
2919
      _RemoveDisks(iobj, self.cfg)
2920
      self.cfg.RemoveInstance(iobj.name)
2921
      raise errors.OpExecError("There are some degraded disks for"
2922
                               " this instance")
2923

    
2924
    feedback_fn("creating os for instance %s on node %s" %
2925
                (instance, pnode_name))
2926

    
2927
    if iobj.disk_template != constants.DT_DISKLESS:
2928
      if self.op.mode == constants.INSTANCE_CREATE:
2929
        feedback_fn("* running the instance OS create scripts...")
2930
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
2931
          raise errors.OpExecError("could not add os for instance %s"
2932
                                   " on node %s" %
2933
                                   (instance, pnode_name))
2934

    
2935
      elif self.op.mode == constants.INSTANCE_IMPORT:
2936
        feedback_fn("* running the instance OS import scripts...")
2937
        src_node = self.op.src_node
2938
        src_image = self.src_image
2939
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
2940
                                                src_node, src_image):
2941
          raise errors.OpExecError("Could not import os for instance"
2942
                                   " %s on node %s" %
2943
                                   (instance, pnode_name))
2944
      else:
2945
        # also checked in the prereq part
2946
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
2947
                                     % self.op.mode)
2948

    
2949
    if self.op.start:
2950
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
2951
      feedback_fn("* starting instance...")
2952
      if not rpc.call_instance_start(pnode_name, iobj, None):
2953
        raise errors.OpExecError("Could not start instance")
2954

    
2955

    
2956
class LUConnectConsole(NoHooksLU):
2957
  """Connect to an instance's console.
2958

2959
  This is somewhat special in that it returns the command line that
2960
  you need to run on the master node in order to connect to the
2961
  console.
2962

2963
  """
2964
  _OP_REQP = ["instance_name"]
2965

    
2966
  def CheckPrereq(self):
2967
    """Check prerequisites.
2968

2969
    This checks that the instance is in the cluster.
2970

2971
    """
2972
    instance = self.cfg.GetInstanceInfo(
2973
      self.cfg.ExpandInstanceName(self.op.instance_name))
2974
    if instance is None:
2975
      raise errors.OpPrereqError("Instance '%s' not known" %
2976
                                 self.op.instance_name)
2977
    self.instance = instance
2978

    
2979
  def Exec(self, feedback_fn):
2980
    """Connect to the console of an instance
2981

2982
    """
2983
    instance = self.instance
2984
    node = instance.primary_node
2985

    
2986
    node_insts = rpc.call_instance_list([node])[node]
2987
    if node_insts is False:
2988
      raise errors.OpExecError("Can't connect to node %s." % node)
2989

    
2990
    if instance.name not in node_insts:
2991
      raise errors.OpExecError("Instance %s is not running." % instance.name)
2992

    
2993
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
2994

    
2995
    hyper = hypervisor.GetHypervisor()
2996
    console_cmd = hyper.GetShellCommandForConsole(instance.name)
2997
    # build ssh cmdline
2998
    argv = ["ssh", "-q", "-t"]
2999
    argv.extend(ssh.KNOWN_HOSTS_OPTS)
3000
    argv.extend(ssh.BATCH_MODE_OPTS)
3001
    argv.append(node)
3002
    argv.append(console_cmd)
3003
    return "ssh", argv
3004

    
3005

    
3006
class LUAddMDDRBDComponent(LogicalUnit):
3007
  """Adda new mirror member to an instance's disk.
3008

3009
  """
3010
  HPATH = "mirror-add"
3011
  HTYPE = constants.HTYPE_INSTANCE
3012
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3013

    
3014
  def BuildHooksEnv(self):
3015
    """Build hooks env.
3016

3017
    This runs on the master, the primary and all the secondaries.
3018

3019
    """
3020
    env = {
3021
      "NEW_SECONDARY": self.op.remote_node,
3022
      "DISK_NAME": self.op.disk_name,
3023
      }
3024
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3025
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3026
          self.op.remote_node,] + list(self.instance.secondary_nodes)
3027
    return env, nl, nl
3028

    
3029
  def CheckPrereq(self):
3030
    """Check prerequisites.
3031

3032
    This checks that the instance is in the cluster.
3033

3034
    """
3035
    instance = self.cfg.GetInstanceInfo(
3036
      self.cfg.ExpandInstanceName(self.op.instance_name))
3037
    if instance is None:
3038
      raise errors.OpPrereqError("Instance '%s' not known" %
3039
                                 self.op.instance_name)
3040
    self.instance = instance
3041

    
3042
    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3043
    if remote_node is None:
3044
      raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3045
    self.remote_node = remote_node
3046

    
3047
    if remote_node == instance.primary_node:
3048
      raise errors.OpPrereqError("The specified node is the primary node of"
3049
                                 " the instance.")
3050

    
3051
    if instance.disk_template != constants.DT_REMOTE_RAID1:
3052
      raise errors.OpPrereqError("Instance's disk layout is not"
3053
                                 " remote_raid1.")
3054
    for disk in instance.disks:
3055
      if disk.iv_name == self.op.disk_name:
3056
        break
3057
    else:
3058
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
3059
                                 " instance." % self.op.disk_name)
3060
    if len(disk.children) > 1:
3061
      raise errors.OpPrereqError("The device already has two slave"
3062
                                 " devices.\n"
3063
                                 "This would create a 3-disk raid1"
3064
                                 " which we don't allow.")
3065
    self.disk = disk
3066

    
3067
  def Exec(self, feedback_fn):
3068
    """Add the mirror component
3069

3070
    """
3071
    disk = self.disk
3072
    instance = self.instance
3073

    
3074
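    # overall flow: generate names for the new DRBD device, create it on the
    # secondary and then on the primary node, attach it to the MD array,
    # record the change in the configuration and wait for the resync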
    remote_node = self.remote_node
    lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
    names = _GenerateUniqueNames(self.cfg, lv_names)
    new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
                                     remote_node, disk.size, names)

    logger.Info("adding new mirror component on secondary")
    #HARDCODE
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, new_drbd, False,
                                      _GetInstanceInfoText(instance)):
      raise errors.OpExecError("Failed to create new component on secondary"
                               " node %s" % remote_node)

    logger.Info("adding new mirror component on primary")
    #HARDCODE
    if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node, new_drbd,
                                    _GetInstanceInfoText(instance)):
      # remove secondary dev
      self.cfg.SetDiskID(new_drbd, remote_node)
      rpc.call_blockdev_remove(remote_node, new_drbd)
      raise errors.OpExecError("Failed to create volume on primary")

    # the device exists now
    # call the primary node to add the mirror to md
    logger.Info("adding new mirror component to md")
    if not rpc.call_blockdev_addchild(instance.primary_node,
                                      disk, new_drbd):
      logger.Error("Can't add mirror component to md!")
      self.cfg.SetDiskID(new_drbd, remote_node)
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
        logger.Error("Can't rollback on secondary")
      self.cfg.SetDiskID(new_drbd, instance.primary_node)
      if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
        logger.Error("Can't rollback on primary")
      raise errors.OpExecError("Can't add mirror component to md array")

    disk.children.append(new_drbd)

    self.cfg.AddInstance(instance)

    _WaitForSync(self.cfg, instance)

    return 0


class LURemoveMDDRBDComponent(LogicalUnit):
  """Remove a component from a remote_raid1 disk.

  """
  HPATH = "mirror-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK_NAME": self.op.disk_name,
      "DISK_ID": self.op.disk_id,
      "OLD_SECONDARY": self.old_secondary,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
                                 " instance." % self.op.disk_name)
    for child in disk.children:
      if child.dev_type == "drbd" and child.logical_id[2] == self.op.disk_id:
        break
    else:
      raise errors.OpPrereqError("Can't find the device with this port.")

    if len(disk.children) < 2:
      raise errors.OpPrereqError("Cannot remove the last component from"
                                 " a mirror.")
    self.disk = disk
    self.child = child
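    # the drbd logical_id is (node_a, node_b, port); remember as the old
    # secondary whichever of the two nodes is not the primary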
    if self.child.logical_id[0] == instance.primary_node:
      oid = 1
    else:
      oid = 0
    self.old_secondary = self.child.logical_id[oid]

  def Exec(self, feedback_fn):
    """Remove the mirror component

    """
    instance = self.instance
    disk = self.disk
    child = self.child
    logger.Info("remove mirror component")
    self.cfg.SetDiskID(disk, instance.primary_node)
    if not rpc.call_blockdev_removechild(instance.primary_node,
                                         disk, child):
      raise errors.OpExecError("Can't remove child from mirror.")

    for node in child.logical_id[:2]:
      self.cfg.SetDiskID(child, node)
      if not rpc.call_blockdev_remove(node, child):
        logger.Error("Warning: failed to remove device from node %s,"
                     " continuing operation." % node)

    disk.children.remove(child)
    self.cfg.AddInstance(instance)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    remote_node = getattr(self.op, "remote_node", None)
    if remote_node is None:
      remote_node = instance.secondary_nodes[0]
    else:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    self.op.remote_node = remote_node

  def Exec(self, feedback_fn):
    """Replace the disks of an instance.

    """
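    # overall flow: for each disk, build a new DRBD mirror towards the
    # (possibly new) secondary, attach it to the MD device, wait for the
    # resync, then detach and remove the old children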
    instance = self.instance
    iv_names = {}
    # start of work
    remote_node = self.op.remote_node
    cfg = self.cfg
    for dev in instance.disks:
      size = dev.size
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
                                       remote_node, size, names)
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
      logger.Info("adding new mirror component on secondary for %s" %
                  dev.iv_name)
      #HARDCODE
      if not _CreateBlockDevOnSecondary(cfg, remote_node, new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new component on"
                                 " secondary node %s\n"
                                 "Full abort, cleanup manually!" %
                                 remote_node)

      logger.Info("adding new mirror component on primary")
      #HARDCODE
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, new_drbd,
                                      _GetInstanceInfoText(instance)):
        # remove secondary dev
        cfg.SetDiskID(new_drbd, remote_node)
        rpc.call_blockdev_remove(remote_node, new_drbd)
        raise errors.OpExecError("Failed to create volume on primary!\n"
                                 "Full abort, cleanup manually!!")

      # the device exists now
      # call the primary node to add the mirror to md
      logger.Info("adding new mirror component to md")
      if not rpc.call_blockdev_addchild(instance.primary_node, dev,
                                        new_drbd):
        logger.Error("Can't add mirror component to md!")
        cfg.SetDiskID(new_drbd, remote_node)
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
          logger.Error("Can't rollback on secondary")
        cfg.SetDiskID(new_drbd, instance.primary_node)
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
          logger.Error("Can't rollback on primary")
        raise errors.OpExecError("Full abort, cleanup manually!!")

      dev.children.append(new_drbd)
      cfg.AddInstance(instance)

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    _WaitForSync(cfg, instance, unlock=True)

    # so check manually all the devices
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      cfg.SetDiskID(dev, instance.primary_node)
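      # index 5 of the blockdev_find result is the degraded flag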
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("MD device %s is degraded!" % name)
      cfg.SetDiskID(new_drbd, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
      if is_degr:
        raise errors.OpExecError("New drbd device %s is degraded!" % name)

    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      logger.Info("remove mirror %s component" % name)
      cfg.SetDiskID(dev, instance.primary_node)
      if not rpc.call_blockdev_removechild(instance.primary_node,
                                           dev, child):
        logger.Error("Can't remove child from mirror, aborting"
                     " *this device cleanup*.\nYou need to cleanup manually!!")
        continue

      for node in child.logical_id[:2]:
        logger.Info("remove child device on %s" % node)
        cfg.SetDiskID(child, node)
        if not rpc.call_blockdev_remove(node, child):
          logger.Error("Warning: failed to remove device from node %s,"
                       " continuing operation." % node)

      dev.children.remove(child)

      cfg.AddInstance(instance)


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]
    return


  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
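    # the status is gathered from the primary node and, when a secondary is
    # known (for drbd it is taken from the logical_id), from that node too;
    # child devices are computed recursively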
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type == "drbd":
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
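    # combine the hypervisor view (run_state, from the instance_info call)
    # with the configuration view (config_state) for each instance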
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                           instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        }

      result[instance.name] = idict

    return result


class LUSetInstanceParms(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      args['nics'] = [(ip, bridge)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.bridge = getattr(self.op, "bridge", None)
    if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("No such instance name '%s'" %
                                 self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance
    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.
    """
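    # only the configuration object is changed here; as noted above, the new
    # values are picked up at the next restart of the instance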
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))

    self.cfg.AddInstance(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance name is a valid one.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    # shutdown the instance, unless requested not to do so
    if self.op.shutdown:
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
      self.processor.ChainOpCode(op, feedback_fn)

    vgname = self.cfg.GetVGName()

    snap_disks = []

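    # snapshot the sda disk while the instance is (optionally) shut down; the
    # finally clause below restarts it as soon as the snapshot exists, and the
    # snapshot itself is exported and removed afterwards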
    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type="lvm", size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown:
        op = opcodes.OpStartupInstance(instance_name=instance.name,
                                       force=False)
        self.processor.ChainOpCode(op, feedback_fn)

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                      instance):
        logger.Error("could not export block device %s from node"
                     " %s to node %s" %
                     (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from"
                     " node %s" % (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.processor.ChainOpCode(op, feedback_fn)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
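    # resolve self.op.kind and self.op.name into self.target, the cluster,
    # node or instance object that the tag operation will act upon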
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()


class LUAddTags(TagsLU):
  """Sets tags on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tags.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
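    # every tag requested for removal must currently be set on the target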
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tags from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")