#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import socket
import time
import tempfile
import re
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import ssconf

class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements (REQ_CLUSTER,
      REQ_MASTER); note that all commands require root permissions

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_CLUSTER = True
  REQ_MASTER = True

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.processor = processor
    self.op = op
    self.cfg = cfg
    self.sstore = sstore
    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    if self.REQ_CLUSTER:
      if not cfg.IsCluster():
        raise errors.OpPrereqError("Cluster not initialized yet,"
                                   " use 'gnt-cluster init' first.")
      if self.REQ_MASTER:
        master = sstore.GetMasterNode()
        if master != utils.HostInfo().name:
          raise errors.OpPrereqError("Commands must be run on the master"
                                     " node %s" % master)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a
    dict containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    As for the node lists, the master should not be included in them,
    as it will be added by the hooks runner in case this LU requires a
    cluster to run on (otherwise we don't have a node list). If there
    are no nodes, an empty list (and not None) should be returned.

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError
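
# Illustrative sketch (not part of the original module): a minimal
# LogicalUnit subclass showing the contract described above.  The opcode
# field "message" and the hook path "example-noop" are made up for
# illustration only.
#
#   class LUExampleNoop(LogicalUnit):
#     HPATH = "example-noop"
#     HTYPE = constants.HTYPE_CLUSTER
#     _OP_REQP = ["message"]
#
#     def BuildHooksEnv(self):
#       # keys are prefixed with GANETI_ by the hooks runner
#       return {"MESSAGE": self.op.message}, [], [self.sstore.GetMasterNode()]
#
#     def CheckPrereq(self):
#       pass  # nothing to verify, no opcode fields to canonicalize
#
#     def Exec(self, feedback_fn):
#       feedback_fn(self.op.message)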


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Build hooks env.

    This is a no-op, since we don't run hooks.

    """
    return


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if nodes:
    wanted = []

    for name in nodes:
      node = lu.cfg.ExpandNodeName(name)
      if node is None:
        raise errors.OpPrereqError("No such node name '%s'" % name)
      wanted.append(node)

  else:
    wanted = lu.cfg.GetNodeList()
  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings) or None for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))
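
# Illustrative sketch (not part of the original module): a typical call with
# made-up field names; this would raise OpPrereqError because "foo" is
# neither a static nor a dynamic field:
#
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=["name", "foo"])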


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings
  """
  env = {
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env
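
# Illustrative sketch (not part of the original module): for a hypothetical
# instance "web1" with one NIC (all values made up), the dict returned above
# would be roughly:
#
#   {
#     "INSTANCE_NAME": "web1",
#     "INSTANCE_PRIMARY": "node1.example.com",
#     "INSTANCE_SECONDARIES": "node2.example.com",
#     "INSTANCE_OS_TYPE": "debian-etch",
#     "INSTANCE_STATUS": "up",
#     "INSTANCE_MEMORY": 512,
#     "INSTANCE_VCPUS": 1,
#     "INSTANCE_NIC0_IP": "192.0.2.10",
#     "INSTANCE_NIC0_BRIDGE": "xen-br0",
#     "INSTANCE_NIC_COUNT": 1,
#   }
#
# The hooks runner later prefixes every key with GANETI_.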


def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override
  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _UpdateEtcHosts(fullnode, ip):
  """Ensure a node has a correct entry in /etc/hosts.

  Args:
    fullnode - Fully qualified domain name of host. (str)
    ip       - IPv4 address of host (str)

  """
  node = fullnode.split(".", 1)[0]

  f = open('/etc/hosts', 'r+')

  inthere = False

  save_lines = []
  add_lines = []
  removed = False

  while True:
    rawline = f.readline()

    if not rawline:
      # End of file
      break

    line = rawline.split('\n')[0]

    # Strip off comments
    line = line.split('#')[0]

    if not line:
      # Entire line was comment, skip
      save_lines.append(rawline)
      continue

    fields = line.split()

    haveall = True
    havesome = False
    for spec in [ ip, fullnode, node ]:
      if spec not in fields:
        haveall = False
      if spec in fields:
        havesome = True

    if haveall:
      inthere = True
      save_lines.append(rawline)
      continue

    if havesome and not haveall:
      # Line (old, or manual?) which is missing some.  Remove.
      removed = True
      continue

    save_lines.append(rawline)

  if not inthere:
    add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))

  if removed:
    if add_lines:
      save_lines = save_lines + add_lines

    # We removed a line, write a new file and replace old.
    fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
    newfile = os.fdopen(fd, 'w')
    newfile.write(''.join(save_lines))
    newfile.close()
    os.rename(tmpname, '/etc/hosts')

  elif add_lines:
    # Simply appending a new line will do the trick.
    f.seek(0, 2)
    for add in add_lines:
      f.write(add)

  f.close()
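
# Illustrative note (not part of the original module): the entry appended
# above uses the format '%s\t%s %s\n', so for a hypothetical host
# node1.example.com at 192.0.2.10 the line added to /etc/hosts would be:
#
#   192.0.2.10    node1.example.com node1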


def _UpdateKnownHosts(fullnode, ip, pubkey):
  """Ensure a node has a correct known_hosts entry.

  Args:
    fullnode - Fully qualified domain name of host. (str)
    ip       - IPv4 address of host (str)
    pubkey   - the public key of the cluster

  """
  if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
  else:
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')

  inthere = False

  save_lines = []
  add_lines = []
  removed = False

  while True:
    rawline = f.readline()
    logger.Debug('read %s' % (repr(rawline),))

    if not rawline:
      # End of file
      break

    line = rawline.split('\n')[0]

    parts = line.split(' ')
    fields = parts[0].split(',')
    key = parts[2]

    haveall = True
    havesome = False
    for spec in [ ip, fullnode ]:
      if spec not in fields:
        haveall = False
      if spec in fields:
        havesome = True

    logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
    if haveall and key == pubkey:
      inthere = True
      save_lines.append(rawline)
      logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
      continue

    if havesome and (not haveall or key != pubkey):
      removed = True
      logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
      continue

    save_lines.append(rawline)

  if not inthere:
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))

  if removed:
    save_lines = save_lines + add_lines

    # Write a new file and replace old.
    fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
                                   constants.DATA_DIR)
    newfile = os.fdopen(fd, 'w')
    try:
      newfile.write(''.join(save_lines))
    finally:
      newfile.close()
    logger.Debug("Wrote new known_hosts.")
    os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)

  elif add_lines:
    # Simply appending a new line will do the trick.
    f.seek(0, 2)
    for add in add_lines:
      f.write(add)

  f.close()
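
# Illustrative note (not part of the original module): the known_hosts entry
# appended above uses the standard OpenSSH format, e.g. for a hypothetical
# host node1.example.com at 192.0.2.10:
#
#   node1.example.com,192.0.2.10 ssh-rsa AAAAB3NzaC1yc2E...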


def _HasValidVG(vglist, vgname):
  """Checks if the volume group list is valid.

  A non-None return value means there's an error, and the return value
  is the error message.

  """
  vgsize = vglist.get(vgname, None)
  if vgsize is None:
    return "volume group '%s' missing" % vgname
  elif vgsize < 20480:
    return ("volume group '%s' too small (20480MiB required, %dMiB found)" %
            (vgname, vgsize))
  return None
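
# Illustrative sketch (not part of the original module): _HasValidVG expects
# the {vg_name: size_in_MiB} mapping returned by the volume group listing
# calls used below (utils.ListVolumeGroups, rpc.call_vg_list); "xenvg" is
# just an example name:
#
#   _HasValidVG({"xenvg": 102400}, "xenvg")  # -> None, i.e. valid
#   _HasValidVG({"xenvg": 10240}, "xenvg")   # -> "volume group ... too small"
#   _HasValidVG({}, "xenvg")                 # -> "volume group 'xenvg' missing"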


def _InitSSHSetup(node):
  """Setup the SSH configuration for the cluster.

  This generates a dsa keypair for root, adds the pub key to the
  permitted hosts and adds the hostkey to its own known hosts.

  Args:
    node: the name of this host as a fqdn

  """
  if os.path.exists('/root/.ssh/id_dsa'):
    utils.CreateBackup('/root/.ssh/id_dsa')
  if os.path.exists('/root/.ssh/id_dsa.pub'):
    utils.CreateBackup('/root/.ssh/id_dsa.pub')

  utils.RemoveFile('/root/.ssh/id_dsa')
  utils.RemoveFile('/root/.ssh/id_dsa.pub')

  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
                         "-f", "/root/.ssh/id_dsa",
                         "-q", "-N", ""])
  if result.failed:
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
                             result.output)

  f = open('/root/.ssh/id_dsa.pub', 'r')
  try:
    utils.AddAuthorizedKey('/root/.ssh/authorized_keys', f.read(8192))
  finally:
    f.close()


def _InitGanetiServerSetup(ss):
  """Setup the necessary configuration for the initial node daemon.

  This creates the nodepass file containing the shared password for
  the cluster and also generates the SSL certificate.

  """
  # Create pseudo random password
  randpass = sha.new(os.urandom(64)).hexdigest()
  # and write it into sstore
  ss.SetKey(ss.SS_NODED_PASS, randpass)

  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
                         "-days", str(365*5), "-nodes", "-x509",
                         "-keyout", constants.SSL_CERT_FILE,
                         "-out", constants.SSL_CERT_FILE, "-batch"])
  if result.failed:
    raise errors.OpExecError("could not generate server ssl cert, command"
                             " %s had exitcode %s and error message %s" %
                             (result.cmd, result.exit_code, result.output))

  os.chmod(constants.SSL_CERT_FILE, 0400)

  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])

  if result.failed:
    raise errors.OpExecError("Could not start the node daemon, command %s"
                             " had exitcode %s and error %s" %
                             (result.cmd, result.exit_code, result.output))


class LUInitCluster(LogicalUnit):
  """Initialise the cluster.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
              "def_bridge", "master_netdev"]
  REQ_CLUSTER = False

  def BuildHooksEnv(self):
    """Build hooks env.

    Notes: Since we don't require a cluster, we must manually add
    ourselves in the post-run node list.

    """
    return {}, [], [self.hostname.name]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    if config.ConfigWriter.IsCluster():
      raise errors.OpPrereqError("Cluster is already initialised")

    self.hostname = hostname = utils.HostInfo()

    if hostname.ip.startswith("127."):
      raise errors.OpPrereqError("This host's IP resolves to the private"
                                 " range (%s). Please fix DNS or /etc/hosts." %
                                 (hostname.ip,))

    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)

    if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
                         constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
                                 " to %s,\nbut this ip address does not"
                                 " belong to this host."
                                 " Aborting." % hostname.ip)

    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip and not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary ip given")
    if (secondary_ip and
        secondary_ip != hostname.ip and
        (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
                           constants.DEFAULT_NODED_PORT))):
      raise errors.OpPrereqError("You gave %s as secondary IP,\n"
                                 "but it does not belong to this host." %
                                 secondary_ip)
    self.secondary_ip = secondary_ip

    # checks presence of the volume group given
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)

    if vgstatus:
      raise errors.OpPrereqError("Error: %s" % vgstatus)

    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
                    self.op.mac_prefix):
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
                                 self.op.mac_prefix)

    if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
                                 self.op.hypervisor_type)

    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
    if result.failed:
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
                                 (self.op.master_netdev,
                                  result.output.strip()))

  def Exec(self, feedback_fn):
    """Initialize the cluster.

    """
    clustername = self.clustername
    hostname = self.hostname

    # set up the simple store
    self.sstore = ss = ssconf.SimpleStore()
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)

    # set up the inter-node password and certificate
    _InitGanetiServerSetup(ss)

    # start the master ip
    rpc.call_node_start_master(hostname.name)

    # set up ssh config and /etc/hosts
    f = open('/etc/ssh/ssh_host_rsa_key.pub', 'r')
    try:
      sshline = f.read()
    finally:
      f.close()
    sshkey = sshline.split(" ")[1]

    _UpdateEtcHosts(hostname.name, hostname.ip)

    _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)

    _InitSSHSetup(hostname.name)

    # init of cluster config file
    self.cfg = cfgw = config.ConfigWriter()
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
                    sshkey, self.op.mac_prefix,
                    self.op.vg_name, self.op.def_bridge)


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    utils.CreateBackup('/root/.ssh/id_dsa')
    utils.CreateBackup('/root/.ssh/id_dsa.pub')
    rpc.call_node_leave_cluster(self.sstore.GetMasterNode())


class LUVerifyCluster(NoHooksLU):
  """Verifies the cluster status.

  """
  _OP_REQP = []

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn(" - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad
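
  # Illustrative note (not part of the original module): node_result, as
  # checked above, is the answer to rpc.call_node_verify and carries three
  # keys:
  #
  #   {'filelist': {filename: checksum, ...},  # checksums of file_list
  #    'nodelist': {nodename: error, ...},     # only nodes it failed to reach
  #    'hypervisor': None or error string}     # hypervisor verify result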

  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    instancelist = self.cfg.GetInstanceList()
    if not instance in instancelist:
      feedback_fn("  - ERROR: instance %s not in instance list %s" %
                      (instance, instancelist))
      bad = True

    instanceconfig = self.cfg.GetInstanceInfo(instance)
    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if not instanceconfig.status == 'down':
      if not instance in node_instance[node_current]:
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return not bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    self.cfg.VerifyConfig()

    master = self.sstore.GetMasterNode()
    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    node_volume = {}
    node_instance = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if type(volumeinfo) != dict:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      result = self._VerifyInstance(instance, node_volume, node_instance,
                                    feedback_fn)
      bad = bad or result

      inst_config = self.cfg.GetInstanceInfo(instance)

      inst_config.MapLVsByNode(node_vol_should)

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    return int(bad)


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      result = utils.RunCmd(["fping", "-q", new_ip])
      if not result.failed:
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      if not rpc.call_node_start_master(master):
        logger.Error("Could not re-enable the master role on the master,\n"
                     "please restart manually.")


def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      logger.ToStderr("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        logger.ToStderr("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      perc_done, est_time, is_degraded = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        logger.ToStdout("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    if unlock:
      utils.Unlock('cmd')
    try:
      time.sleep(min(60, max_time))
    finally:
      if unlock:
        utils.Lock('cmd')

  if done:
    logger.ToStdout("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
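
# Illustrative note (not part of the original module): each mstat entry
# unpacked above is a (perc_done, est_time, is_degraded) tuple, e.g.
# (45.5, 120, True) for a mirror that is 45.5% synced with about two minutes
# left; perc_done is None once the device no longer needs syncing.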


def _CheckDiskConsistency(cfgw, dev, node, on_primary):
  """Check that mirrors are not degraded.

  """
  cfgw.SetDiskID(dev, node)

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Can't get any data from node %s" % node)
      result = False
    else:
      result = result and (not rstats[5])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result
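
# Illustrative note (not part of the original module): rstats here is the
# status tuple returned by rpc.call_blockdev_find; the element at index 5 is
# treated by this check as the "degraded" flag, so "not rstats[5]" means the
# mirror is consistent.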


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    return

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    return node_data


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would not allow itself to run.

    """
    env = {
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    rpc.call_node_leave_cluster(node.name)

    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["dtotal", "dfree",
                                     "mtotal", "mnode", "mfree",
                                     "bootid"])

    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                               "pip", "sip"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedNodes(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output
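
  # Illustrative sketch (not part of the original module): for
  # output_fields ["name", "pinst_cnt", "mfree"] Exec returns one row per
  # node, with values in the same order as the requested fields, e.g.:
  #
  #   [["node1.example.com", 2, 1024],
  #    ["node2.example.com", 0, 3072]]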


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip
    node_list = cfg.GetNodeList()
    if node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration"
                                 % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)
      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachability
    if not utils.TcpPing(utils.HostInfo().name,
                         primary_ip,
                         constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(myself.secondary_ip,
                           secondary_ip,
                           constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError(
          "Node secondary ip not reachable by TCP based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # set up inter-node password and certificate and restart the node daemon
    gntpass = self.sstore.GetNodeDaemonPassword()
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
      raise errors.OpExecError("ganeti password corruption detected")
    f = open(constants.SSL_CERT_FILE)
    try:
      gntpem = f.read(8192)
    finally:
      f.close()
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
    # so we use this to detect an invalid certificate; as long as the
    # cert doesn't contain this, the here-document will be correctly
    # parsed by the shell sequence below
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
    if not gntpem.endswith("\n"):
      raise errors.OpExecError("PEM must end with newline")
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)

    # and then connect with ssh to set password and start ganeti-noded
    # note that all the below variables are sanitized at this point,
    # either by being constants or by the checks above
    ss = self.sstore
    mycommand = ("umask 077 && "
                 "echo '%s' > '%s' && "
                 "cat > '%s' << '!EOF.' && \n"
                 "%s!EOF.\n%s restart" %
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
                  constants.SSL_CERT_FILE, gntpem,
                  constants.NODE_INITD_SCRIPT))

    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
    if result.failed:
      raise errors.OpExecError("Remote command on node %s, error: %s,"
                               " output: %s" %
                               (node, result.fail_reason, result.output))

    # check connectivity
    time.sleep(4)

    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    keyarray = []
    keyfiles = ["/etc/ssh/ssh_host_dsa_key", "/etc/ssh/ssh_host_dsa_key.pub",
                "/etc/ssh/ssh_host_rsa_key", "/etc/ssh/ssh_host_rsa_key.pub",
                "/root/.ssh/id_dsa", "/root/.ssh/id_dsa.pub"]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    _UpdateEtcHosts(new_node.name, new_node.primary_ip)
    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
                      self.cfg.GetHostKey())

    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the"
                                 " secondary ip you gave (%s).\n"
                                 "Please fix and re-run this command." %
                                 new_node.secondary_ip)

    success, msg = ssh.VerifyNodeHostname(node)
    if not success:
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
                               " than the one the resolver gives: %s.\n"
                               "Please fix and re-run this command." %
                               (node, msg))

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList() + [node]
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = ss.GetFileList()
    for fname in to_copy:
      if not ssh.CopyFileToNode(node, fname):
        logger.Error("could not copy file %s to node %s" % (fname, node))

    logger.Info("adding node %s to cluster.conf" % node)
    self.cfg.AddNode(new_node)


class LUMasterFailover(LogicalUnit):
  """Failover the master node to the current node.

  This is a special LU in that it must run on a non-master node.

  """
  HPATH = "master-failover"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_MASTER = False
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the new master only in the pre phase, and on all
    the nodes in the post phase.

    """
    env = {
      "NEW_MASTER": self.new_master,
      "OLD_MASTER": self.old_master,
      }
    return env, [self.new_master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we are not already the master.

    """
    self.new_master = utils.HostInfo().name
    self.old_master = self.sstore.GetMasterNode()

    if self.old_master == self.new_master:
      raise errors.OpPrereqError("This command must be run on the node"
                                 " where you want the new master to be.\n"
                                 "%s is already the master" %
                                 self.old_master)

  def Exec(self, feedback_fn):
    """Failover the master node.

    This command, when run on a non-master node, will cause the current
    master to cease being master, and the non-master to become the new
    master.

    """
    #TODO: do not rely on gethostname returning the FQDN
    logger.Info("setting master to %s, old master: %s" %
                (self.new_master, self.old_master))

    if not rpc.call_node_stop_master(self.old_master):
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)

    ss = self.sstore
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
      logger.Error("could not distribute the new simple store master file"
                   " to the other nodes, please check.")

    if not rpc.call_node_start_master(self.new_master):
      logger.Error("could not start the master role on the new master"
                   " %s, please check" % self.new_master)
      feedback_fn("Error in activating the master IP on the new master,\n"
                  "please fix manually.")


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      }

    return result


class LUClusterCopyFile(NoHooksLU):
  """Copy file to cluster.

  """
  _OP_REQP = ["nodes", "filename"]

  def CheckPrereq(self):
1679
    """Check prerequisites.
1680

1681
    It should check that the named file exists and that the given list
1682
    of nodes is valid.
1683

1684
    """
1685
    if not os.path.exists(self.op.filename):
1686
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1687

    
1688
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1689

    
1690
  def Exec(self, feedback_fn):
1691
    """Copy a file from master to some nodes.
1692

1693
    Args:
1694
      opts - class with options as members
1695
      args - list containing a single element, the file name
1696
    Opts used:
1697
      nodes - list containing the name of target nodes; if empty, all nodes
1698

1699
    """
1700
    filename = self.op.filename
1701

    
1702
    myname = utils.HostInfo().name
1703

    
1704
    for node in self.nodes:
1705
      if node == myname:
1706
        continue
1707
      if not ssh.CopyFileToNode(node, filename):
1708
        logger.Error("Copy of file %s to node %s failed" % (filename, node))
1709

    
1710

    
1711
class LUDumpClusterConfig(NoHooksLU):
1712
  """Return a text-representation of the cluster-config.
1713

1714
  """
1715
  _OP_REQP = []
1716

    
1717
  def CheckPrereq(self):
1718
    """No prerequisites.
1719

1720
    """
1721
    pass
1722

    
1723
  def Exec(self, feedback_fn):
1724
    """Dump a representation of the cluster config to the standard output.
1725

1726
    """
1727
    return self.cfg.DumpConfig()
1728

    
1729

    
1730
class LURunClusterCommand(NoHooksLU):
1731
  """Run a command on some nodes.
1732

1733
  """
1734
  _OP_REQP = ["command", "nodes"]
1735

    
1736
  def CheckPrereq(self):
1737
    """Check prerequisites.
1738

1739
    It checks that the given list of nodes is valid.
1740

1741
    """
1742
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1743

    
1744
  def Exec(self, feedback_fn):
1745
    """Run a command on some nodes.
1746

1747
    """
1748
    data = []
1749
    for node in self.nodes:
1750
      result = ssh.SSHCall(node, "root", self.op.command)
1751
      data.append((node, result.output, result.exit_code))
1752

    
1753
    return data
1754

    
1755

    
1756
class LUActivateInstanceDisks(NoHooksLU):
1757
  """Bring up an instance's disks.
1758

1759
  """
1760
  _OP_REQP = ["instance_name"]
1761

    
1762
  def CheckPrereq(self):
1763
    """Check prerequisites.
1764

1765
    This checks that the instance is in the cluster.
1766

1767
    """
1768
    instance = self.cfg.GetInstanceInfo(
1769
      self.cfg.ExpandInstanceName(self.op.instance_name))
1770
    if instance is None:
1771
      raise errors.OpPrereqError("Instance '%s' not known" %
1772
                                 self.op.instance_name)
1773
    self.instance = instance
1774

    
1775

    
1776
  def Exec(self, feedback_fn):
1777
    """Activate the disks.
1778

1779
    """
1780
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1781
    if not disks_ok:
1782
      raise errors.OpExecError("Cannot activate block devices")
1783

    
1784
    return disks_info
1785

    
1786

    
1787
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1788
  """Prepare the block devices for an instance.
1789

1790
  This sets up the block devices on all nodes.
1791

1792
  Args:
1793
    instance: a ganeti.objects.Instance object
1794
    ignore_secondaries: if true, errors on secondary nodes won't result
1795
                        in an error return from the function
1796

1797
  Returns:
1798
    false if the operation failed
1799
    list of (host, instance_visible_name, node_visible_name) if the operation
1800
         suceeded with the mapping from node devices to instance devices
1801
  """
1802
  device_info = []
1803
  disks_ok = True
1804
  for inst_disk in instance.disks:
1805
    master_result = None
1806
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1807
      cfg.SetDiskID(node_disk, node)
1808
      is_primary = node == instance.primary_node
1809
      result = rpc.call_blockdev_assemble(node, node_disk, is_primary)
1810
      if not result:
1811
        logger.Error("could not prepare block device %s on node %s (is_pri"
1812
                     "mary=%s)" % (inst_disk.iv_name, node, is_primary))
1813
        if is_primary or not ignore_secondaries:
1814
          disks_ok = False
1815
      if is_primary:
1816
        master_result = result
1817
    device_info.append((instance.primary_node, inst_disk.iv_name,
1818
                        master_result))
1819

    
1820
  return disks_ok, device_info
1821

    
1822

    
1823
def _StartInstanceDisks(cfg, instance, force):
1824
  """Start the disks of an instance.
1825

1826
  """
1827
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1828
                                           ignore_secondaries=force)
1829
  if not disks_ok:
1830
    _ShutdownInstanceDisks(instance, cfg)
1831
    if force is not None and not force:
1832
      logger.Error("If the message above refers to a secondary node,"
1833
                   " you can retry the operation using '--force'.")
1834
    raise errors.OpExecError("Disk consistency error")
1835

    
1836

    
1837
class LUDeactivateInstanceDisks(NoHooksLU):
1838
  """Shutdown an instance's disks.
1839

1840
  """
1841
  _OP_REQP = ["instance_name"]
1842

    
1843
  def CheckPrereq(self):
1844
    """Check prerequisites.
1845

1846
    This checks that the instance is in the cluster.
1847

1848
    """
1849
    instance = self.cfg.GetInstanceInfo(
1850
      self.cfg.ExpandInstanceName(self.op.instance_name))
1851
    if instance is None:
1852
      raise errors.OpPrereqError("Instance '%s' not known" %
1853
                                 self.op.instance_name)
1854
    self.instance = instance
1855

    
1856
  def Exec(self, feedback_fn):
1857
    """Deactivate the disks
1858

1859
    """
1860
    instance = self.instance
1861
    ins_l = rpc.call_instance_list([instance.primary_node])
1862
    ins_l = ins_l[instance.primary_node]
1863
    if not type(ins_l) is list:
1864
      raise errors.OpExecError("Can't contact node '%s'" %
1865
                               instance.primary_node)
1866

    
1867
    if self.instance.name in ins_l:
1868
      raise errors.OpExecError("Instance is running, can't shutdown"
1869
                               " block devices.")
1870

    
1871
    _ShutdownInstanceDisks(instance, self.cfg)
1872

    
1873

    
1874
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1875
  """Shutdown block devices of an instance.
1876

1877
  This does the shutdown on all nodes of the instance.
1878

1879
  If the ignore_primary is false, errors on the primary node are
1880
  ignored.
1881

1882
  """
1883
  result = True
1884
  for disk in instance.disks:
1885
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1886
      cfg.SetDiskID(top_disk, node)
1887
      if not rpc.call_blockdev_shutdown(node, top_disk):
1888
        logger.Error("could not shutdown block device %s on node %s" %
1889
                     (disk.iv_name, node))
1890
        if not ignore_primary or node != instance.primary_node:
1891
          result = False
1892
  return result
1893

    
1894

    
1895
class LUStartupInstance(LogicalUnit):
1896
  """Starts an instance.
1897

1898
  """
1899
  HPATH = "instance-start"
1900
  HTYPE = constants.HTYPE_INSTANCE
1901
  _OP_REQP = ["instance_name", "force"]
1902

    
1903
  def BuildHooksEnv(self):
1904
    """Build hooks env.
1905

1906
    This runs on master, primary and secondary nodes of the instance.
1907

1908
    """
1909
    env = {
1910
      "FORCE": self.op.force,
1911
      }
1912
    env.update(_BuildInstanceHookEnvByObject(self.instance))
1913
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1914
          list(self.instance.secondary_nodes))
1915
    return env, nl, nl
1916

    
1917
  def CheckPrereq(self):
1918
    """Check prerequisites.
1919

1920
    This checks that the instance is in the cluster.
1921

1922
    """
1923
    instance = self.cfg.GetInstanceInfo(
1924
      self.cfg.ExpandInstanceName(self.op.instance_name))
1925
    if instance is None:
1926
      raise errors.OpPrereqError("Instance '%s' not known" %
1927
                                 self.op.instance_name)
1928

    
1929
    # check bridges existance
1930
    brlist = [nic.bridge for nic in instance.nics]
1931
    if not rpc.call_bridges_exist(instance.primary_node, brlist):
1932
      raise errors.OpPrereqError("one or more target bridges %s does not"
1933
                                 " exist on destination node '%s'" %
1934
                                 (brlist, instance.primary_node))
1935

    
1936
    self.instance = instance
1937
    self.op.instance_name = instance.name
1938

    
1939
  def Exec(self, feedback_fn):
1940
    """Start the instance.
1941

1942
    """
1943
    instance = self.instance
1944
    force = self.op.force
1945
    extra_args = getattr(self.op, "extra_args", "")
1946

    
1947
    node_current = instance.primary_node
1948

    
1949
    nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1950
    if not nodeinfo:
1951
      raise errors.OpExecError("Could not contact node %s for infos" %
1952
                               (node_current))
1953

    
1954
    freememory = nodeinfo[node_current]['memory_free']
1955
    memory = instance.memory
1956
    if memory > freememory:
1957
      raise errors.OpExecError("Not enough memory to start instance"
1958
                               " %s on node %s"
1959
                               " needed %s MiB, available %s MiB" %
1960
                               (instance.name, node_current, memory,
1961
                                freememory))
1962

    
1963
    _StartInstanceDisks(self.cfg, instance, force)
1964

    
1965
    if not rpc.call_instance_start(node_current, instance, extra_args):
1966
      _ShutdownInstanceDisks(instance, self.cfg)
1967
      raise errors.OpExecError("Could not start instance")
1968

    
1969
    self.cfg.MarkInstanceUp(instance.name)
1970

    
1971

    
1972
class LUShutdownInstance(LogicalUnit):
1973
  """Shutdown an instance.
1974

1975
  """
1976
  HPATH = "instance-stop"
1977
  HTYPE = constants.HTYPE_INSTANCE
1978
  _OP_REQP = ["instance_name"]
1979

    
1980
  def BuildHooksEnv(self):
1981
    """Build hooks env.
1982

1983
    This runs on master, primary and secondary nodes of the instance.
1984

1985
    """
1986
    env = _BuildInstanceHookEnvByObject(self.instance)
1987
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1988
          list(self.instance.secondary_nodes))
1989
    return env, nl, nl
1990

    
1991
  def CheckPrereq(self):
1992
    """Check prerequisites.
1993

1994
    This checks that the instance is in the cluster.
1995

1996
    """
1997
    instance = self.cfg.GetInstanceInfo(
1998
      self.cfg.ExpandInstanceName(self.op.instance_name))
1999
    if instance is None:
2000
      raise errors.OpPrereqError("Instance '%s' not known" %
2001
                                 self.op.instance_name)
2002
    self.instance = instance
2003

    
2004
  def Exec(self, feedback_fn):
2005
    """Shutdown the instance.
2006

2007
    """
2008
    instance = self.instance
2009
    node_current = instance.primary_node
2010
    if not rpc.call_instance_shutdown(node_current, instance):
2011
      logger.Error("could not shutdown instance")
2012

    
2013
    self.cfg.MarkInstanceDown(instance.name)
2014
    _ShutdownInstanceDisks(instance, self.cfg)
2015

    
2016

    
2017
class LUReinstallInstance(LogicalUnit):
2018
  """Reinstall an instance.
2019

2020
  """
2021
  HPATH = "instance-reinstall"
2022
  HTYPE = constants.HTYPE_INSTANCE
2023
  _OP_REQP = ["instance_name"]
2024

    
2025
  def BuildHooksEnv(self):
2026
    """Build hooks env.
2027

2028
    This runs on master, primary and secondary nodes of the instance.
2029

2030
    """
2031
    env = _BuildInstanceHookEnvByObject(self.instance)
2032
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2033
          list(self.instance.secondary_nodes))
2034
    return env, nl, nl
2035

    
2036
  def CheckPrereq(self):
2037
    """Check prerequisites.
2038

2039
    This checks that the instance is in the cluster and is not running.
2040

2041
    """
2042
    instance = self.cfg.GetInstanceInfo(
2043
      self.cfg.ExpandInstanceName(self.op.instance_name))
2044
    if instance is None:
2045
      raise errors.OpPrereqError("Instance '%s' not known" %
2046
                                 self.op.instance_name)
2047
    if instance.disk_template == constants.DT_DISKLESS:
2048
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2049
                                 self.op.instance_name)
2050
    if instance.status != "down":
2051
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2052
                                 self.op.instance_name)
2053
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2054
    if remote_info:
2055
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2056
                                 (self.op.instance_name,
2057
                                  instance.primary_node))
2058

    
2059
    self.op.os_type = getattr(self.op, "os_type", None)
2060
    if self.op.os_type is not None:
2061
      # OS verification
2062
      pnode = self.cfg.GetNodeInfo(
2063
        self.cfg.ExpandNodeName(instance.primary_node))
2064
      if pnode is None:
2065
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2066
                                   self.op.pnode)
2067
      os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2068
      if not isinstance(os_obj, objects.OS):
2069
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2070
                                   " primary node"  % self.op.os_type)
2071

    
2072
    self.instance = instance
2073

    
2074
  def Exec(self, feedback_fn):
2075
    """Reinstall the instance.
2076

2077
    """
2078
    inst = self.instance
2079

    
2080
    if self.op.os_type is not None:
2081
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2082
      inst.os = self.op.os_type
2083
      self.cfg.AddInstance(inst)
2084

    
2085
    _StartInstanceDisks(self.cfg, inst, None)
2086
    try:
2087
      feedback_fn("Running the instance OS create scripts...")
2088
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2089
        raise errors.OpExecError("Could not install OS for instance %s "
2090
                                 "on node %s" %
2091
                                 (inst.name, inst.primary_node))
2092
    finally:
2093
      _ShutdownInstanceDisks(inst, self.cfg)
2094

    
2095

    
2096
class LURenameInstance(LogicalUnit):
2097
  """Rename an instance.
2098

2099
  """
2100
  HPATH = "instance-rename"
2101
  HTYPE = constants.HTYPE_INSTANCE
2102
  _OP_REQP = ["instance_name", "new_name"]
2103

    
2104
  def BuildHooksEnv(self):
2105
    """Build hooks env.
2106

2107
    This runs on master, primary and secondary nodes of the instance.
2108

2109
    """
2110
    env = _BuildInstanceHookEnvByObject(self.instance)
2111
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2112
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2113
          list(self.instance.secondary_nodes))
2114
    return env, nl, nl
2115

    
2116
  def CheckPrereq(self):
2117
    """Check prerequisites.
2118

2119
    This checks that the instance is in the cluster and is not running.
2120

2121
    """
2122
    instance = self.cfg.GetInstanceInfo(
2123
      self.cfg.ExpandInstanceName(self.op.instance_name))
2124
    if instance is None:
2125
      raise errors.OpPrereqError("Instance '%s' not known" %
2126
                                 self.op.instance_name)
2127
    if instance.status != "down":
2128
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2129
                                 self.op.instance_name)
2130
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2131
    if remote_info:
2132
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2133
                                 (self.op.instance_name,
2134
                                  instance.primary_node))
2135
    self.instance = instance
2136

    
2137
    # new name verification
2138
    name_info = utils.HostInfo(self.op.new_name)
2139

    
2140
    self.op.new_name = new_name = name_info.name
2141
    if not getattr(self.op, "ignore_ip", False):
2142
      command = ["fping", "-q", name_info.ip]
2143
      result = utils.RunCmd(command)
2144
      if not result.failed:
2145
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2146
                                   (name_info.ip, new_name))
2147

    
2148

    
2149
  def Exec(self, feedback_fn):
2150
    """Reinstall the instance.
2151

2152
    """
2153
    inst = self.instance
2154
    old_name = inst.name
2155

    
2156
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2157

    
2158
    # re-read the instance from the configuration after rename
2159
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2160

    
2161
    _StartInstanceDisks(self.cfg, inst, None)
2162
    try:
2163
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2164
                                          "sda", "sdb"):
2165
        msg = ("Could run OS rename script for instance %s\n"
2166
               "on node %s\n"
2167
               "(but the instance has been renamed in Ganeti)" %
2168
               (inst.name, inst.primary_node))
2169
        logger.Error(msg)
2170
    finally:
2171
      _ShutdownInstanceDisks(inst, self.cfg)
2172

    
2173

    
2174
class LURemoveInstance(LogicalUnit):
2175
  """Remove an instance.
2176

2177
  """
2178
  HPATH = "instance-remove"
2179
  HTYPE = constants.HTYPE_INSTANCE
2180
  _OP_REQP = ["instance_name"]
2181

    
2182
  def BuildHooksEnv(self):
2183
    """Build hooks env.
2184

2185
    This runs on master, primary and secondary nodes of the instance.
2186

2187
    """
2188
    env = _BuildInstanceHookEnvByObject(self.instance)
2189
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2190
          list(self.instance.secondary_nodes))
2191
    return env, nl, nl
2192

    
2193
  def CheckPrereq(self):
2194
    """Check prerequisites.
2195

2196
    This checks that the instance is in the cluster.
2197

2198
    """
2199
    instance = self.cfg.GetInstanceInfo(
2200
      self.cfg.ExpandInstanceName(self.op.instance_name))
2201
    if instance is None:
2202
      raise errors.OpPrereqError("Instance '%s' not known" %
2203
                                 self.op.instance_name)
2204
    self.instance = instance
2205

    
2206
  def Exec(self, feedback_fn):
2207
    """Remove the instance.
2208

2209
    """
2210
    instance = self.instance
2211
    logger.Info("shutting down instance %s on node %s" %
2212
                (instance.name, instance.primary_node))
2213

    
2214
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2215
      raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2216
                               (instance.name, instance.primary_node))
2217

    
2218
    logger.Info("removing block devices for instance %s" % instance.name)
2219

    
2220
    _RemoveDisks(instance, self.cfg)
2221

    
2222
    logger.Info("removing instance %s out of cluster config" % instance.name)
2223

    
2224
    self.cfg.RemoveInstance(instance.name)
2225

    
2226

    
2227
class LUQueryInstances(NoHooksLU):
2228
  """Logical unit for querying instances.
2229

2230
  """
2231
  _OP_REQP = ["output_fields", "names"]
2232

    
2233
  def CheckPrereq(self):
2234
    """Check prerequisites.
2235

2236
    This checks that the fields required are valid output fields.
2237

2238
    """
2239
    self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2240
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2241
                               "admin_state", "admin_ram",
2242
                               "disk_template", "ip", "mac", "bridge",
2243
                               "sda_size", "sdb_size"],
2244
                       dynamic=self.dynamic_fields,
2245
                       selected=self.op.output_fields)
2246

    
2247
    self.wanted = _GetWantedInstances(self, self.op.names)
2248

    
2249
  def Exec(self, feedback_fn):
2250
    """Computes the list of nodes and their attributes.
2251

2252
    """
2253
    instance_names = self.wanted
2254
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2255
                     in instance_names]
2256

    
2257
    # begin data gathering
2258

    
2259
    nodes = frozenset([inst.primary_node for inst in instance_list])
2260

    
2261
    bad_nodes = []
2262
    if self.dynamic_fields.intersection(self.op.output_fields):
2263
      live_data = {}
2264
      node_data = rpc.call_all_instances_info(nodes)
2265
      for name in nodes:
2266
        result = node_data[name]
2267
        if result:
2268
          live_data.update(result)
2269
        elif result == False:
2270
          bad_nodes.append(name)
2271
        # else no instance is alive
2272
    else:
2273
      live_data = dict([(name, {}) for name in instance_names])
2274

    
2275
    # end data gathering
2276

    
2277
    output = []
2278
    for instance in instance_list:
2279
      iout = []
2280
      for field in self.op.output_fields:
2281
        if field == "name":
2282
          val = instance.name
2283
        elif field == "os":
2284
          val = instance.os
2285
        elif field == "pnode":
2286
          val = instance.primary_node
2287
        elif field == "snodes":
2288
          val = list(instance.secondary_nodes)
2289
        elif field == "admin_state":
2290
          val = (instance.status != "down")
2291
        elif field == "oper_state":
2292
          if instance.primary_node in bad_nodes:
2293
            val = None
2294
          else:
2295
            val = bool(live_data.get(instance.name))
2296
        elif field == "admin_ram":
2297
          val = instance.memory
2298
        elif field == "oper_ram":
2299
          if instance.primary_node in bad_nodes:
2300
            val = None
2301
          elif instance.name in live_data:
2302
            val = live_data[instance.name].get("memory", "?")
2303
          else:
2304
            val = "-"
2305
        elif field == "disk_template":
2306
          val = instance.disk_template
2307
        elif field == "ip":
2308
          val = instance.nics[0].ip
2309
        elif field == "bridge":
2310
          val = instance.nics[0].bridge
2311
        elif field == "mac":
2312
          val = instance.nics[0].mac
2313
        elif field == "sda_size" or field == "sdb_size":
2314
          disk = instance.FindDisk(field[:3])
2315
          if disk is None:
2316
            val = None
2317
          else:
2318
            val = disk.size
2319
        else:
2320
          raise errors.ParameterError(field)
2321
        iout.append(val)
2322
      output.append(iout)
2323

    
2324
    return output
2325

    
2326

    
2327
class LUFailoverInstance(LogicalUnit):
2328
  """Failover an instance.
2329

2330
  """
2331
  HPATH = "instance-failover"
2332
  HTYPE = constants.HTYPE_INSTANCE
2333
  _OP_REQP = ["instance_name", "ignore_consistency"]
2334

    
2335
  def BuildHooksEnv(self):
2336
    """Build hooks env.
2337

2338
    This runs on master, primary and secondary nodes of the instance.
2339

2340
    """
2341
    env = {
2342
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2343
      }
2344
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2345
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2346
    return env, nl, nl
2347

    
2348
  def CheckPrereq(self):
2349
    """Check prerequisites.
2350

2351
    This checks that the instance is in the cluster.
2352

2353
    """
2354
    instance = self.cfg.GetInstanceInfo(
2355
      self.cfg.ExpandInstanceName(self.op.instance_name))
2356
    if instance is None:
2357
      raise errors.OpPrereqError("Instance '%s' not known" %
2358
                                 self.op.instance_name)
2359

    
2360
    if instance.disk_template != constants.DT_REMOTE_RAID1:
2361
      raise errors.OpPrereqError("Instance's disk layout is not"
2362
                                 " remote_raid1.")
2363

    
2364
    secondary_nodes = instance.secondary_nodes
2365
    if not secondary_nodes:
2366
      raise errors.ProgrammerError("no secondary node but using "
2367
                                   "DT_REMOTE_RAID1 template")
2368

    
2369
    # check memory requirements on the secondary node
2370
    target_node = secondary_nodes[0]
2371
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2372
    info = nodeinfo.get(target_node, None)
2373
    if not info:
2374
      raise errors.OpPrereqError("Cannot get current information"
2375
                                 " from node '%s'" % nodeinfo)
2376
    if instance.memory > info['memory_free']:
2377
      raise errors.OpPrereqError("Not enough memory on target node %s."
2378
                                 " %d MB available, %d MB required" %
2379
                                 (target_node, info['memory_free'],
2380
                                  instance.memory))
2381

    
2382
    # check bridge existance
2383
    brlist = [nic.bridge for nic in instance.nics]
2384
    if not rpc.call_bridges_exist(instance.primary_node, brlist):
2385
      raise errors.OpPrereqError("One or more target bridges %s does not"
2386
                                 " exist on destination node '%s'" %
2387
                                 (brlist, instance.primary_node))
2388

    
2389
    self.instance = instance
2390

    
2391
  def Exec(self, feedback_fn):
2392
    """Failover an instance.
2393

2394
    The failover is done by shutting it down on its present node and
2395
    starting it on the secondary.
2396

2397
    """
2398
    instance = self.instance
2399

    
2400
    source_node = instance.primary_node
2401
    target_node = instance.secondary_nodes[0]
2402

    
2403
    feedback_fn("* checking disk consistency between source and target")
2404
    for dev in instance.disks:
2405
      # for remote_raid1, these are md over drbd
2406
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2407
        if not self.op.ignore_consistency:
2408
          raise errors.OpExecError("Disk %s is degraded on target node,"
2409
                                   " aborting failover." % dev.iv_name)
2410

    
2411
    feedback_fn("* checking target node resource availability")
2412
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2413

    
2414
    if not nodeinfo:
2415
      raise errors.OpExecError("Could not contact target node %s." %
2416
                               target_node)
2417

    
2418
    free_memory = int(nodeinfo[target_node]['memory_free'])
2419
    memory = instance.memory
2420
    if memory > free_memory:
2421
      raise errors.OpExecError("Not enough memory to create instance %s on"
2422
                               " node %s. needed %s MiB, available %s MiB" %
2423
                               (instance.name, target_node, memory,
2424
                                free_memory))
2425

    
2426
    feedback_fn("* shutting down instance on source node")
2427
    logger.Info("Shutting down instance %s on node %s" %
2428
                (instance.name, source_node))
2429

    
2430
    if not rpc.call_instance_shutdown(source_node, instance):
2431
      logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2432
                   " anyway. Please make sure node %s is down"  %
2433
                   (instance.name, source_node, source_node))
2434

    
2435
    feedback_fn("* deactivating the instance's disks on source node")
2436
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2437
      raise errors.OpExecError("Can't shut down the instance's disks.")
2438

    
2439
    instance.primary_node = target_node
2440
    # distribute new instance config to the other nodes
2441
    self.cfg.AddInstance(instance)
2442

    
2443
    feedback_fn("* activating the instance's disks on target node")
2444
    logger.Info("Starting instance %s on node %s" %
2445
                (instance.name, target_node))
2446

    
2447
    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2448
                                             ignore_secondaries=True)
2449
    if not disks_ok:
2450
      _ShutdownInstanceDisks(instance, self.cfg)
2451
      raise errors.OpExecError("Can't activate the instance's disks")
2452

    
2453
    feedback_fn("* starting the instance on the target node")
2454
    if not rpc.call_instance_start(target_node, instance, None):
2455
      _ShutdownInstanceDisks(instance, self.cfg)
2456
      raise errors.OpExecError("Could not start instance %s on node %s." %
2457
                               (instance.name, target_node))
2458

    
2459

    
2460
def _CreateBlockDevOnPrimary(cfg, node, device, info):
2461
  """Create a tree of block devices on the primary node.
2462

2463
  This always creates all devices.
2464

2465
  """
2466
  if device.children:
2467
    for child in device.children:
2468
      if not _CreateBlockDevOnPrimary(cfg, node, child, info):
2469
        return False
2470

    
2471
  cfg.SetDiskID(device, node)
2472
  new_id = rpc.call_blockdev_create(node, device, device.size, True, info)
2473
  if not new_id:
2474
    return False
2475
  if device.physical_id is None:
2476
    device.physical_id = new_id
2477
  return True
2478

    
2479

    
2480
def _CreateBlockDevOnSecondary(cfg, node, device, force, info):
2481
  """Create a tree of block devices on a secondary node.
2482

2483
  If this device type has to be created on secondaries, create it and
2484
  all its children.
2485

2486
  If not, just recurse to children keeping the same 'force' value.
2487

2488
  """
2489
  if device.CreateOnSecondary():
2490
    force = True
2491
  if device.children:
2492
    for child in device.children:
2493
      if not _CreateBlockDevOnSecondary(cfg, node, child, force, info):
2494
        return False
2495

    
2496
  if not force:
2497
    return True
2498
  cfg.SetDiskID(device, node)
2499
  new_id = rpc.call_blockdev_create(node, device, device.size, False, info)
2500
  if not new_id:
2501
    return False
2502
  if device.physical_id is None:
2503
    device.physical_id = new_id
2504
  return True
2505

    
2506

    
2507
def _GenerateUniqueNames(cfg, exts):
2508
  """Generate a suitable LV name.
2509

2510
  This will generate a logical volume name for the given instance.
2511

2512
  """
2513
  results = []
2514
  for val in exts:
2515
    new_id = cfg.GenerateUniqueID()
2516
    results.append("%s%s" % (new_id, val))
2517
  return results
2518

    
2519

    
2520
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2521
  """Generate a drbd device complete with its children.
2522

2523
  """
2524
  port = cfg.AllocatePort()
2525
  vgname = cfg.GetVGName()
2526
  dev_data = objects.Disk(dev_type="lvm", size=size,
2527
                          logical_id=(vgname, names[0]))
2528
  dev_meta = objects.Disk(dev_type="lvm", size=128,
2529
                          logical_id=(vgname, names[1]))
2530
  drbd_dev = objects.Disk(dev_type="drbd", size=size,
2531
                          logical_id = (primary, secondary, port),
2532
                          children = [dev_data, dev_meta])
2533
  return drbd_dev
2534

    
2535

    
2536
def _GenerateDiskTemplate(cfg, template_name,
2537
                          instance_name, primary_node,
2538
                          secondary_nodes, disk_sz, swap_sz):
2539
  """Generate the entire disk layout for a given template type.
2540

2541
  """
2542
  #TODO: compute space requirements
2543

    
2544
  vgname = cfg.GetVGName()
2545
  if template_name == "diskless":
2546
    disks = []
2547
  elif template_name == "plain":
2548
    if len(secondary_nodes) != 0:
2549
      raise errors.ProgrammerError("Wrong template configuration")
2550

    
2551
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2552
    sda_dev = objects.Disk(dev_type="lvm", size=disk_sz,
2553
                           logical_id=(vgname, names[0]),
2554
                           iv_name = "sda")
2555
    sdb_dev = objects.Disk(dev_type="lvm", size=swap_sz,
2556
                           logical_id=(vgname, names[1]),
2557
                           iv_name = "sdb")
2558
    disks = [sda_dev, sdb_dev]
2559
  elif template_name == "local_raid1":
2560
    if len(secondary_nodes) != 0:
2561
      raise errors.ProgrammerError("Wrong template configuration")
2562

    
2563

    
2564
    names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2565
                                       ".sdb_m1", ".sdb_m2"])
2566
    sda_dev_m1 = objects.Disk(dev_type="lvm", size=disk_sz,
2567
                              logical_id=(vgname, names[0]))
2568
    sda_dev_m2 = objects.Disk(dev_type="lvm", size=disk_sz,
2569
                              logical_id=(vgname, names[1]))
2570
    md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name = "sda",
2571
                              size=disk_sz,
2572
                              children = [sda_dev_m1, sda_dev_m2])
2573
    sdb_dev_m1 = objects.Disk(dev_type="lvm", size=swap_sz,
2574
                              logical_id=(vgname, names[2]))
2575
    sdb_dev_m2 = objects.Disk(dev_type="lvm", size=swap_sz,
2576
                              logical_id=(vgname, names[3]))
2577
    md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name = "sdb",
2578
                              size=swap_sz,
2579
                              children = [sdb_dev_m1, sdb_dev_m2])
2580
    disks = [md_sda_dev, md_sdb_dev]
2581
  elif template_name == constants.DT_REMOTE_RAID1:
2582
    if len(secondary_nodes) != 1:
2583
      raise errors.ProgrammerError("Wrong template configuration")
2584
    remote_node = secondary_nodes[0]
2585
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2586
                                       ".sdb_data", ".sdb_meta"])
2587
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2588
                                         disk_sz, names[0:2])
2589
    md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name="sda",
2590
                              children = [drbd_sda_dev], size=disk_sz)
2591
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2592
                                         swap_sz, names[2:4])
2593
    md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name="sdb",
2594
                              children = [drbd_sdb_dev], size=swap_sz)
2595
    disks = [md_sda_dev, md_sdb_dev]
2596
  else:
2597
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2598
  return disks
2599

    
2600

    
2601
def _GetInstanceInfoText(instance):
2602
  """Compute that text that should be added to the disk's metadata.
2603

2604
  """
2605
  return "originstname+%s" % instance.name
2606

    
2607

    
2608
def _CreateDisks(cfg, instance):
2609
  """Create all disks for an instance.
2610

2611
  This abstracts away some work from AddInstance.
2612

2613
  Args:
2614
    instance: the instance object
2615

2616
  Returns:
2617
    True or False showing the success of the creation process
2618

2619
  """
2620
  info = _GetInstanceInfoText(instance)
2621

    
2622
  for device in instance.disks:
2623
    logger.Info("creating volume %s for instance %s" %
2624
              (device.iv_name, instance.name))
2625
    #HARDCODE
2626
    for secondary_node in instance.secondary_nodes:
2627
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, device, False,
2628
                                        info):
2629
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2630
                     (device.iv_name, device, secondary_node))
2631
        return False
2632
    #HARDCODE
2633
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, device, info):
2634
      logger.Error("failed to create volume %s on primary!" %
2635
                   device.iv_name)
2636
      return False
2637
  return True
2638

    
2639

    
2640
def _RemoveDisks(instance, cfg):
2641
  """Remove all disks for an instance.
2642

2643
  This abstracts away some work from `AddInstance()` and
2644
  `RemoveInstance()`. Note that in case some of the devices couldn't
2645
  be remove, the removal will continue with the other ones (compare
2646
  with `_CreateDisks()`).
2647

2648
  Args:
2649
    instance: the instance object
2650

2651
  Returns:
2652
    True or False showing the success of the removal proces
2653

2654
  """
2655
  logger.Info("removing block devices for instance %s" % instance.name)
2656

    
2657
  result = True
2658
  for device in instance.disks:
2659
    for node, disk in device.ComputeNodeTree(instance.primary_node):
2660
      cfg.SetDiskID(disk, node)
2661
      if not rpc.call_blockdev_remove(node, disk):
2662
        logger.Error("could not remove block device %s on node %s,"
2663
                     " continuing anyway" %
2664
                     (device.iv_name, node))
2665
        result = False
2666
  return result
2667

    
2668

    
2669
class LUCreateInstance(LogicalUnit):
2670
  """Create an instance.
2671

2672
  """
2673
  HPATH = "instance-add"
2674
  HTYPE = constants.HTYPE_INSTANCE
2675
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2676
              "disk_template", "swap_size", "mode", "start", "vcpus",
2677
              "wait_for_sync", "ip_check"]
2678

    
2679
  def BuildHooksEnv(self):
2680
    """Build hooks env.
2681

2682
    This runs on master, primary and secondary nodes of the instance.
2683

2684
    """
2685
    env = {
2686
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2687
      "INSTANCE_DISK_SIZE": self.op.disk_size,
2688
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
2689
      "INSTANCE_ADD_MODE": self.op.mode,
2690
      }
2691
    if self.op.mode == constants.INSTANCE_IMPORT:
2692
      env["INSTANCE_SRC_NODE"] = self.op.src_node
2693
      env["INSTANCE_SRC_PATH"] = self.op.src_path
2694
      env["INSTANCE_SRC_IMAGE"] = self.src_image
2695

    
2696
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2697
      primary_node=self.op.pnode,
2698
      secondary_nodes=self.secondaries,
2699
      status=self.instance_status,
2700
      os_type=self.op.os_type,
2701
      memory=self.op.mem_size,
2702
      vcpus=self.op.vcpus,
2703
      nics=[(self.inst_ip, self.op.bridge)],
2704
    ))
2705

    
2706
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2707
          self.secondaries)
2708
    return env, nl, nl
2709

    
2710

    
2711
  def CheckPrereq(self):
2712
    """Check prerequisites.
2713

2714
    """
2715
    if self.op.mode not in (constants.INSTANCE_CREATE,
2716
                            constants.INSTANCE_IMPORT):
2717
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2718
                                 self.op.mode)
2719

    
2720
    if self.op.mode == constants.INSTANCE_IMPORT:
2721
      src_node = getattr(self.op, "src_node", None)
2722
      src_path = getattr(self.op, "src_path", None)
2723
      if src_node is None or src_path is None:
2724
        raise errors.OpPrereqError("Importing an instance requires source"
2725
                                   " node and path options")
2726
      src_node_full = self.cfg.ExpandNodeName(src_node)
2727
      if src_node_full is None:
2728
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2729
      self.op.src_node = src_node = src_node_full
2730

    
2731
      if not os.path.isabs(src_path):
2732
        raise errors.OpPrereqError("The source path must be absolute")
2733

    
2734
      export_info = rpc.call_export_info(src_node, src_path)
2735

    
2736
      if not export_info:
2737
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
2738

    
2739
      if not export_info.has_section(constants.INISECT_EXP):
2740
        raise errors.ProgrammerError("Corrupted export config")
2741

    
2742
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
2743
      if (int(ei_version) != constants.EXPORT_VERSION):
2744
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2745
                                   (ei_version, constants.EXPORT_VERSION))
2746

    
2747
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2748
        raise errors.OpPrereqError("Can't import instance with more than"
2749
                                   " one data disk")
2750

    
2751
      # FIXME: are the old os-es, disk sizes, etc. useful?
2752
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2753
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2754
                                                         'disk0_dump'))
2755
      self.src_image = diskimage
2756
    else: # INSTANCE_CREATE
2757
      if getattr(self.op, "os_type", None) is None:
2758
        raise errors.OpPrereqError("No guest OS specified")
2759

    
2760
    # check primary node
2761
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2762
    if pnode is None:
2763
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
2764
                                 self.op.pnode)
2765
    self.op.pnode = pnode.name
2766
    self.pnode = pnode
2767
    self.secondaries = []
2768
    # disk template and mirror node verification
2769
    if self.op.disk_template not in constants.DISK_TEMPLATES:
2770
      raise errors.OpPrereqError("Invalid disk template name")
2771

    
2772
    if self.op.disk_template == constants.DT_REMOTE_RAID1:
2773
      if getattr(self.op, "snode", None) is None:
2774
        raise errors.OpPrereqError("The 'remote_raid1' disk template needs"
2775
                                   " a mirror node")
2776

    
2777
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
2778
      if snode_name is None:
2779
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
2780
                                   self.op.snode)
2781
      elif snode_name == pnode.name:
2782
        raise errors.OpPrereqError("The secondary node cannot be"
2783
                                   " the primary node.")
2784
      self.secondaries.append(snode_name)
2785

    
2786
    # Check lv size requirements
2787
    nodenames = [pnode.name] + self.secondaries
2788
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2789

    
2790
    # Required free disk space as a function of disk and swap space
2791
    req_size_dict = {
2792
      constants.DT_DISKLESS: 0,
2793
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2794
      constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2795
      # 256 MB are added for drbd metadata, 128MB for each drbd device
2796
      constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2797
    }
2798

    
2799
    if self.op.disk_template not in req_size_dict:
2800
      raise errors.ProgrammerError("Disk template '%s' size requirement"
2801
                                   " is unknown" %  self.op.disk_template)
2802

    
2803
    req_size = req_size_dict[self.op.disk_template]
2804

    
2805
    for node in nodenames:
2806
      info = nodeinfo.get(node, None)
2807
      if not info:
2808
        raise errors.OpPrereqError("Cannot get current information"
2809
                                   " from node '%s'" % nodeinfo)
2810
      if req_size > info['vg_free']:
2811
        raise errors.OpPrereqError("Not enough disk space on target node %s."
2812
                                   " %d MB available, %d MB required" %
2813
                                   (node, info['vg_free'], req_size))
2814

    
2815
    # os verification
2816
    os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2817
    if not isinstance(os_obj, objects.OS):
2818
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
2819
                                 " primary node"  % self.op.os_type)
2820

    
2821
    # instance verification
2822
    hostname1 = utils.HostInfo(self.op.instance_name)
2823

    
2824
    self.op.instance_name = instance_name = hostname1.name
2825
    instance_list = self.cfg.GetInstanceList()
2826
    if instance_name in instance_list:
2827
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2828
                                 instance_name)
2829

    
2830
    ip = getattr(self.op, "ip", None)
2831
    if ip is None or ip.lower() == "none":
2832
      inst_ip = None
2833
    elif ip.lower() == "auto":
2834
      inst_ip = hostname1.ip
2835
    else:
2836
      if not utils.IsValidIP(ip):
2837
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
2838
                                   " like a valid IP" % ip)
2839
      inst_ip = ip
2840
    self.inst_ip = inst_ip
2841

    
2842
    if self.op.start and not self.op.ip_check:
2843
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
2844
                                 " adding an instance in start mode")
2845

    
2846
    if self.op.ip_check:
2847
      if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
2848
                       constants.DEFAULT_NODED_PORT):
2849
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2850
                                   (hostname1.ip, instance_name))
2851

    
2852
    # bridge verification
2853
    bridge = getattr(self.op, "bridge", None)
2854
    if bridge is None:
2855
      self.op.bridge = self.cfg.GetDefBridge()
2856
    else:
2857
      self.op.bridge = bridge
2858

    
2859
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
2860
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
2861
                                 " destination node '%s'" %
2862
                                 (self.op.bridge, pnode.name))
2863

    
2864
    if self.op.start:
2865
      self.instance_status = 'up'
2866
    else:
2867
      self.instance_status = 'down'
2868

    
2869
  def Exec(self, feedback_fn):
2870
    """Create and add the instance to the cluster.
2871

2872
    """
2873
    instance = self.op.instance_name
2874
    pnode_name = self.pnode.name
2875

    
2876
    nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
2877
    if self.inst_ip is not None:
2878
      nic.ip = self.inst_ip
2879

    
2880
    disks = _GenerateDiskTemplate(self.cfg,
2881
                                  self.op.disk_template,
2882
                                  instance, pnode_name,
2883
                                  self.secondaries, self.op.disk_size,
2884
                                  self.op.swap_size)
2885

    
2886
    iobj = objects.Instance(name=instance, os=self.op.os_type,
2887
                            primary_node=pnode_name,
2888
                            memory=self.op.mem_size,
2889
                            vcpus=self.op.vcpus,
2890
                            nics=[nic], disks=disks,
2891
                            disk_template=self.op.disk_template,
2892
                            status=self.instance_status,
2893
                            )
2894

    
2895
    feedback_fn("* creating instance disks...")
2896
    if not _CreateDisks(self.cfg, iobj):
2897
      _RemoveDisks(iobj, self.cfg)
2898
      raise errors.OpExecError("Device creation failed, reverting...")
2899

    
2900
    feedback_fn("adding instance %s to cluster config" % instance)
2901

    
2902
    self.cfg.AddInstance(iobj)
2903

    
2904
    if self.op.wait_for_sync:
2905
      disk_abort = not _WaitForSync(self.cfg, iobj)
2906
    elif iobj.disk_template == constants.DT_REMOTE_RAID1:
2907
      # make sure the disks are not degraded (still sync-ing is ok)
2908
      time.sleep(15)
2909
      feedback_fn("* checking mirrors status")
2910
      disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
2911
    else:
2912
      disk_abort = False
2913

    
2914
    if disk_abort:
2915
      _RemoveDisks(iobj, self.cfg)
2916
      self.cfg.RemoveInstance(iobj.name)
2917
      raise errors.OpExecError("There are some degraded disks for"
2918
                               " this instance")
2919

    
2920
    feedback_fn("creating os for instance %s on node %s" %
2921
                (instance, pnode_name))
2922

    
2923
    if iobj.disk_template != constants.DT_DISKLESS:
2924
      if self.op.mode == constants.INSTANCE_CREATE:
2925
        feedback_fn("* running the instance OS create scripts...")
2926
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
2927
          raise errors.OpExecError("could not add os for instance %s"
2928
                                   " on node %s" %
2929
                                   (instance, pnode_name))
2930

    
2931
      elif self.op.mode == constants.INSTANCE_IMPORT:
2932
        feedback_fn("* running the instance OS import scripts...")
2933
        src_node = self.op.src_node
2934
        src_image = self.src_image
2935
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
2936
                                                src_node, src_image):
2937
          raise errors.OpExecError("Could not import os for instance"
2938
                                   " %s on node %s" %
2939
                                   (instance, pnode_name))
2940
      else:
2941
        # also checked in the prereq part
2942
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
2943
                                     % self.op.mode)
2944

    
2945
    if self.op.start:
2946
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
2947
      feedback_fn("* starting instance...")
2948
      if not rpc.call_instance_start(pnode_name, iobj, None):
2949
        raise errors.OpExecError("Could not start instance")
2950

    
2951

    
2952
class LUConnectConsole(NoHooksLU):
2953
  """Connect to an instance's console.
2954

2955
  This is somewhat special in that it returns the command line that
2956
  you need to run on the master node in order to connect to the
2957
  console.
2958

2959
  """
2960
  _OP_REQP = ["instance_name"]
2961

    
2962
  def CheckPrereq(self):
2963
    """Check prerequisites.
2964

2965
    This checks that the instance is in the cluster.
2966

2967
    """
2968
    instance = self.cfg.GetInstanceInfo(
2969
      self.cfg.ExpandInstanceName(self.op.instance_name))
2970
    if instance is None:
2971
      raise errors.OpPrereqError("Instance '%s' not known" %
2972
                                 self.op.instance_name)
2973
    self.instance = instance
2974

    
2975
  def Exec(self, feedback_fn):
2976
    """Connect to the console of an instance
2977

2978
    """
2979
    instance = self.instance
2980
    node = instance.primary_node
2981

    
2982
    node_insts = rpc.call_instance_list([node])[node]
2983
    if node_insts is False:
2984
      raise errors.OpExecError("Can't connect to node %s." % node)
2985

    
2986
    if instance.name not in node_insts:
2987
      raise errors.OpExecError("Instance %s is not running." % instance.name)
2988

    
2989
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
2990

    
2991
    hyper = hypervisor.GetHypervisor()
2992
    console_cmd = hyper.GetShellCommandForConsole(instance.name)
2993
    # build ssh cmdline
2994
    argv = ["ssh", "-q", "-t"]
2995
    argv.extend(ssh.KNOWN_HOSTS_OPTS)
2996
    argv.extend(ssh.BATCH_MODE_OPTS)
2997
    argv.append(node)
2998
    argv.append(console_cmd)
2999
    return "ssh", argv
3000

    
3001

    
3002
class LUAddMDDRBDComponent(LogicalUnit):
  """Add a new mirror member to an instance's disk.

  """
  HPATH = "mirror-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "NEW_SECONDARY": self.op.remote_node,
      "DISK_NAME": self.op.disk_name,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.remote_node,] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
    if remote_node is None:
      raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
    self.remote_node = remote_node

    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")
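    # locate the disk by its iv_name; the for/else below only reaches
    # the else branch (and fails) when no disk matched, i.e. no break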
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
                                 " instance." % self.op.disk_name)
    if len(disk.children) > 1:
      raise errors.OpPrereqError("The device already has two slave"
                                 " devices.\n"
                                 "This would create a 3-disk raid1"
                                 " which we don't allow.")
    self.disk = disk

  def Exec(self, feedback_fn):
    """Add the mirror component

    """
    disk = self.disk
    instance = self.instance

    remote_node = self.remote_node
    lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
    names = _GenerateUniqueNames(self.cfg, lv_names)
    new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
                                     remote_node, disk.size, names)

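    # the new component is created on the secondary first and on the
    # primary second; if the primary-side creation or the md addchild
    # call below fails, the devices created so far are removed again so
    # the instance is left unchanged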
    logger.Info("adding new mirror component on secondary")
    #HARDCODE
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, new_drbd, False,
                                      _GetInstanceInfoText(instance)):
      raise errors.OpExecError("Failed to create new component on secondary"
                               " node %s" % remote_node)

    logger.Info("adding new mirror component on primary")
    #HARDCODE
    if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node, new_drbd,
                                    _GetInstanceInfoText(instance)):
      # remove secondary dev
      self.cfg.SetDiskID(new_drbd, remote_node)
      rpc.call_blockdev_remove(remote_node, new_drbd)
      raise errors.OpExecError("Failed to create volume on primary")

    # the device exists now
    # call the primary node to add the mirror to md
    logger.Info("adding new mirror component to md")
    if not rpc.call_blockdev_addchild(instance.primary_node,
                                           disk, new_drbd):
      logger.Error("Can't add mirror component to md!")
      self.cfg.SetDiskID(new_drbd, remote_node)
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
        logger.Error("Can't rollback on secondary")
      self.cfg.SetDiskID(new_drbd, instance.primary_node)
      if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
        logger.Error("Can't rollback on primary")
      raise errors.OpExecError("Can't add mirror component to md array")

    disk.children.append(new_drbd)

    self.cfg.AddInstance(instance)

    _WaitForSync(self.cfg, instance)

    return 0


class LURemoveMDDRBDComponent(LogicalUnit):
  """Remove a component from a remote_raid1 disk.

  """
  HPATH = "mirror-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK_NAME": self.op.disk_name,
      "DISK_ID": self.op.disk_id,
      "OLD_SECONDARY": self.old_secondary,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
                                 " instance." % self.op.disk_name)
    for child in disk.children:
      if child.dev_type == "drbd" and child.logical_id[2] == self.op.disk_id:
        break
    else:
      raise errors.OpPrereqError("Can't find the device with this port.")

    if len(disk.children) < 2:
      raise errors.OpPrereqError("Cannot remove the last component from"
                                 " a mirror.")
    self.disk = disk
    self.child = child
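    # the first two entries of the drbd child's logical_id are the two
    # node names; the old secondary is whichever of them is not the
    # instance's primary node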
    if self.child.logical_id[0] == instance.primary_node:
      oid = 1
    else:
      oid = 0
    self.old_secondary = self.child.logical_id[oid]

  def Exec(self, feedback_fn):
    """Remove the mirror component

    """
    instance = self.instance
    disk = self.disk
    child = self.child
    logger.Info("remove mirror component")
    self.cfg.SetDiskID(disk, instance.primary_node)
    if not rpc.call_blockdev_removechild(instance.primary_node,
                                              disk, child):
      raise errors.OpExecError("Can't remove child from mirror.")

    for node in child.logical_id[:2]:
      self.cfg.SetDiskID(child, node)
      if not rpc.call_blockdev_remove(node, child):
        logger.Error("Warning: failed to remove device from node %s,"
                     " continuing operation." % node)

    disk.children.remove(child)
    self.cfg.AddInstance(instance)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    remote_node = getattr(self.op, "remote_node", None)
    if remote_node is None:
      remote_node = instance.secondary_nodes[0]
    else:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    self.op.remote_node = remote_node

  def Exec(self, feedback_fn):
    """Replace the disks of an instance.

    """
    instance = self.instance
    iv_names = {}
    # start of work
    remote_node = self.op.remote_node
    cfg = self.cfg
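    # for each disk: build a new drbd mirror towards remote_node, attach
    # it to the md device next to the old child, and (after the sync
    # below) detach and remove the old child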
    for dev in instance.disks:
      size = dev.size
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
                                       remote_node, size, names)
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
      logger.Info("adding new mirror component on secondary for %s" %
                  dev.iv_name)
      #HARDCODE
      if not _CreateBlockDevOnSecondary(cfg, remote_node, new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new component on"
                                 " secondary node %s\n"
                                 "Full abort, cleanup manually!" %
                                 remote_node)

      logger.Info("adding new mirror component on primary")
      #HARDCODE
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, new_drbd,
                                      _GetInstanceInfoText(instance)):
        # remove secondary dev
        cfg.SetDiskID(new_drbd, remote_node)
        rpc.call_blockdev_remove(remote_node, new_drbd)
        raise errors.OpExecError("Failed to create volume on primary!\n"
                                 "Full abort, cleanup manually!!")

      # the device exists now
      # call the primary node to add the mirror to md
      logger.Info("adding new mirror component to md")
      if not rpc.call_blockdev_addchild(instance.primary_node, dev,
                                        new_drbd):
        logger.Error("Can't add mirror component to md!")
        cfg.SetDiskID(new_drbd, remote_node)
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
          logger.Error("Can't rollback on secondary")
        cfg.SetDiskID(new_drbd, instance.primary_node)
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
          logger.Error("Can't rollback on primary")
        raise errors.OpExecError("Full abort, cleanup manually!!")

      dev.children.append(new_drbd)
      cfg.AddInstance(instance)

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    _WaitForSync(cfg, instance, unlock=True)

    # so check manually all the devices
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("MD device %s is degraded!" % name)
      cfg.SetDiskID(new_drbd, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
      if is_degr:
        raise errors.OpExecError("New drbd device %s is degraded!" % name)

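    # all new devices are in sync; now detach each old child from its
    # md device and remove its block devices on both nodes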
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      logger.Info("remove mirror %s component" % name)
      cfg.SetDiskID(dev, instance.primary_node)
      if not rpc.call_blockdev_removechild(instance.primary_node,
                                                dev, child):
        logger.Error("Can't remove child from mirror, aborting"
                     " *this device cleanup*.\nYou need to cleanup manually!!")
        continue

      for node in child.logical_id[:2]:
        logger.Info("remove child device on %s" % node)
        cfg.SetDiskID(child, node)
        if not rpc.call_blockdev_remove(node, child):
          logger.Error("Warning: failed to remove device from node %s,"
                       " continuing operation." % node)

      dev.children.remove(child)

      cfg.AddInstance(instance)


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]
    return


  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type == "drbd":
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

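    # recurse into the children so the result mirrors the whole disk tree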
    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                                instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        }

      result[instance.name] = idict

    return result


class LUSetInstanceParms(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      args['nics'] = [(ip, bridge)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.bridge = getattr(self.op, "bridge", None)
    if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
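    # the special value "none" (any case) clears the IP address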
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("No such instance name '%s'" %
                                 self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance
    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.
    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus",  self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))

    self.cfg.AddInstance(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance name is a valid one.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    # shutdown the instance, unless requested not to do so
    if self.op.shutdown:
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
      self.processor.ChainOpCode(op, feedback_fn)

    vgname = self.cfg.GetVGName()

    snap_disks = []

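    # snapshot the disks; if the instance was shut down above, the
    # finally clause below starts it again even when snapshotting
    # fails.  Note that only the "sda" disk is snapshotted and exported.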
    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type="lvm", size=disk.size,
                                      logical_id=(vgname, new_dev_name),
                                      physical_id=(vgname, new_dev_name),
                                      iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown:
        op = opcodes.OpStartupInstance(instance_name=instance.name,
                                       force=False)
        self.processor.ChainOpCode(op, feedback_fn)

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                           instance):
        logger.Error("could not export block device %s from node"
                     " %s to node %s" %
                     (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from"
                     " node %s" % (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

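    # remove older exports of this instance from all other nodes, so
    # that only the export created above is kept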
    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed, the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.processor.ChainOpCode(op, feedback_fn)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
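    # resolve the (kind, name) pair from the opcode into self.target,
    # the object whose tags the subclasses will read or modify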
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tags.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
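    # every tag to be removed must currently be set on the target;
    # otherwise fail and report the missing ones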
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")