1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import socket
30
import time
31
import tempfile
32
import re
33
import platform
34

    
35
from ganeti import rpc
36
from ganeti import ssh
37
from ganeti import logger
38
from ganeti import utils
39
from ganeti import errors
40
from ganeti import hypervisor
41
from ganeti import config
42
from ganeti import constants
43
from ganeti import objects
44
from ganeti import opcodes
45
from ganeti import ssconf
46

    
47
class LogicalUnit(object):
48
  """Logical Unit base class.
49

50
  Subclasses must follow these rules:
51
    - implement CheckPrereq which also fills in the opcode instance
52
      with all the fields (even if as None)
53
    - implement Exec
54
    - implement BuildHooksEnv
55
    - redefine HPATH and HTYPE
56
    - optionally redefine their run requirements (REQ_CLUSTER,
57
      REQ_MASTER); note that all commands require root permissions
58

59
  """
60
  HPATH = None
61
  HTYPE = None
62
  _OP_REQP = []
63
  REQ_CLUSTER = True
64
  REQ_MASTER = True
65

    
66
  def __init__(self, processor, op, cfg, sstore):
67
    """Constructor for LogicalUnit.
68

69
    This needs to be overridden in derived classes in order to check op
70
    validity.
71

72
    """
73
    self.processor = processor
74
    self.op = op
75
    self.cfg = cfg
76
    self.sstore = sstore
77
    for attr_name in self._OP_REQP:
78
      attr_val = getattr(op, attr_name, None)
79
      if attr_val is None:
80
        raise errors.OpPrereqError("Required parameter '%s' missing" %
81
                                   attr_name)
82
    if self.REQ_CLUSTER:
83
      if not cfg.IsCluster():
84
        raise errors.OpPrereqError("Cluster not initialized yet,"
85
                                   " use 'gnt-cluster init' first.")
86
      if self.REQ_MASTER:
87
        master = sstore.GetMasterNode()
88
        if master != socket.gethostname():
89
          raise errors.OpPrereqError("Commands must be run on the master"
90
                                     " node %s" % master)
91

    
92
  def CheckPrereq(self):
93
    """Check prerequisites for this LU.
94

95
    This method should check that the prerequisites for the execution
96
    of this LU are fulfilled. It can do internode communication, but
97
    it should be idempotent - no cluster or system changes are
98
    allowed.
99

100
    The method should raise errors.OpPrereqError in case something is
101
    not fulfilled. Its return value is ignored.
102

103
    This method should also update all the parameters of the opcode to
104
    their canonical form; e.g. a short node name must be fully
105
    expanded after this method has successfully completed (so that
106
    hooks, logging, etc. work correctly).
107

108
    """
109
    raise NotImplementedError
110

    
111
  def Exec(self, feedback_fn):
112
    """Execute the LU.
113

114
    This method should implement the actual work. It should raise
115
    errors.OpExecError for failures that are somewhat dealt with in
116
    code, or expected.
117

118
    """
119
    raise NotImplementedError
120

    
121
  def BuildHooksEnv(self):
122
    """Build hooks environment for this LU.
123

124
    This method should return a three-element tuple consisting of: a dict
125
    containing the environment that will be used for running the
126
    specific hook for this LU, a list of node names on which the hook
127
    should run before the execution, and a list of node names on which
128
    the hook should run after the execution.
129

130
    The keys of the dict must not have 'GANETI_' prefixed as this will
131
    be handled in the hooks runner. Also note additional keys will be
132
    added by the hooks runner. If the LU doesn't define any
133
    environment, an empty dict (and not None) should be returned.
134

135
    As for the node lists, the master should not be included in
136
    them, as it will be added by the hooks runner in case this LU
137
    requires a cluster to run on (otherwise we don't have a node
138
    list). If the LU has no nodes to return, it should use an empty list (and not
139
    None).
140

141
    Note that if the HPATH for a LU class is None, this function will
142
    not be called.
143

144
    """
145
    raise NotImplementedError
146

    
147

    
148
class NoHooksLU(LogicalUnit):
149
  """Simple LU which runs no hooks.
150

151
  This LU is intended as a parent for other LogicalUnits which will
152
  run no hooks, in order to reduce duplicate code.
153

154
  """
155
  HPATH = None
156
  HTYPE = None
157

    
158
  def BuildHooksEnv(self):
159
    """Build hooks env.
160

161
    This is a no-op, since we don't run hooks.
162

163
    """
164
    return
165

    
166

    
167
def _GetWantedNodes(lu, nodes):
168
  """Returns list of checked and expanded node names.
169

170
  Args:
171
    nodes: List of node names (strings), or an empty list to select all nodes
172

173
  """
174
  if not isinstance(nodes, list):
175
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
176

    
177
  if nodes:
178
    wanted = []
179

    
180
    for name in nodes:
181
      node = lu.cfg.ExpandNodeName(name)
182
      if node is None:
183
        raise errors.OpPrereqError("No such node name '%s'" % name)
184
      wanted.append(node)
185

    
186
  else:
187
    wanted = lu.cfg.GetNodeList()
188
  return utils.NiceSort(wanted)
189

    
190

    
191
def _GetWantedInstances(lu, instances):
192
  """Returns list of checked and expanded instance names.
193

194
  Args:
195
    instances: List of instance names (strings), or an empty list to select all
196

197
  """
198
  if not isinstance(instances, list):
199
    raise errors.OpPrereqError("Invalid argument type 'instances'")
200

    
201
  if instances:
202
    wanted = []
203

    
204
    for name in instances:
205
      instance = lu.cfg.ExpandInstanceName(name)
206
      if instance is None:
207
        raise errors.OpPrereqError("No such instance name '%s'" % name)
208
      wanted.append(instance)
209

    
210
  else:
211
    wanted = lu.cfg.GetInstanceList()
212
  return utils.NiceSort(wanted)
213

    
214

    
215
def _CheckOutputFields(static, dynamic, selected):
216
  """Checks whether all selected fields are valid.
217

218
  Args:
219
    static: Static fields
220
    dynamic: Dynamic fields
    selected: Output fields requested by the caller
221

222
  """
223
  static_fields = frozenset(static)
224
  dynamic_fields = frozenset(dynamic)
225

    
226
  all_fields = static_fields | dynamic_fields
227

    
228
  if not all_fields.issuperset(selected):
229
    raise errors.OpPrereqError("Unknown output fields selected: %s"
230
                               % ",".join(frozenset(selected).
231
                                          difference(all_fields)))
232

    
233

    
234
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
235
                          memory, vcpus, nics):
236
  """Builds instance related env variables for hooks from single variables.
237

238
  Args:
239
    secondary_nodes: List of secondary nodes as strings
240
  """
241
  env = {
242
    "INSTANCE_NAME": name,
243
    "INSTANCE_PRIMARY": primary_node,
244
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
245
    "INSTANCE_OS_TYPE": os_type,
246
    "INSTANCE_STATUS": status,
247
    "INSTANCE_MEMORY": memory,
248
    "INSTANCE_VCPUS": vcpus,
249
  }
250

    
251
  if nics:
252
    nic_count = len(nics)
253
    for idx, (ip, bridge) in enumerate(nics):
254
      if ip is None:
255
        ip = ""
256
      env["INSTANCE_NIC%d_IP" % idx] = ip
257
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
258
  else:
259
    nic_count = 0
260

    
261
  env["INSTANCE_NIC_COUNT"] = nic_count
262

    
263
  return env
264

    
265

    
266
def _BuildInstanceHookEnvByObject(instance, override=None):
267
  """Builds instance related env variables for hooks from an object.
268

269
  Args:
270
    instance: objects.Instance object of instance
271
    override: dict of values to override
272
  """
273
  args = {
274
    'name': instance.name,
275
    'primary_node': instance.primary_node,
276
    'secondary_nodes': instance.secondary_nodes,
277
    'os_type': instance.os,
278
    'status': instance.status,
279
    'memory': instance.memory,
280
    'vcpus': instance.vcpus,
281
    'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
282
  }
283
  if override:
284
    args.update(override)
285
  return _BuildInstanceHookEnv(**args)
286

    
287

    
288
def _UpdateEtcHosts(fullnode, ip):
289
  """Ensure a node has a correct entry in /etc/hosts.
290

291
  Args:
292
    fullnode - Fully qualified domain name of host. (str)
293
    ip       - IPv4 address of host (str)
294

295
  """
296
  node = fullnode.split(".", 1)[0]
297

    
298
  f = open('/etc/hosts', 'r+')
299

    
300
  inthere = False
301

    
302
  save_lines = []
303
  add_lines = []
304
  removed = False
305

    
306
  while True:
307
    rawline = f.readline()
308

    
309
    if not rawline:
310
      # End of file
311
      break
312

    
313
    line = rawline.split('\n')[0]
314

    
315
    # Strip off comments
316
    line = line.split('#')[0]
317

    
318
    if not line:
319
      # Entire line was comment, skip
320
      save_lines.append(rawline)
321
      continue
322

    
323
    fields = line.split()
324

    
325
    haveall = True
326
    havesome = False
327
    for spec in [ ip, fullnode, node ]:
328
      if spec not in fields:
329
        haveall = False
330
      if spec in fields:
331
        havesome = True
332

    
333
    if haveall:
334
      inthere = True
335
      save_lines.append(rawline)
336
      continue
337

    
338
    if havesome and not haveall:
339
      # Line (old, or manual?) which is missing some.  Remove.
340
      removed = True
341
      continue
342

    
343
    save_lines.append(rawline)
344

    
345
  if not inthere:
346
    add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))
347

    
348
  if removed:
349
    if add_lines:
350
      save_lines = save_lines + add_lines
351

    
352
    # We removed a line, write a new file and replace old.
353
    fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
354
    newfile = os.fdopen(fd, 'w')
355
    newfile.write(''.join(save_lines))
356
    newfile.close()
357
    os.rename(tmpname, '/etc/hosts')
358

    
359
  elif add_lines:
360
    # Simply appending a new line will do the trick.
361
    f.seek(0, 2)
362
    for add in add_lines:
363
      f.write(add)
364

    
365
  f.close()
366

    
367

    
368
def _UpdateKnownHosts(fullnode, ip, pubkey):
369
  """Ensure a node has a correct known_hosts entry.
370

371
  Args:
372
    fullnode - Fully qualified domain name of host. (str)
373
    ip       - IPv4 address of host (str)
374
    pubkey   - the public key of the cluster
375

376
  """
377
  if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
378
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
379
  else:
380
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
381

    
382
  inthere = False
383

    
384
  save_lines = []
385
  add_lines = []
386
  removed = False
387

    
388
  while True:
389
    rawline = f.readline()
390
    logger.Debug('read %s' % (repr(rawline),))
391

    
392
    if not rawline:
393
      # End of file
394
      break
395

    
396
    line = rawline.split('\n')[0]
397

    
398
    parts = line.split(' ')
399
    fields = parts[0].split(',')
400
    key = parts[2]
401

    
402
    haveall = True
403
    havesome = False
404
    for spec in [ ip, fullnode ]:
405
      if spec not in fields:
406
        haveall = False
407
      if spec in fields:
408
        havesome = True
409

    
410
    logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
411
    if haveall and key == pubkey:
412
      inthere = True
413
      save_lines.append(rawline)
414
      logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
415
      continue
416

    
417
    if havesome and (not haveall or key != pubkey):
418
      removed = True
419
      logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
420
      continue
421

    
422
    save_lines.append(rawline)
423

    
424
  if not inthere:
425
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
426
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
427

    
428
  if removed:
429
    save_lines = save_lines + add_lines
430

    
431
    # Write a new file and replace old.
432
    fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
433
                                   constants.DATA_DIR)
434
    newfile = os.fdopen(fd, 'w')
435
    try:
436
      newfile.write(''.join(save_lines))
437
    finally:
438
      newfile.close()
439
    logger.Debug("Wrote new known_hosts.")
440
    os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
441

    
442
  elif add_lines:
443
    # Simply appending a new line will do the trick.
444
    f.seek(0, 2)
445
    for add in add_lines:
446
      f.write(add)
447

    
448
  f.close()
449

    
450

    
451
def _HasValidVG(vglist, vgname):
452
  """Checks if the volume group list is valid.
453

454
  A non-None return value means there's an error, and the return value
455
  is the error message.
456

457
  """
458
  vgsize = vglist.get(vgname, None)
459
  if vgsize is None:
460
    return "volume group '%s' missing" % vgname
461
  elif vgsize < 20480:
462
    return ("volume group '%s' too small (20480MiB required, %dMib found)" %
463
            (vgname, vgsize))
464
  return None
465

    
466

    
467
def _InitSSHSetup(node):
468
  """Setup the SSH configuration for the cluster.
469

470

471
  This generates a dsa keypair for root, adds the pub key to the
472
  permitted hosts and adds the hostkey to its own known hosts.
473

474
  Args:
475
    node: the name of this host as a fqdn
476

477
  """
478
  if os.path.exists('/root/.ssh/id_dsa'):
479
    utils.CreateBackup('/root/.ssh/id_dsa')
480
  if os.path.exists('/root/.ssh/id_dsa.pub'):
481
    utils.CreateBackup('/root/.ssh/id_dsa.pub')
482

    
483
  utils.RemoveFile('/root/.ssh/id_dsa')
484
  utils.RemoveFile('/root/.ssh/id_dsa.pub')
485

    
486
  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
487
                         "-f", "/root/.ssh/id_dsa",
488
                         "-q", "-N", ""])
489
  if result.failed:
490
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
491
                             result.output)
492

    
493
  f = open('/root/.ssh/id_dsa.pub', 'r')
494
  try:
495
    utils.AddAuthorizedKey('/root/.ssh/authorized_keys', f.read(8192))
496
  finally:
497
    f.close()
498

    
499

    
500
def _InitGanetiServerSetup(ss):
501
  """Setup the necessary configuration for the initial node daemon.
502

503
  This creates the nodepass file containing the shared password for
504
  the cluster and also generates the SSL certificate.
505

506
  """
507
  # Create pseudo random password
508
  randpass = sha.new(os.urandom(64)).hexdigest()
509
  # and write it into sstore
510
  ss.SetKey(ss.SS_NODED_PASS, randpass)
511

    
512
  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
513
                         "-days", str(365*5), "-nodes", "-x509",
514
                         "-keyout", constants.SSL_CERT_FILE,
515
                         "-out", constants.SSL_CERT_FILE, "-batch"])
516
  if result.failed:
517
    raise errors.OpExecError("could not generate server ssl cert, command"
518
                             " %s had exitcode %s and error message %s" %
519
                             (result.cmd, result.exit_code, result.output))
520

    
521
  os.chmod(constants.SSL_CERT_FILE, 0400)
522

    
523
  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
524

    
525
  if result.failed:
526
    raise errors.OpExecError("Could not start the node daemon, command %s"
527
                             " had exitcode %s and error %s" %
528
                             (result.cmd, result.exit_code, result.output))
529

    
530

    
531
class LUInitCluster(LogicalUnit):
532
  """Initialise the cluster.
533

534
  """
535
  HPATH = "cluster-init"
536
  HTYPE = constants.HTYPE_CLUSTER
537
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
538
              "def_bridge", "master_netdev"]
539
  REQ_CLUSTER = False
540

    
541
  def BuildHooksEnv(self):
542
    """Build hooks env.
543

544
    Notes: Since we don't require a cluster, we must manually add
545
    ourselves to the post-run node list.
546

547
    """
548
    env = {
549
      "CLUSTER": self.op.cluster_name,
550
      "MASTER": self.hostname.name,
551
      }
552
    return env, [], [self.hostname.name]
553

    
554
  def CheckPrereq(self):
555
    """Verify that the passed name is a valid one.
556

557
    """
558
    if config.ConfigWriter.IsCluster():
559
      raise errors.OpPrereqError("Cluster is already initialised")
560

    
561
    hostname_local = socket.gethostname()
562
    self.hostname = hostname = utils.LookupHostname(hostname_local)
563
    if not hostname:
564
      raise errors.OpPrereqError("Cannot resolve my own hostname ('%s')" %
565
                                 hostname_local)
566

    
567
    if hostname.name != hostname_local:
568
      raise errors.OpPrereqError("My own hostname (%s) does not match the"
569
                                 " resolver (%s): probably not using FQDN"
570
                                 " for hostname." %
571
                                 (hostname_local, hostname.name))
572

    
573
    if hostname.ip.startswith("127."):
574
      raise errors.OpPrereqError("This host's IP resolves to the private"
575
                                 " range (%s). Please fix DNS or /etc/hosts." %
576
                                 (hostname.ip,))
577

    
578
    self.clustername = clustername = utils.LookupHostname(self.op.cluster_name)
579
    if not clustername:
580
      raise errors.OpPrereqError("Cannot resolve given cluster name ('%s')"
581
                                 % self.op.cluster_name)
582

    
583
    result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", hostname.ip])
584
    if result.failed:
585
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
586
                                 " to %s,\nbut this ip address does not"
587
                                 " belong to this host."
588
                                 " Aborting." % hostname.ip)
589

    
590
    secondary_ip = getattr(self.op, "secondary_ip", None)
591
    if secondary_ip and not utils.IsValidIP(secondary_ip):
592
      raise errors.OpPrereqError("Invalid secondary ip given")
593
    if secondary_ip and secondary_ip != hostname.ip:
594
      result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", secondary_ip])
595
      if result.failed:
596
        raise errors.OpPrereqError("You gave %s as secondary IP,\n"
597
                                   "but it does not belong to this host." %
598
                                   secondary_ip)
599
    self.secondary_ip = secondary_ip
600

    
601
    # checks presence of the volume group given
602
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
603

    
604
    if vgstatus:
605
      raise errors.OpPrereqError("Error: %s" % vgstatus)
606

    
607
    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
608
                    self.op.mac_prefix):
609
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
610
                                 self.op.mac_prefix)
611

    
612
    if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
613
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
614
                                 self.op.hypervisor_type)
615

    
616
    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
617
    if result.failed:
618
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
619
                                 (self.op.master_netdev,
620
                                  result.output.strip()))
621

    
622
  def Exec(self, feedback_fn):
623
    """Initialize the cluster.
624

625
    """
626
    clustername = self.clustername
627
    hostname = self.hostname
628

    
629
    # set up the simple store
630
    ss = ssconf.SimpleStore()
631
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
632
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
633
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
634
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
635
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
636

    
637
    # set up the inter-node password and certificate
638
    _InitGanetiServerSetup(ss)
639

    
640
    # start the master ip
641
    rpc.call_node_start_master(hostname.name)
642

    
643
    # set up ssh config and /etc/hosts
644
    f = open('/etc/ssh/ssh_host_rsa_key.pub', 'r')
645
    try:
646
      sshline = f.read()
647
    finally:
648
      f.close()
649
    sshkey = sshline.split(" ")[1]
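    # ssh_host_rsa_key.pub lines look like "ssh-rsa <base64-key> <comment>",
    # so field 1 extracted above is the key material itself.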
650

    
651
    _UpdateEtcHosts(hostname.name, hostname.ip)
652

    
653
    _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
654

    
655
    _InitSSHSetup(hostname.name)
656

    
657
    # init of cluster config file
658
    cfgw = config.ConfigWriter()
659
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
660
                    sshkey, self.op.mac_prefix,
661
                    self.op.vg_name, self.op.def_bridge)
662

    
663

    
664
class LUDestroyCluster(NoHooksLU):
665
  """Logical unit for destroying the cluster.
666

667
  """
668
  _OP_REQP = []
669

    
670
  def CheckPrereq(self):
671
    """Check prerequisites.
672

673
    This checks whether the cluster is empty.
674

675
    Any errors are signalled by raising errors.OpPrereqError.
676

677
    """
678
    master = self.sstore.GetMasterNode()
679

    
680
    nodelist = self.cfg.GetNodeList()
681
    if len(nodelist) != 1 or nodelist[0] != master:
682
      raise errors.OpPrereqError("There are still %d node(s) in"
683
                                 " this cluster." % (len(nodelist) - 1))
684
    instancelist = self.cfg.GetInstanceList()
685
    if instancelist:
686
      raise errors.OpPrereqError("There are still %d instance(s) in"
687
                                 " this cluster." % len(instancelist))
688

    
689
  def Exec(self, feedback_fn):
690
    """Destroys the cluster.
691

692
    """
693
    utils.CreateBackup('/root/.ssh/id_dsa')
694
    utils.CreateBackup('/root/.ssh/id_dsa.pub')
695
    rpc.call_node_leave_cluster(self.sstore.GetMasterNode())
696

    
697

    
698
class LUVerifyCluster(NoHooksLU):
699
  """Verifies the cluster status.
700

701
  """
702
  _OP_REQP = []
703

    
704
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
705
                  remote_version, feedback_fn):
706
    """Run multiple tests against a node.
707

708
    Test list:
709
      - compares ganeti version
710
      - checks vg existence and size > 20G
711
      - checks config file checksum
712
      - checks ssh to other nodes
713

714
    Args:
715
      node: name of the node to check
716
      file_list: required list of files
717
      local_cksum: dictionary of local files and their checksums
718

719
    """
720
    # compares ganeti version
721
    local_version = constants.PROTOCOL_VERSION
722
    if not remote_version:
723
      feedback_fn(" - ERROR: connection to %s failed" % (node))
724
      return True
725

    
726
    if local_version != remote_version:
727
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
728
                      (local_version, node, remote_version))
729
      return True
730

    
731
    # checks vg existence and size > 20G
732

    
733
    bad = False
734
    if not vglist:
735
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
736
                      (node,))
737
      bad = True
738
    else:
739
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
740
      if vgstatus:
741
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
742
        bad = True
743

    
744
    # checks config file checksum
745
    # checks ssh to any
746

    
747
    if 'filelist' not in node_result:
748
      bad = True
749
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
750
    else:
751
      remote_cksum = node_result['filelist']
752
      for file_name in file_list:
753
        if file_name not in remote_cksum:
754
          bad = True
755
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
756
        elif remote_cksum[file_name] != local_cksum[file_name]:
757
          bad = True
758
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
759

    
760
    if 'nodelist' not in node_result:
761
      bad = True
762
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
763
    else:
764
      if node_result['nodelist']:
765
        bad = True
766
        for node in node_result['nodelist']:
767
          feedback_fn("  - ERROR: communication with node '%s': %s" %
768
                          (node, node_result['nodelist'][node]))
769
    hyp_result = node_result.get('hypervisor', None)
770
    if hyp_result is not None:
771
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
772
    return bad
773

    
774
  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
775
    """Verify an instance.
776

777
    This function checks to see if the required block devices are
778
    available on the instance's node.
779

780
    """
781
    bad = False
782

    
783
    instancelist = self.cfg.GetInstanceList()
784
    if instance not in instancelist:
785
      feedback_fn("  - ERROR: instance %s not in instance list %s" %
786
                      (instance, instancelist))
787
      bad = True
788

    
789
    instanceconfig = self.cfg.GetInstanceInfo(instance)
790
    node_current = instanceconfig.primary_node
791

    
792
    node_vol_should = {}
793
    instanceconfig.MapLVsByNode(node_vol_should)
794

    
795
    for node in node_vol_should:
796
      for volume in node_vol_should[node]:
797
        if node not in node_vol_is or volume not in node_vol_is[node]:
798
          feedback_fn("  - ERROR: volume %s missing on node %s" %
799
                          (volume, node))
800
          bad = True
801

    
802
    if instanceconfig.status != 'down':
803
      if instance not in node_instance[node_current]:
804
        feedback_fn("  - ERROR: instance %s not running on node %s" %
805
                        (instance, node_current))
806
        bad = True
807

    
808
    for node in node_instance:
809
      if node != node_current:
810
        if instance in node_instance[node]:
811
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
812
                          (instance, node))
813
          bad = True
814

    
815
    return not bad
816

    
817
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
818
    """Verify if there are any unknown volumes in the cluster.
819

820
    The .os, .swap and backup volumes are ignored. All other volumes are
821
    reported as unknown.
822

823
    """
824
    bad = False
825

    
826
    for node in node_vol_is:
827
      for volume in node_vol_is[node]:
828
        if node not in node_vol_should or volume not in node_vol_should[node]:
829
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
830
                      (volume, node))
831
          bad = True
832
    return bad
833

    
834
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
835
    """Verify the list of running instances.
836

837
    This checks what instances are running but unknown to the cluster.
838

839
    """
840
    bad = False
841
    for node in node_instance:
842
      for runninginstance in node_instance[node]:
843
        if runninginstance not in instancelist:
844
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
845
                          (runninginstance, node))
846
          bad = True
847
    return bad
848

    
849
  def CheckPrereq(self):
850
    """Check prerequisites.
851

852
    This has no prerequisites.
853

854
    """
855
    pass
856

    
857
  def Exec(self, feedback_fn):
858
    """Verify integrity of cluster, performing various test on nodes.
859

860
    """
861
    bad = False
862
    feedback_fn("* Verifying global settings")
863
    self.cfg.VerifyConfig()
864

    
865
    master = self.sstore.GetMasterNode()
866
    vg_name = self.cfg.GetVGName()
867
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
868
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
869
    node_volume = {}
870
    node_instance = {}
871

    
872
    # FIXME: verify OS list
873
    # do local checksums
874
    file_names = list(self.sstore.GetFileList())
875
    file_names.append(constants.SSL_CERT_FILE)
876
    file_names.append(constants.CLUSTER_CONF_FILE)
877
    local_checksums = utils.FingerprintFiles(file_names)
878

    
879
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
880
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
881
    all_instanceinfo = rpc.call_instance_list(nodelist)
882
    all_vglist = rpc.call_vg_list(nodelist)
883
    node_verify_param = {
884
      'filelist': file_names,
885
      'nodelist': nodelist,
886
      'hypervisor': None,
887
      }
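    # Each key requests one check from the nodes: 'filelist' -> checksum the
    # given files, 'nodelist' -> test connectivity to the listed nodes,
    # 'hypervisor' -> run the hypervisor's own verification; the results are
    # interpreted in _VerifyNode above.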
888
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
889
    all_rversion = rpc.call_version(nodelist)
890

    
891
    for node in nodelist:
892
      feedback_fn("* Verifying node %s" % node)
893
      result = self._VerifyNode(node, file_names, local_checksums,
894
                                all_vglist[node], all_nvinfo[node],
895
                                all_rversion[node], feedback_fn)
896
      bad = bad or result
897

    
898
      # node_volume
899
      volumeinfo = all_volumeinfo[node]
900

    
901
      if not isinstance(volumeinfo, dict):
902
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
903
        bad = True
904
        continue
905

    
906
      node_volume[node] = volumeinfo
907

    
908
      # node_instance
909
      nodeinstance = all_instanceinfo[node]
910
      if not isinstance(nodeinstance, list):
911
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
912
        bad = True
913
        continue
914

    
915
      node_instance[node] = nodeinstance
916

    
917
    node_vol_should = {}
918

    
919
    for instance in instancelist:
920
      feedback_fn("* Verifying instance %s" % instance)
921
      result =  self._VerifyInstance(instance, node_volume, node_instance,
922
                                     feedback_fn)
923
      bad = bad or result
924

    
925
      inst_config = self.cfg.GetInstanceInfo(instance)
926

    
927
      inst_config.MapLVsByNode(node_vol_should)
928

    
929
    feedback_fn("* Verifying orphan volumes")
930
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
931
                                       feedback_fn)
932
    bad = bad or result
933

    
934
    feedback_fn("* Verifying remaining instances")
935
    result = self._VerifyOrphanInstances(instancelist, node_instance,
936
                                         feedback_fn)
937
    bad = bad or result
938

    
939
    return int(bad)
940

    
941

    
942
class LURenameCluster(LogicalUnit):
943
  """Rename the cluster.
944

945
  """
946
  HPATH = "cluster-rename"
947
  HTYPE = constants.HTYPE_CLUSTER
948
  _OP_REQP = ["name"]
949

    
950
  def BuildHooksEnv(self):
951
    """Build hooks env.
952

953
    """
954
    env = {
955
      "NEW_NAME": self.op.name,
956
      }
957
    mn = self.sstore.GetMasterNode()
958
    return env, [mn], [mn]
959

    
960
  def CheckPrereq(self):
961
    """Verify that the passed name is a valid one.
962

963
    """
964
    hostname = utils.LookupHostname(self.op.name)
965
    if not hostname:
966
      raise errors.OpPrereqError("Cannot resolve the new cluster name ('%s')" %
967
                                 self.op.name)
968

    
969
    new_name = hostname.name
970
    self.ip = new_ip = hostname.ip
971
    old_name = self.sstore.GetClusterName()
972
    old_ip = self.sstore.GetMasterIP()
973
    if new_name == old_name and new_ip == old_ip:
974
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
975
                                 " cluster has changed")
976
    if new_ip != old_ip:
977
      result = utils.RunCmd(["fping", "-q", new_ip])
978
      if not result.failed:
979
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
980
                                   " reachable on the network. Aborting." %
981
                                   new_ip)
982

    
983
    self.op.name = new_name
984

    
985
  def Exec(self, feedback_fn):
986
    """Rename the cluster.
987

988
    """
989
    clustername = self.op.name
990
    ip = self.ip
991
    ss = self.sstore
992

    
993
    # shutdown the master IP
994
    master = ss.GetMasterNode()
995
    if not rpc.call_node_stop_master(master):
996
      raise errors.OpExecError("Could not disable the master role")
997

    
998
    try:
999
      # modify the sstore
1000
      ss.SetKey(ss.SS_MASTER_IP, ip)
1001
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1002

    
1003
      # Distribute updated ss config to all nodes
1004
      myself = self.cfg.GetNodeInfo(master)
1005
      dist_nodes = self.cfg.GetNodeList()
1006
      if myself.name in dist_nodes:
1007
        dist_nodes.remove(myself.name)
1008

    
1009
      logger.Debug("Copying updated ssconf data to all nodes")
1010
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1011
        fname = ss.KeyToFilename(keyname)
1012
        result = rpc.call_upload_file(dist_nodes, fname)
1013
        for to_node in dist_nodes:
1014
          if not result[to_node]:
1015
            logger.Error("copy of file %s to node %s failed" %
1016
                         (fname, to_node))
1017
    finally:
1018
      if not rpc.call_node_start_master(master):
1019
        logger.Error("Could not re-enable the master role on the master,\n"
1020
                     "please restart manually.")
1021

    
1022

    
1023
def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
1024
  """Sleep and poll for an instance's disk to sync.
1025

1026
  """
1027
  if not instance.disks:
1028
    return True
1029

    
1030
  if not oneshot:
1031
    logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)
1032

    
1033
  node = instance.primary_node
1034

    
1035
  for dev in instance.disks:
1036
    cfgw.SetDiskID(dev, node)
1037

    
1038
  retries = 0
1039
  while True:
1040
    max_time = 0
1041
    done = True
1042
    cumul_degraded = False
1043
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1044
    if not rstats:
1045
      logger.ToStderr("Can't get any data from node %s" % node)
1046
      retries += 1
1047
      if retries >= 10:
1048
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1049
                                 " aborting." % node)
1050
      time.sleep(6)
1051
      continue
1052
    retries = 0
1053
    for i in range(len(rstats)):
1054
      mstat = rstats[i]
1055
      if mstat is None:
1056
        logger.ToStderr("Can't compute data for node %s/%s" %
1057
                        (node, instance.disks[i].iv_name))
1058
        continue
1059
      perc_done, est_time, is_degraded = mstat
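      # perc_done appears to be None once the device has finished syncing, so
      # the loop is only "done" when every device reports None (an assumption
      # drawn from the handling below).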
1060
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1061
      if perc_done is not None:
1062
        done = False
1063
        if est_time is not None:
1064
          rem_time = "%d estimated seconds remaining" % est_time
1065
          max_time = est_time
1066
        else:
1067
          rem_time = "no time estimate"
1068
        logger.ToStdout("- device %s: %5.2f%% done, %s" %
1069
                        (instance.disks[i].iv_name, perc_done, rem_time))
1070
    if done or oneshot:
1071
      break
1072

    
1073
    if unlock:
1074
      utils.Unlock('cmd')
1075
    try:
1076
      time.sleep(min(60, max_time))
1077
    finally:
1078
      if unlock:
1079
        utils.Lock('cmd')
1080

    
1081
  if done:
1082
    logger.ToStdout("Instance %s's disks are in sync." % instance.name)
1083
  return not cumul_degraded
1084

    
1085

    
1086
def _CheckDiskConsistency(cfgw, dev, node, on_primary):
1087
  """Check that mirrors are not degraded.
1088

1089
  """
1090
  cfgw.SetDiskID(dev, node)
1091

    
1092
  result = True
1093
  if on_primary or dev.AssembleOnSecondary():
1094
    rstats = rpc.call_blockdev_find(node, dev)
1095
    if not rstats:
1096
      logger.ToStderr("Can't get any data from node %s" % node)
1097
      result = False
1098
    else:
1099
      result = result and (not rstats[5])
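      # rstats[5] is taken here to be the device's "degraded" flag, hence the
      # negation (assumption, consistent with this function's docstring).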
1100
  if dev.children:
1101
    for child in dev.children:
1102
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1103

    
1104
  return result
1105

    
1106

    
1107
class LUDiagnoseOS(NoHooksLU):
1108
  """Logical unit for OS diagnose/query.
1109

1110
  """
1111
  _OP_REQP = []
1112

    
1113
  def CheckPrereq(self):
1114
    """Check prerequisites.
1115

1116
    This always succeeds, since this is a pure query LU.
1117

1118
    """
1119
    return
1120

    
1121
  def Exec(self, feedback_fn):
1122
    """Compute the list of OSes.
1123

1124
    """
1125
    node_list = self.cfg.GetNodeList()
1126
    node_data = rpc.call_os_diagnose(node_list)
1127
    if node_data == False:
1128
      raise errors.OpExecError("Can't gather the list of OSes")
1129
    return node_data
1130

    
1131

    
1132
class LURemoveNode(LogicalUnit):
1133
  """Logical unit for removing a node.
1134

1135
  """
1136
  HPATH = "node-remove"
1137
  HTYPE = constants.HTYPE_NODE
1138
  _OP_REQP = ["node_name"]
1139

    
1140
  def BuildHooksEnv(self):
1141
    """Build hooks env.
1142

1143
    This doesn't run on the target node in the pre phase as a failed
1144
    node would not allow itself to run.
1145

1146
    """
1147
    env = {
1148
      "NODE_NAME": self.op.node_name,
1149
      }
1150
    all_nodes = self.cfg.GetNodeList()
1151
    all_nodes.remove(self.op.node_name)
1152
    return env, all_nodes, all_nodes
1153

    
1154
  def CheckPrereq(self):
1155
    """Check prerequisites.
1156

1157
    This checks:
1158
     - the node exists in the configuration
1159
     - it does not have primary or secondary instances
1160
     - it's not the master
1161

1162
    Any errors are signalled by raising errors.OpPrereqError.
1163

1164
    """
1165
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1166
    if node is None:
1167
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1168

    
1169
    instance_list = self.cfg.GetInstanceList()
1170

    
1171
    masternode = self.sstore.GetMasterNode()
1172
    if node.name == masternode:
1173
      raise errors.OpPrereqError("Node is the master node,"
1174
                                 " you need to failover first.")
1175

    
1176
    for instance_name in instance_list:
1177
      instance = self.cfg.GetInstanceInfo(instance_name)
1178
      if node.name == instance.primary_node:
1179
        raise errors.OpPrereqError("Instance %s still running on the node,"
1180
                                   " please remove first." % instance_name)
1181
      if node.name in instance.secondary_nodes:
1182
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1183
                                   " please remove first." % instance_name)
1184
    self.op.node_name = node.name
1185
    self.node = node
1186

    
1187
  def Exec(self, feedback_fn):
1188
    """Removes the node from the cluster.
1189

1190
    """
1191
    node = self.node
1192
    logger.Info("stopping the node daemon and removing configs from node %s" %
1193
                node.name)
1194

    
1195
    rpc.call_node_leave_cluster(node.name)
1196

    
1197
    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1198

    
1199
    logger.Info("Removing node %s from config" % node.name)
1200

    
1201
    self.cfg.RemoveNode(node.name)
1202

    
1203

    
1204
class LUQueryNodes(NoHooksLU):
1205
  """Logical unit for querying nodes.
1206

1207
  """
1208
  _OP_REQP = ["output_fields", "names"]
1209

    
1210
  def CheckPrereq(self):
1211
    """Check prerequisites.
1212

1213
    This checks that the fields required are valid output fields.
1214

1215
    """
1216
    self.dynamic_fields = frozenset(["dtotal", "dfree",
1217
                                     "mtotal", "mnode", "mfree"])
1218

    
1219
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1220
                               "pinst_list", "sinst_list",
1221
                               "pip", "sip"],
1222
                       dynamic=self.dynamic_fields,
1223
                       selected=self.op.output_fields)
1224

    
1225
    self.wanted = _GetWantedNodes(self, self.op.names)
1226

    
1227
  def Exec(self, feedback_fn):
1228
    """Computes the list of nodes and their attributes.
1229

1230
    """
1231
    nodenames = self.wanted
1232
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1233

    
1234
    # begin data gathering
1235

    
1236
    if self.dynamic_fields.intersection(self.op.output_fields):
1237
      live_data = {}
1238
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1239
      for name in nodenames:
1240
        nodeinfo = node_data.get(name, None)
1241
        if nodeinfo:
1242
          live_data[name] = {
1243
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1244
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1245
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1246
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1247
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1248
            }
1249
        else:
1250
          live_data[name] = {}
1251
    else:
1252
      live_data = dict.fromkeys(nodenames, {})
1253

    
1254
    node_to_primary = dict([(name, set()) for name in nodenames])
1255
    node_to_secondary = dict([(name, set()) for name in nodenames])
1256

    
1257
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1258
                             "sinst_cnt", "sinst_list"))
1259
    if inst_fields & frozenset(self.op.output_fields):
1260
      instancelist = self.cfg.GetInstanceList()
1261

    
1262
      for instance_name in instancelist:
1263
        inst = self.cfg.GetInstanceInfo(instance_name)
1264
        if inst.primary_node in node_to_primary:
1265
          node_to_primary[inst.primary_node].add(inst.name)
1266
        for secnode in inst.secondary_nodes:
1267
          if secnode in node_to_secondary:
1268
            node_to_secondary[secnode].add(inst.name)
1269

    
1270
    # end data gathering
1271

    
1272
    output = []
1273
    for node in nodelist:
1274
      node_output = []
1275
      for field in self.op.output_fields:
1276
        if field == "name":
1277
          val = node.name
1278
        elif field == "pinst_list":
1279
          val = list(node_to_primary[node.name])
1280
        elif field == "sinst_list":
1281
          val = list(node_to_secondary[node.name])
1282
        elif field == "pinst_cnt":
1283
          val = len(node_to_primary[node.name])
1284
        elif field == "sinst_cnt":
1285
          val = len(node_to_secondary[node.name])
1286
        elif field == "pip":
1287
          val = node.primary_ip
1288
        elif field == "sip":
1289
          val = node.secondary_ip
1290
        elif field in self.dynamic_fields:
1291
          val = live_data[node.name].get(field, None)
1292
        else:
1293
          raise errors.ParameterError(field)
1294
        node_output.append(val)
1295
      output.append(node_output)
1296

    
1297
    return output
1298

    
1299

    
1300
class LUQueryNodeVolumes(NoHooksLU):
1301
  """Logical unit for getting volumes on node(s).
1302

1303
  """
1304
  _OP_REQP = ["nodes", "output_fields"]
1305

    
1306
  def CheckPrereq(self):
1307
    """Check prerequisites.
1308

1309
    This checks that the fields required are valid output fields.
1310

1311
    """
1312
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1313

    
1314
    _CheckOutputFields(static=["node"],
1315
                       dynamic=["phys", "vg", "name", "size", "instance"],
1316
                       selected=self.op.output_fields)
1317

    
1318

    
1319
  def Exec(self, feedback_fn):
1320
    """Computes the list of nodes and their attributes.
1321

1322
    """
1323
    nodenames = self.nodes
1324
    volumes = rpc.call_node_volumes(nodenames)
1325

    
1326
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1327
             in self.cfg.GetInstanceList()]
1328

    
1329
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1330

    
1331
    output = []
1332
    for node in nodenames:
1333
      if node not in volumes or not volumes[node]:
1334
        continue
1335

    
1336
      node_vols = volumes[node][:]
1337
      node_vols.sort(key=lambda vol: vol['dev'])
1338

    
1339
      for vol in node_vols:
1340
        node_output = []
1341
        for field in self.op.output_fields:
1342
          if field == "node":
1343
            val = node
1344
          elif field == "phys":
1345
            val = vol['dev']
1346
          elif field == "vg":
1347
            val = vol['vg']
1348
          elif field == "name":
1349
            val = vol['name']
1350
          elif field == "size":
1351
            val = int(float(vol['size']))
1352
          elif field == "instance":
1353
            for inst in ilist:
1354
              if node not in lv_by_node[inst]:
1355
                continue
1356
              if vol['name'] in lv_by_node[inst][node]:
1357
                val = inst.name
1358
                break
1359
            else:
1360
              val = '-'
1361
          else:
1362
            raise errors.ParameterError(field)
1363
          node_output.append(str(val))
1364

    
1365
        output.append(node_output)
1366

    
1367
    return output
1368

    
1369

    
1370
class LUAddNode(LogicalUnit):
1371
  """Logical unit for adding node to the cluster.
1372

1373
  """
1374
  HPATH = "node-add"
1375
  HTYPE = constants.HTYPE_NODE
1376
  _OP_REQP = ["node_name"]
1377

    
1378
  def BuildHooksEnv(self):
1379
    """Build hooks env.
1380

1381
    This will run on all nodes before, and on all nodes + the new node after.
1382

1383
    """
1384
    env = {
1385
      "NODE_NAME": self.op.node_name,
1386
      "NODE_PIP": self.op.primary_ip,
1387
      "NODE_SIP": self.op.secondary_ip,
1388
      }
1389
    nodes_0 = self.cfg.GetNodeList()
1390
    nodes_1 = nodes_0 + [self.op.node_name, ]
1391
    return env, nodes_0, nodes_1
1392

    
1393
  def CheckPrereq(self):
1394
    """Check prerequisites.
1395

1396
    This checks:
1397
     - the new node is not already in the config
1398
     - it is resolvable
1399
     - its parameters (single/dual homed) matches the cluster
1400

1401
    Any errors are signalled by raising errors.OpPrereqError.
1402

1403
    """
1404
    node_name = self.op.node_name
1405
    cfg = self.cfg
1406

    
1407
    dns_data = utils.LookupHostname(node_name)
1408
    if not dns_data:
1409
      raise errors.OpPrereqError("Node %s is not resolvable" % node_name)
1410

    
1411
    node = dns_data.name
1412
    primary_ip = self.op.primary_ip = dns_data.ip
1413
    secondary_ip = getattr(self.op, "secondary_ip", None)
1414
    if secondary_ip is None:
1415
      secondary_ip = primary_ip
1416
    if not utils.IsValidIP(secondary_ip):
1417
      raise errors.OpPrereqError("Invalid secondary IP given")
1418
    self.op.secondary_ip = secondary_ip
1419
    node_list = cfg.GetNodeList()
1420
    if node in node_list:
1421
      raise errors.OpPrereqError("Node %s is already in the configuration"
1422
                                 % node)
1423

    
1424
    for existing_node_name in node_list:
1425
      existing_node = cfg.GetNodeInfo(existing_node_name)
1426
      if (existing_node.primary_ip == primary_ip or
1427
          existing_node.secondary_ip == primary_ip or
1428
          existing_node.primary_ip == secondary_ip or
1429
          existing_node.secondary_ip == secondary_ip):
1430
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1431
                                   " existing node %s" % existing_node.name)
1432

    
1433
    # check that the type of the node (single versus dual homed) is the
1434
    # same as for the master
1435
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1436
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1437
    newbie_singlehomed = secondary_ip == primary_ip
1438
    if master_singlehomed != newbie_singlehomed:
1439
      if master_singlehomed:
1440
        raise errors.OpPrereqError("The master has no private ip but the"
1441
                                   " new node has one")
1442
      else:
1443
        raise errors.OpPrereqError("The master has a private ip but the"
1444
                                   " new node doesn't have one")
1445

    
1446
    # checks reachability
1447
    command = ["fping", "-q", primary_ip]
1448
    result = utils.RunCmd(command)
1449
    if result.failed:
1450
      raise errors.OpPrereqError("Node not reachable by ping")
1451

    
1452
    if not newbie_singlehomed:
1453
      # check reachability from my secondary ip to newbie's secondary ip
1454
      command = ["fping", "-S%s" % myself.secondary_ip, "-q", secondary_ip]
1455
      result = utils.RunCmd(command)
1456
      if result.failed:
1457
        raise errors.OpPrereqError("Node secondary ip not reachable by ping")
1458

    
1459
    self.new_node = objects.Node(name=node,
1460
                                 primary_ip=primary_ip,
1461
                                 secondary_ip=secondary_ip)
1462

    
1463
  def Exec(self, feedback_fn):
1464
    """Adds the new node to the cluster.
1465

1466
    """
1467
    new_node = self.new_node
1468
    node = new_node.name
1469

    
1470
    # set up inter-node password and certificate and restarts the node daemon
1471
    gntpass = self.sstore.GetNodeDaemonPassword()
1472
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1473
      raise errors.OpExecError("ganeti password corruption detected")
1474
    f = open(constants.SSL_CERT_FILE)
1475
    try:
1476
      gntpem = f.read(8192)
1477
    finally:
1478
      f.close()
1479
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1480
    # so we use this to detect an invalid certificate; as long as the
1481
    # cert doesn't contain this, the here-document will be correctly
1482
    # parsed by the shell sequence below
1483
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
1484
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1485
    if not gntpem.endswith("\n"):
1486
      raise errors.OpExecError("PEM must end with newline")
1487
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1488

    
1489
    # and then connect with ssh to set password and start ganeti-noded
1490
    # note that all the below variables are sanitized at this point,
1491
    # either by being constants or by the checks above
1492
    ss = self.sstore
1493
    mycommand = ("umask 077 && "
1494
                 "echo '%s' > '%s' && "
1495
                 "cat > '%s' << '!EOF.' && \n"
1496
                 "%s!EOF.\n%s restart" %
1497
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1498
                  constants.SSL_CERT_FILE, gntpem,
1499
                  constants.NODE_INITD_SCRIPT))
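    # Roughly, the generated remote command is (paths and values illustrative):
    #   umask 077 && echo '<node password>' > '<ssconf node_pass file>' && \
    #   cat > '<SSL cert file>' << '!EOF.' &&
    #   <PEM data>!EOF.
    #   <node init script> restart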
1500

    
1501
    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1502
    if result.failed:
1503
      raise errors.OpExecError("Remote command on node %s, error: %s,"
1504
                               " output: %s" %
1505
                               (node, result.fail_reason, result.output))
1506

    
1507
    # check connectivity
1508
    time.sleep(4)
1509

    
1510
    result = rpc.call_version([node])[node]
1511
    if result:
1512
      if constants.PROTOCOL_VERSION == result:
1513
        logger.Info("communication to node %s fine, sw version %s match" %
1514
                    (node, result))
1515
      else:
1516
        raise errors.OpExecError("Version mismatch master version %s,"
1517
                                 " node version %s" %
1518
                                 (constants.PROTOCOL_VERSION, result))
1519
    else:
1520
      raise errors.OpExecError("Cannot get version from the new node")
1521

    
1522
    # setup ssh on node
1523
    logger.Info("copy ssh key to node %s" % node)
1524
    keyarray = []
1525
    keyfiles = ["/etc/ssh/ssh_host_dsa_key", "/etc/ssh/ssh_host_dsa_key.pub",
1526
                "/etc/ssh/ssh_host_rsa_key", "/etc/ssh/ssh_host_rsa_key.pub",
1527
                "/root/.ssh/id_dsa", "/root/.ssh/id_dsa.pub"]
1528

    
1529
    for i in keyfiles:
1530
      f = open(i, 'r')
1531
      try:
1532
        keyarray.append(f.read())
1533
      finally:
1534
        f.close()
1535

    
1536
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1537
                               keyarray[3], keyarray[4], keyarray[5])
1538

    
1539
    if not result:
1540
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1541

    
1542
    # Add node to our /etc/hosts, and add key to known_hosts
1543
    _UpdateEtcHosts(new_node.name, new_node.primary_ip)
1544
    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1545
                      self.cfg.GetHostKey())
1546

    
1547
    if new_node.secondary_ip != new_node.primary_ip:
1548
      result = ssh.SSHCall(node, "root",
1549
                           "fping -S 127.0.0.1 -q %s" % new_node.secondary_ip)
1550
      if result.failed:
1551
        raise errors.OpExecError("Node claims it doesn't have the"
1552
                                 " secondary ip you gave (%s).\n"
1553
                                 "Please fix and re-run this command." %
1554
                                 new_node.secondary_ip)
1555

    
1556
    success, msg = ssh.VerifyNodeHostname(node)
1557
    if not success:
1558
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
1559
                               " than the one the resolver gives: %s.\n"
1560
                               "Please fix and re-run this command." %
1561
                               (node, msg))
1562

    
1563
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1564
    # including the node just added
1565
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1566
    dist_nodes = self.cfg.GetNodeList() + [node]
1567
    if myself.name in dist_nodes:
1568
      dist_nodes.remove(myself.name)
1569

    
1570
    logger.Debug("Copying hosts and known_hosts to all nodes")
1571
    for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
1572
      result = rpc.call_upload_file(dist_nodes, fname)
1573
      for to_node in dist_nodes:
1574
        if not result[to_node]:
1575
          logger.Error("copy of file %s to node %s failed" %
1576
                       (fname, to_node))
1577

    
1578
    to_copy = ss.GetFileList()
1579
    for fname in to_copy:
1580
      if not ssh.CopyFileToNode(node, fname):
1581
        logger.Error("could not copy file %s to node %s" % (fname, node))
1582

    
1583
    logger.Info("adding node %s to cluster.conf" % node)
1584
    self.cfg.AddNode(new_node)


class LUMasterFailover(LogicalUnit):
1588
  """Failover the master node to the current node.
1589

1590
  This is a special LU in that it must run on a non-master node.
1591

1592
  """
1593
  HPATH = "master-failover"
1594
  HTYPE = constants.HTYPE_CLUSTER
1595
  REQ_MASTER = False
1596
  _OP_REQP = []
1597

    
1598
  def BuildHooksEnv(self):
1599
    """Build hooks env.
1600

1601
    This will run on the new master only in the pre phase, and on all
1602
    the nodes in the post phase.
1603

1604
    """
1605
    env = {
1606
      "NEW_MASTER": self.new_master,
1607
      "OLD_MASTER": self.old_master,
1608
      }
1609
    return env, [self.new_master], self.cfg.GetNodeList()
1610

    
1611
  def CheckPrereq(self):
1612
    """Check prerequisites.
1613

1614
    This checks that we are not already the master.
1615

1616
    """
1617
    self.new_master = socket.gethostname()
1618

    
1619
    self.old_master = self.sstore.GetMasterNode()
1620

    
1621
    if self.old_master == self.new_master:
1622
      raise errors.OpPrereqError("This commands must be run on the node"
1623
                                 " where you want the new master to be.\n"
1624
                                 "%s is already the master" %
1625
                                 self.old_master)
1626

    
1627
  def Exec(self, feedback_fn):
1628
    """Failover the master node.
1629

1630
    This command, when run on a non-master node, will cause the current
1631
    master to cease being master, and the non-master to become new
1632
    master.
1633

1634
    """
1635
    #TODO: do not rely on gethostname returning the FQDN
1636
    logger.Info("setting master to %s, old master: %s" %
1637
                (self.new_master, self.old_master))
1638

    
1639
    if not rpc.call_node_stop_master(self.old_master):
1640
      logger.Error("could disable the master role on the old master"
1641
                   " %s, please disable manually" % self.old_master)
1642

    
1643
    ss = self.sstore
1644
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1645
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
1646
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
1647
      logger.Error("could not distribute the new simple store master file"
1648
                   " to the other nodes, please check.")
1649

    
1650
    if not rpc.call_node_start_master(self.new_master):
1651
      logger.Error("could not start the master role on the new master"
1652
                   " %s, please check" % self.new_master)
1653
      feedback_fn("Error in activating the master IP on the new master,\n"
1654
                  "please fix manually.")


class LUQueryClusterInfo(NoHooksLU):
1659
  """Query cluster configuration.
1660

1661
  """
1662
  _OP_REQP = []
1663
  REQ_MASTER = False
1664

    
1665
  def CheckPrereq(self):
1666
    """No prerequsites needed for this LU.
1667

1668
    """
1669
    pass
1670

    
1671
  def Exec(self, feedback_fn):
1672
    """Return cluster config.
1673

1674
    """
1675
    result = {
1676
      "name": self.sstore.GetClusterName(),
1677
      "software_version": constants.RELEASE_VERSION,
1678
      "protocol_version": constants.PROTOCOL_VERSION,
1679
      "config_version": constants.CONFIG_VERSION,
1680
      "os_api_version": constants.OS_API_VERSION,
1681
      "export_version": constants.EXPORT_VERSION,
1682
      "master": self.sstore.GetMasterNode(),
1683
      "architecture": (platform.architecture()[0], platform.machine()),
1684
      }
1685

    
1686
    return result


class LUClusterCopyFile(NoHooksLU):
1690
  """Copy file to cluster.
1691

1692
  """
1693
  _OP_REQP = ["nodes", "filename"]
1694

    
1695
  def CheckPrereq(self):
1696
    """Check prerequisites.
1697

1698
    It should check that the named file exists and that the given list
1699
    of nodes is valid.
1700

1701
    """
1702
    if not os.path.exists(self.op.filename):
1703
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1704

    
1705
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1706

    
1707
  def Exec(self, feedback_fn):
1708
    """Copy a file from master to some nodes.
1709

1710
    Uses:
      self.op.filename - the file to copy
      self.nodes - list of target node names; if empty, all nodes

    """
1717
    filename = self.op.filename
1718

    
1719
    myname = socket.gethostname()
1720

    
1721
    for node in self.nodes:
1722
      if node == myname:
1723
        continue
1724
      if not ssh.CopyFileToNode(node, filename):
1725
        logger.Error("Copy of file %s to node %s failed" % (filename, node))


class LUDumpClusterConfig(NoHooksLU):
1729
  """Return a text-representation of the cluster-config.
1730

1731
  """
1732
  _OP_REQP = []
1733

    
1734
  def CheckPrereq(self):
1735
    """No prerequisites.
1736

1737
    """
1738
    pass
1739

    
1740
  def Exec(self, feedback_fn):
1741
    """Dump a representation of the cluster config to the standard output.
1742

1743
    """
1744
    return self.cfg.DumpConfig()


class LURunClusterCommand(NoHooksLU):
1748
  """Run a command on some nodes.
1749

1750
  """
1751
  _OP_REQP = ["command", "nodes"]
1752

    
1753
  def CheckPrereq(self):
1754
    """Check prerequisites.
1755

1756
    It checks that the given list of nodes is valid.
1757

1758
    """
1759
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1760

    
1761
  def Exec(self, feedback_fn):
1762
    """Run a command on some nodes.
1763

1764
    """
1765
    data = []
1766
    for node in self.nodes:
1767
      result = ssh.SSHCall(node, "root", self.op.command)
1768
      data.append((node, result.output, result.exit_code))
1769

    
1770
    return data


class LUActivateInstanceDisks(NoHooksLU):
1774
  """Bring up an instance's disks.
1775

1776
  """
1777
  _OP_REQP = ["instance_name"]
1778

    
1779
  def CheckPrereq(self):
1780
    """Check prerequisites.
1781

1782
    This checks that the instance is in the cluster.
1783

1784
    """
1785
    instance = self.cfg.GetInstanceInfo(
1786
      self.cfg.ExpandInstanceName(self.op.instance_name))
1787
    if instance is None:
1788
      raise errors.OpPrereqError("Instance '%s' not known" %
1789
                                 self.op.instance_name)
1790
    self.instance = instance
1791

    
1792

    
1793
  def Exec(self, feedback_fn):
1794
    """Activate the disks.
1795

1796
    """
1797
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1798
    if not disks_ok:
1799
      raise errors.OpExecError("Cannot activate block devices")
1800

    
1801
    return disks_info


def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1805
  """Prepare the block devices for an instance.
1806

1807
  This sets up the block devices on all nodes.
1808

1809
  Args:
1810
    instance: a ganeti.objects.Instance object
1811
    ignore_secondaries: if true, errors on secondary nodes won't result
1812
                        in an error return from the function
1813

1814
  Returns:
1815
    false if the operation failed
1816
    list of (host, instance_visible_name, node_visible_name) if the operation
1817
         succeeded with the mapping from node devices to instance devices
1818
  """
1819
  device_info = []
1820
  disks_ok = True
1821
  for inst_disk in instance.disks:
1822
    master_result = None
1823
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1824
      cfg.SetDiskID(node_disk, node)
1825
      is_primary = node == instance.primary_node
1826
      result = rpc.call_blockdev_assemble(node, node_disk, is_primary)
1827
      if not result:
1828
        logger.Error("could not prepare block device %s on node %s (is_pri"
1829
                     "mary=%s)" % (inst_disk.iv_name, node, is_primary))
1830
        if is_primary or not ignore_secondaries:
1831
          disks_ok = False
1832
      if is_primary:
1833
        master_result = result
1834
    device_info.append((instance.primary_node, inst_disk.iv_name,
1835
                        master_result))
1836

    
1837
  return disks_ok, device_info


def _StartInstanceDisks(cfg, instance, force):
1841
  """Start the disks of an instance.
1842

1843
  """
1844
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1845
                                           ignore_secondaries=force)
1846
  if not disks_ok:
1847
    _ShutdownInstanceDisks(instance, cfg)
1848
    if force is not None and not force:
1849
      logger.Error("If the message above refers to a secondary node,"
1850
                   " you can retry the operation using '--force'.")
1851
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
1855
  """Shutdown an instance's disks.
1856

1857
  """
1858
  _OP_REQP = ["instance_name"]
1859

    
1860
  def CheckPrereq(self):
1861
    """Check prerequisites.
1862

1863
    This checks that the instance is in the cluster.
1864

1865
    """
1866
    instance = self.cfg.GetInstanceInfo(
1867
      self.cfg.ExpandInstanceName(self.op.instance_name))
1868
    if instance is None:
1869
      raise errors.OpPrereqError("Instance '%s' not known" %
1870
                                 self.op.instance_name)
1871
    self.instance = instance
1872

    
1873
  def Exec(self, feedback_fn):
1874
    """Deactivate the disks
1875

1876
    """
1877
    instance = self.instance
1878
    ins_l = rpc.call_instance_list([instance.primary_node])
1879
    ins_l = ins_l[instance.primary_node]
1880
    if not isinstance(ins_l, list):
1881
      raise errors.OpExecError("Can't contact node '%s'" %
1882
                               instance.primary_node)
1883

    
1884
    if self.instance.name in ins_l:
1885
      raise errors.OpExecError("Instance is running, can't shutdown"
1886
                               " block devices.")
1887

    
1888
    _ShutdownInstanceDisks(instance, self.cfg)


def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1892
  """Shutdown block devices of an instance.
1893

1894
  This does the shutdown on all nodes of the instance.
1895

1896
  If ignore_primary is true, errors on the primary node are
  ignored.
1898

1899
  """
1900
  result = True
1901
  for disk in instance.disks:
1902
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1903
      cfg.SetDiskID(top_disk, node)
1904
      if not rpc.call_blockdev_shutdown(node, top_disk):
1905
        logger.Error("could not shutdown block device %s on node %s" %
1906
                     (disk.iv_name, node))
1907
        if not ignore_primary or node != instance.primary_node:
1908
          result = False
1909
  return result


class LUStartupInstance(LogicalUnit):
1913
  """Starts an instance.
1914

1915
  """
1916
  HPATH = "instance-start"
1917
  HTYPE = constants.HTYPE_INSTANCE
1918
  _OP_REQP = ["instance_name", "force"]
1919

    
1920
  def BuildHooksEnv(self):
1921
    """Build hooks env.
1922

1923
    This runs on master, primary and secondary nodes of the instance.
1924

1925
    """
1926
    env = {
1927
      "FORCE": self.op.force,
1928
      }
1929
    env.update(_BuildInstanceHookEnvByObject(self.instance))
1930
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1931
          list(self.instance.secondary_nodes))
1932
    return env, nl, nl
1933

    
1934
  def CheckPrereq(self):
1935
    """Check prerequisites.
1936

1937
    This checks that the instance is in the cluster.
1938

1939
    """
1940
    instance = self.cfg.GetInstanceInfo(
1941
      self.cfg.ExpandInstanceName(self.op.instance_name))
1942
    if instance is None:
1943
      raise errors.OpPrereqError("Instance '%s' not known" %
1944
                                 self.op.instance_name)
1945

    
1946
    # check bridges existence
1947
    brlist = [nic.bridge for nic in instance.nics]
1948
    if not rpc.call_bridges_exist(instance.primary_node, brlist):
1949
      raise errors.OpPrereqError("one or more target bridges %s does not"
1950
                                 " exist on destination node '%s'" %
1951
                                 (brlist, instance.primary_node))
1952

    
1953
    self.instance = instance
1954
    self.op.instance_name = instance.name
1955

    
1956
  def Exec(self, feedback_fn):
1957
    """Start the instance.
1958

1959
    """
1960
    instance = self.instance
1961
    force = self.op.force
1962
    extra_args = getattr(self.op, "extra_args", "")
1963

    
1964
    node_current = instance.primary_node
1965

    
1966
    nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1967
    if not nodeinfo:
1968
      raise errors.OpExecError("Could not contact node %s for infos" %
1969
                               (node_current))
1970

    
1971
    freememory = nodeinfo[node_current]['memory_free']
1972
    memory = instance.memory
1973
    if memory > freememory:
1974
      raise errors.OpExecError("Not enough memory to start instance"
1975
                               " %s on node %s"
1976
                               " needed %s MiB, available %s MiB" %
1977
                               (instance.name, node_current, memory,
1978
                                freememory))
1979

    
1980
    _StartInstanceDisks(self.cfg, instance, force)
1981

    
1982
    if not rpc.call_instance_start(node_current, instance, extra_args):
1983
      _ShutdownInstanceDisks(instance, self.cfg)
1984
      raise errors.OpExecError("Could not start instance")
1985

    
1986
    self.cfg.MarkInstanceUp(instance.name)


class LUShutdownInstance(LogicalUnit):
1990
  """Shutdown an instance.
1991

1992
  """
1993
  HPATH = "instance-stop"
1994
  HTYPE = constants.HTYPE_INSTANCE
1995
  _OP_REQP = ["instance_name"]
1996

    
1997
  def BuildHooksEnv(self):
1998
    """Build hooks env.
1999

2000
    This runs on master, primary and secondary nodes of the instance.
2001

2002
    """
2003
    env = _BuildInstanceHookEnvByObject(self.instance)
2004
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2005
          list(self.instance.secondary_nodes))
2006
    return env, nl, nl
2007

    
2008
  def CheckPrereq(self):
2009
    """Check prerequisites.
2010

2011
    This checks that the instance is in the cluster.
2012

2013
    """
2014
    instance = self.cfg.GetInstanceInfo(
2015
      self.cfg.ExpandInstanceName(self.op.instance_name))
2016
    if instance is None:
2017
      raise errors.OpPrereqError("Instance '%s' not known" %
2018
                                 self.op.instance_name)
2019
    self.instance = instance
2020

    
2021
  def Exec(self, feedback_fn):
2022
    """Shutdown the instance.
2023

2024
    """
2025
    instance = self.instance
2026
    node_current = instance.primary_node
2027
    if not rpc.call_instance_shutdown(node_current, instance):
2028
      logger.Error("could not shutdown instance")
2029

    
2030
    self.cfg.MarkInstanceDown(instance.name)
2031
    _ShutdownInstanceDisks(instance, self.cfg)


class LUReinstallInstance(LogicalUnit):
2035
  """Reinstall an instance.
2036

2037
  """
2038
  HPATH = "instance-reinstall"
2039
  HTYPE = constants.HTYPE_INSTANCE
2040
  _OP_REQP = ["instance_name"]
2041

    
2042
  def BuildHooksEnv(self):
2043
    """Build hooks env.
2044

2045
    This runs on master, primary and secondary nodes of the instance.
2046

2047
    """
2048
    env = _BuildInstanceHookEnvByObject(self.instance)
2049
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2050
          list(self.instance.secondary_nodes))
2051
    return env, nl, nl
2052

    
2053
  def CheckPrereq(self):
2054
    """Check prerequisites.
2055

2056
    This checks that the instance is in the cluster and is not running.
2057

2058
    """
2059
    instance = self.cfg.GetInstanceInfo(
2060
      self.cfg.ExpandInstanceName(self.op.instance_name))
2061
    if instance is None:
2062
      raise errors.OpPrereqError("Instance '%s' not known" %
2063
                                 self.op.instance_name)
2064
    if instance.disk_template == constants.DT_DISKLESS:
2065
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2066
                                 self.op.instance_name)
2067
    if instance.status != "down":
2068
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2069
                                 self.op.instance_name)
2070
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2071
    if remote_info:
2072
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2073
                                 (self.op.instance_name,
2074
                                  instance.primary_node))
2075

    
2076
    self.op.os_type = getattr(self.op, "os_type", None)
2077
    if self.op.os_type is not None:
2078
      # OS verification
2079
      pnode = self.cfg.GetNodeInfo(
2080
        self.cfg.ExpandNodeName(instance.primary_node))
2081
      if pnode is None:
2082
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2083
                                   self.op.pnode)
2084
      os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2085
      if not isinstance(os_obj, objects.OS):
2086
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2087
                                   " primary node"  % self.op.os_type)
2088

    
2089
    self.instance = instance
2090

    
2091
  def Exec(self, feedback_fn):
2092
    """Reinstall the instance.
2093

2094
    """
2095
    inst = self.instance
2096

    
2097
    if self.op.os_type is not None:
2098
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2099
      inst.os = self.op.os_type
2100
      self.cfg.AddInstance(inst)
2101

    
2102
    _StartInstanceDisks(self.cfg, inst, None)
2103
    try:
2104
      feedback_fn("Running the instance OS create scripts...")
2105
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2106
        raise errors.OpExecError("Could not install OS for instance %s "
2107
                                 "on node %s" %
2108
                                 (inst.name, inst.primary_node))
2109
    finally:
2110
      _ShutdownInstanceDisks(inst, self.cfg)


class LURenameInstance(LogicalUnit):
2114
  """Rename an instance.
2115

2116
  """
2117
  HPATH = "instance-rename"
2118
  HTYPE = constants.HTYPE_INSTANCE
2119
  _OP_REQP = ["instance_name", "new_name"]
2120

    
2121
  def BuildHooksEnv(self):
2122
    """Build hooks env.
2123

2124
    This runs on master, primary and secondary nodes of the instance.
2125

2126
    """
2127
    env = _BuildInstanceHookEnvByObject(self.instance)
2128
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2129
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2130
          list(self.instance.secondary_nodes))
2131
    return env, nl, nl
2132

    
2133
  def CheckPrereq(self):
2134
    """Check prerequisites.
2135

2136
    This checks that the instance is in the cluster and is not running.
2137

2138
    """
2139
    instance = self.cfg.GetInstanceInfo(
2140
      self.cfg.ExpandInstanceName(self.op.instance_name))
2141
    if instance is None:
2142
      raise errors.OpPrereqError("Instance '%s' not known" %
2143
                                 self.op.instance_name)
2144
    if instance.status != "down":
2145
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2146
                                 self.op.instance_name)
2147
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2148
    if remote_info:
2149
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2150
                                 (self.op.instance_name,
2151
                                  instance.primary_node))
2152
    self.instance = instance
2153

    
2154
    # new name verification
2155
    hostname1 = utils.LookupHostname(self.op.new_name)
2156
    if not hostname1:
2157
      raise errors.OpPrereqError("New instance name '%s' not found in dns" %
2158
                                 self.op.new_name)
2159

    
2160
    self.op.new_name = new_name = hostname1.name
2161
    if not getattr(self.op, "ignore_ip", False):
2162
      command = ["fping", "-q", hostname1.ip]
2163
      result = utils.RunCmd(command)
2164
      if not result.failed:
2165
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2166
                                   (hostname1.ip, new_name))
2167

    
2168

    
2169
  def Exec(self, feedback_fn):
2170
    """Reinstall the instance.
2171

2172
    """
2173
    inst = self.instance
2174
    old_name = inst.name
2175

    
2176
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2177

    
2178
    # re-read the instance from the configuration after rename
2179
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2180

    
2181
    _StartInstanceDisks(self.cfg, inst, None)
2182
    try:
2183
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2184
                                          "sda", "sdb"):
2185
        msg = ("Could run OS rename script for instance %s\n"
2186
               "on node %s\n"
2187
               "(but the instance has been renamed in Ganeti)" %
2188
               (inst.name, inst.primary_node))
2189
        logger.Error(msg)
2190
    finally:
2191
      _ShutdownInstanceDisks(inst, self.cfg)


class LURemoveInstance(LogicalUnit):
2195
  """Remove an instance.
2196

2197
  """
2198
  HPATH = "instance-remove"
2199
  HTYPE = constants.HTYPE_INSTANCE
2200
  _OP_REQP = ["instance_name"]
2201

    
2202
  def BuildHooksEnv(self):
2203
    """Build hooks env.
2204

2205
    This runs on master, primary and secondary nodes of the instance.
2206

2207
    """
2208
    env = _BuildInstanceHookEnvByObject(self.instance)
2209
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2210
          list(self.instance.secondary_nodes))
2211
    return env, nl, nl
2212

    
2213
  def CheckPrereq(self):
2214
    """Check prerequisites.
2215

2216
    This checks that the instance is in the cluster.
2217

2218
    """
2219
    instance = self.cfg.GetInstanceInfo(
2220
      self.cfg.ExpandInstanceName(self.op.instance_name))
2221
    if instance is None:
2222
      raise errors.OpPrereqError("Instance '%s' not known" %
2223
                                 self.op.instance_name)
2224
    self.instance = instance
2225

    
2226
  def Exec(self, feedback_fn):
2227
    """Remove the instance.
2228

2229
    """
2230
    instance = self.instance
2231
    logger.Info("shutting down instance %s on node %s" %
2232
                (instance.name, instance.primary_node))
2233

    
2234
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2235
      raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2236
                               (instance.name, instance.primary_node))
2237

    
2238
    logger.Info("removing block devices for instance %s" % instance.name)
2239

    
2240
    _RemoveDisks(instance, self.cfg)
2241

    
2242
    logger.Info("removing instance %s out of cluster config" % instance.name)
2243

    
2244
    self.cfg.RemoveInstance(instance.name)


class LUQueryInstances(NoHooksLU):
2248
  """Logical unit for querying instances.
2249

2250
  """
2251
  _OP_REQP = ["output_fields", "names"]
2252

    
2253
  def CheckPrereq(self):
2254
    """Check prerequisites.
2255

2256
    This checks that the fields required are valid output fields.
2257

2258
    """
2259
    self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2260
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2261
                               "admin_state", "admin_ram",
2262
                               "disk_template", "ip", "mac", "bridge",
2263
                               "sda_size", "sdb_size"],
2264
                       dynamic=self.dynamic_fields,
2265
                       selected=self.op.output_fields)
2266

    
2267
    self.wanted = _GetWantedInstances(self, self.op.names)
2268

    
2269
  def Exec(self, feedback_fn):
2270
    """Computes the list of nodes and their attributes.
2271

2272
    """
2273
    instance_names = self.wanted
2274
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2275
                     in instance_names]
2276

    
2277
    # begin data gathering
2278

    
2279
    nodes = frozenset([inst.primary_node for inst in instance_list])
2280

    
2281
    bad_nodes = []
2282
    if self.dynamic_fields.intersection(self.op.output_fields):
2283
      live_data = {}
2284
      node_data = rpc.call_all_instances_info(nodes)
2285
      for name in nodes:
2286
        result = node_data[name]
2287
        if result:
2288
          live_data.update(result)
2289
        elif result == False:
2290
          bad_nodes.append(name)
2291
        # else no instance is alive
2292
    else:
2293
      live_data = dict([(name, {}) for name in instance_names])
2294

    
2295
    # end data gathering
2296

    
2297
    output = []
2298
    for instance in instance_list:
2299
      iout = []
2300
      for field in self.op.output_fields:
2301
        if field == "name":
2302
          val = instance.name
2303
        elif field == "os":
2304
          val = instance.os
2305
        elif field == "pnode":
2306
          val = instance.primary_node
2307
        elif field == "snodes":
2308
          val = list(instance.secondary_nodes)
2309
        elif field == "admin_state":
2310
          val = (instance.status != "down")
2311
        elif field == "oper_state":
2312
          if instance.primary_node in bad_nodes:
2313
            val = None
2314
          else:
2315
            val = bool(live_data.get(instance.name))
2316
        elif field == "admin_ram":
2317
          val = instance.memory
2318
        elif field == "oper_ram":
2319
          if instance.primary_node in bad_nodes:
2320
            val = None
2321
          elif instance.name in live_data:
2322
            val = live_data[instance.name].get("memory", "?")
2323
          else:
2324
            val = "-"
2325
        elif field == "disk_template":
2326
          val = instance.disk_template
2327
        elif field == "ip":
2328
          val = instance.nics[0].ip
2329
        elif field == "bridge":
2330
          val = instance.nics[0].bridge
2331
        elif field == "mac":
2332
          val = instance.nics[0].mac
2333
        elif field == "sda_size" or field == "sdb_size":
2334
          disk = instance.FindDisk(field[:3])
2335
          if disk is None:
2336
            val = None
2337
          else:
2338
            val = disk.size
2339
        else:
2340
          raise errors.ParameterError(field)
2341
        iout.append(val)
2342
      output.append(iout)
2343

    
2344
    return output


class LUFailoverInstance(LogicalUnit):
2348
  """Failover an instance.
2349

2350
  """
2351
  HPATH = "instance-failover"
2352
  HTYPE = constants.HTYPE_INSTANCE
2353
  _OP_REQP = ["instance_name", "ignore_consistency"]
2354

    
2355
  def BuildHooksEnv(self):
2356
    """Build hooks env.
2357

2358
    This runs on master, primary and secondary nodes of the instance.
2359

2360
    """
2361
    env = {
2362
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2363
      }
2364
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2365
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2366
    return env, nl, nl
2367

    
2368
  def CheckPrereq(self):
2369
    """Check prerequisites.
2370

2371
    This checks that the instance is in the cluster.
2372

2373
    """
2374
    instance = self.cfg.GetInstanceInfo(
2375
      self.cfg.ExpandInstanceName(self.op.instance_name))
2376
    if instance is None:
2377
      raise errors.OpPrereqError("Instance '%s' not known" %
2378
                                 self.op.instance_name)
2379

    
2380
    if instance.disk_template != constants.DT_REMOTE_RAID1:
2381
      raise errors.OpPrereqError("Instance's disk layout is not"
2382
                                 " remote_raid1.")
2383

    
2384
    secondary_nodes = instance.secondary_nodes
2385
    if not secondary_nodes:
2386
      raise errors.ProgrammerError("no secondary node but using "
2387
                                   "DT_REMOTE_RAID1 template")
2388

    
2389
    # check memory requirements on the secondary node
2390
    target_node = secondary_nodes[0]
2391
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2392
    info = nodeinfo.get(target_node, None)
2393
    if not info:
2394
      raise errors.OpPrereqError("Cannot get current information"
2395
                                 " from node '%s'" % nodeinfo)
2396
    if instance.memory > info['memory_free']:
2397
      raise errors.OpPrereqError("Not enough memory on target node %s."
2398
                                 " %d MB available, %d MB required" %
2399
                                 (target_node, info['memory_free'],
2400
                                  instance.memory))
2401

    
2402
    # check bridge existence
2403
    brlist = [nic.bridge for nic in instance.nics]
2404
    if not rpc.call_bridges_exist(instance.primary_node, brlist):
2405
      raise errors.OpPrereqError("One or more target bridges %s does not"
2406
                                 " exist on destination node '%s'" %
2407
                                 (brlist, instance.primary_node))
2408

    
2409
    self.instance = instance
2410

    
2411
  def Exec(self, feedback_fn):
2412
    """Failover an instance.
2413

2414
    The failover is done by shutting it down on its present node and
2415
    starting it on the secondary.
2416

2417
    """
2418
    instance = self.instance
2419

    
2420
    source_node = instance.primary_node
2421
    target_node = instance.secondary_nodes[0]
2422

    
2423
    feedback_fn("* checking disk consistency between source and target")
2424
    for dev in instance.disks:
2425
      # for remote_raid1, these are md over drbd
2426
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2427
        if not self.op.ignore_consistency:
2428
          raise errors.OpExecError("Disk %s is degraded on target node,"
2429
                                   " aborting failover." % dev.iv_name)
2430

    
2431
    feedback_fn("* checking target node resource availability")
2432
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2433

    
2434
    if not nodeinfo:
2435
      raise errors.OpExecError("Could not contact target node %s." %
2436
                               target_node)
2437

    
2438
    free_memory = int(nodeinfo[target_node]['memory_free'])
2439
    memory = instance.memory
2440
    if memory > free_memory:
2441
      raise errors.OpExecError("Not enough memory to create instance %s on"
2442
                               " node %s. needed %s MiB, available %s MiB" %
2443
                               (instance.name, target_node, memory,
2444
                                free_memory))
2445

    
2446
    feedback_fn("* shutting down instance on source node")
2447
    logger.Info("Shutting down instance %s on node %s" %
2448
                (instance.name, source_node))
2449

    
2450
    if not rpc.call_instance_shutdown(source_node, instance):
2451
      logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2452
                   " anyway. Please make sure node %s is down"  %
2453
                   (instance.name, source_node, source_node))
2454

    
2455
    feedback_fn("* deactivating the instance's disks on source node")
2456
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2457
      raise errors.OpExecError("Can't shut down the instance's disks.")
2458

    
2459
    instance.primary_node = target_node
2460
    # distribute new instance config to the other nodes
2461
    self.cfg.AddInstance(instance)
2462

    
2463
    feedback_fn("* activating the instance's disks on target node")
2464
    logger.Info("Starting instance %s on node %s" %
2465
                (instance.name, target_node))
2466

    
2467
    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2468
                                             ignore_secondaries=True)
2469
    if not disks_ok:
2470
      _ShutdownInstanceDisks(instance, self.cfg)
2471
      raise errors.OpExecError("Can't activate the instance's disks")
2472

    
2473
    feedback_fn("* starting the instance on the target node")
2474
    if not rpc.call_instance_start(target_node, instance, None):
2475
      _ShutdownInstanceDisks(instance, self.cfg)
2476
      raise errors.OpExecError("Could not start instance %s on node %s." %
2477
                               (instance.name, target_node))


def _CreateBlockDevOnPrimary(cfg, node, device, info):
2481
  """Create a tree of block devices on the primary node.
2482

2483
  This always creates all devices.
2484

2485
  """
2486
  if device.children:
2487
    for child in device.children:
2488
      if not _CreateBlockDevOnPrimary(cfg, node, child, info):
2489
        return False
2490

    
2491
  cfg.SetDiskID(device, node)
2492
  new_id = rpc.call_blockdev_create(node, device, device.size, True, info)
2493
  if not new_id:
2494
    return False
2495
  if device.physical_id is None:
2496
    device.physical_id = new_id
2497
  return True


def _CreateBlockDevOnSecondary(cfg, node, device, force, info):
2501
  """Create a tree of block devices on a secondary node.
2502

2503
  If this device type has to be created on secondaries, create it and
2504
  all its children.
2505

2506
  If not, just recurse to children keeping the same 'force' value.
2507

2508
  """
2509
  if device.CreateOnSecondary():
2510
    force = True
2511
  if device.children:
2512
    for child in device.children:
2513
      if not _CreateBlockDevOnSecondary(cfg, node, child, force, info):
2514
        return False
2515

    
2516
  if not force:
2517
    return True
2518
  cfg.SetDiskID(device, node)
2519
  new_id = rpc.call_blockdev_create(node, device, device.size, False, info)
2520
  if not new_id:
2521
    return False
2522
  if device.physical_id is None:
2523
    device.physical_id = new_id
2524
  return True


def _GenerateUniqueNames(cfg, exts):
2528
  """Generate a suitable LV name.
2529

2530
  This will generate a logical volume name for the given instance.
2531

2532
  """
2533
  results = []
2534
  for val in exts:
2535
    new_id = cfg.GenerateUniqueID()
2536
    results.append("%s%s" % (new_id, val))
2537
  return results


def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2541
  """Generate a drbd device complete with its children.
2542

2543
  """
2544
  port = cfg.AllocatePort()
2545
  vgname = cfg.GetVGName()
2546
  dev_data = objects.Disk(dev_type="lvm", size=size,
2547
                          logical_id=(vgname, names[0]))
2548
  dev_meta = objects.Disk(dev_type="lvm", size=128,
2549
                          logical_id=(vgname, names[1]))
2550
  drbd_dev = objects.Disk(dev_type="drbd", size=size,
2551
                          logical_id = (primary, secondary, port),
2552
                          children = [dev_data, dev_meta])
2553
  return drbd_dev


def _GenerateDiskTemplate(cfg, template_name,
2557
                          instance_name, primary_node,
2558
                          secondary_nodes, disk_sz, swap_sz):
2559
  """Generate the entire disk layout for a given template type.
2560

2561
  """
2562
  #TODO: compute space requirements
2563

    
2564
  vgname = cfg.GetVGName()
2565
  if template_name == "diskless":
2566
    disks = []
2567
  elif template_name == "plain":
2568
    if len(secondary_nodes) != 0:
2569
      raise errors.ProgrammerError("Wrong template configuration")
2570

    
2571
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2572
    sda_dev = objects.Disk(dev_type="lvm", size=disk_sz,
2573
                           logical_id=(vgname, names[0]),
2574
                           iv_name = "sda")
2575
    sdb_dev = objects.Disk(dev_type="lvm", size=swap_sz,
2576
                           logical_id=(vgname, names[1]),
2577
                           iv_name = "sdb")
2578
    disks = [sda_dev, sdb_dev]
2579
  elif template_name == "local_raid1":
2580
    if len(secondary_nodes) != 0:
2581
      raise errors.ProgrammerError("Wrong template configuration")
2582

    
2583

    
2584
    names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2585
                                       ".sdb_m1", ".sdb_m2"])
2586
    sda_dev_m1 = objects.Disk(dev_type="lvm", size=disk_sz,
2587
                              logical_id=(vgname, names[0]))
2588
    sda_dev_m2 = objects.Disk(dev_type="lvm", size=disk_sz,
2589
                              logical_id=(vgname, names[1]))
2590
    md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name = "sda",
2591
                              size=disk_sz,
2592
                              children = [sda_dev_m1, sda_dev_m2])
2593
    sdb_dev_m1 = objects.Disk(dev_type="lvm", size=swap_sz,
2594
                              logical_id=(vgname, names[2]))
2595
    sdb_dev_m2 = objects.Disk(dev_type="lvm", size=swap_sz,
2596
                              logical_id=(vgname, names[3]))
2597
    md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name = "sdb",
2598
                              size=swap_sz,
2599
                              children = [sdb_dev_m1, sdb_dev_m2])
2600
    disks = [md_sda_dev, md_sdb_dev]
2601
  elif template_name == constants.DT_REMOTE_RAID1:
2602
    if len(secondary_nodes) != 1:
2603
      raise errors.ProgrammerError("Wrong template configuration")
2604
    remote_node = secondary_nodes[0]
2605
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2606
                                       ".sdb_data", ".sdb_meta"])
2607
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2608
                                         disk_sz, names[0:2])
2609
    md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name="sda",
2610
                              children = [drbd_sda_dev], size=disk_sz)
2611
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2612
                                         swap_sz, names[2:4])
2613
    md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name="sdb",
2614
                              children = [drbd_sdb_dev], size=swap_sz)
2615
    disks = [md_sda_dev, md_sdb_dev]
2616
  else:
2617
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2618
  return disks


def _GetInstanceInfoText(instance):
2622
  """Compute that text that should be added to the disk's metadata.
2623

2624
  """
2625
  return "originstname+%s" % instance.name


def _CreateDisks(cfg, instance):
2629
  """Create all disks for an instance.
2630

2631
  This abstracts away some work from AddInstance.
2632

2633
  Args:
2634
    instance: the instance object
2635

2636
  Returns:
2637
    True or False showing the success of the creation process
2638

2639
  """
2640
  info = _GetInstanceInfoText(instance)
2641

    
2642
  for device in instance.disks:
2643
    logger.Info("creating volume %s for instance %s" %
2644
              (device.iv_name, instance.name))
2645
    #HARDCODE
2646
    for secondary_node in instance.secondary_nodes:
2647
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, device, False,
2648
                                        info):
2649
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2650
                     (device.iv_name, device, secondary_node))
2651
        return False
2652
    #HARDCODE
2653
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, device, info):
2654
      logger.Error("failed to create volume %s on primary!" %
2655
                   device.iv_name)
2656
      return False
2657
  return True


def _RemoveDisks(instance, cfg):
2661
  """Remove all disks for an instance.
2662

2663
  This abstracts away some work from `AddInstance()` and
2664
  `RemoveInstance()`. Note that in case some of the devices couldn't
2665
  be removed, the removal will continue with the other ones (compare
2666
  with `_CreateDisks()`).
2667

2668
  Args:
2669
    instance: the instance object
2670

2671
  Returns:
2672
    True or False showing the success of the removal process
2673

2674
  """
2675
  logger.Info("removing block devices for instance %s" % instance.name)
2676

    
2677
  result = True
2678
  for device in instance.disks:
2679
    for node, disk in device.ComputeNodeTree(instance.primary_node):
2680
      cfg.SetDiskID(disk, node)
2681
      if not rpc.call_blockdev_remove(node, disk):
2682
        logger.Error("could not remove block device %s on node %s,"
2683
                     " continuing anyway" %
2684
                     (device.iv_name, node))
2685
        result = False
2686
  return result


class LUCreateInstance(LogicalUnit):
2690
  """Create an instance.
2691

2692
  """
2693
  HPATH = "instance-add"
2694
  HTYPE = constants.HTYPE_INSTANCE
2695
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2696
              "disk_template", "swap_size", "mode", "start", "vcpus",
2697
              "wait_for_sync"]
2698

    
2699
  def BuildHooksEnv(self):
2700
    """Build hooks env.
2701

2702
    This runs on master, primary and secondary nodes of the instance.
2703

2704
    """
2705
    env = {
2706
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2707
      "INSTANCE_DISK_SIZE": self.op.disk_size,
2708
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
2709
      "INSTANCE_ADD_MODE": self.op.mode,
2710
      }
2711
    if self.op.mode == constants.INSTANCE_IMPORT:
2712
      env["INSTANCE_SRC_NODE"] = self.op.src_node
2713
      env["INSTANCE_SRC_PATH"] = self.op.src_path
2714
      env["INSTANCE_SRC_IMAGE"] = self.src_image
2715

    
2716
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2717
      primary_node=self.op.pnode,
2718
      secondary_nodes=self.secondaries,
2719
      status=self.instance_status,
2720
      os_type=self.op.os_type,
2721
      memory=self.op.mem_size,
2722
      vcpus=self.op.vcpus,
2723
      nics=[(self.inst_ip, self.op.bridge)],
2724
    ))
2725

    
2726
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2727
          self.secondaries)
2728
    return env, nl, nl
2729

    
2730

    
2731
  def CheckPrereq(self):
2732
    """Check prerequisites.
2733

2734
    """
2735
    if self.op.mode not in (constants.INSTANCE_CREATE,
2736
                            constants.INSTANCE_IMPORT):
2737
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2738
                                 self.op.mode)
2739

    
2740
    if self.op.mode == constants.INSTANCE_IMPORT:
2741
      src_node = getattr(self.op, "src_node", None)
2742
      src_path = getattr(self.op, "src_path", None)
2743
      if src_node is None or src_path is None:
2744
        raise errors.OpPrereqError("Importing an instance requires source"
2745
                                   " node and path options")
2746
      src_node_full = self.cfg.ExpandNodeName(src_node)
2747
      if src_node_full is None:
2748
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2749
      self.op.src_node = src_node = src_node_full
2750

    
2751
      if not os.path.isabs(src_path):
2752
        raise errors.OpPrereqError("The source path must be absolute")
2753

    
2754
      export_info = rpc.call_export_info(src_node, src_path)
2755

    
2756
      if not export_info:
2757
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
2758

    
2759
      if not export_info.has_section(constants.INISECT_EXP):
2760
        raise errors.ProgrammerError("Corrupted export config")
2761

    
2762
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
2763
      if (int(ei_version) != constants.EXPORT_VERSION):
2764
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2765
                                   (ei_version, constants.EXPORT_VERSION))
2766

    
2767
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2768
        raise errors.OpPrereqError("Can't import instance with more than"
2769
                                   " one data disk")
2770

    
2771
      # FIXME: are the old os-es, disk sizes, etc. useful?
2772
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2773
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2774
                                                         'disk0_dump'))
2775
      self.src_image = diskimage
2776
    else: # INSTANCE_CREATE
2777
      if getattr(self.op, "os_type", None) is None:
2778
        raise errors.OpPrereqError("No guest OS specified")
2779

    
2780
    # check primary node
2781
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2782
    if pnode is None:
2783
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
2784
                                 self.op.pnode)
2785
    self.op.pnode = pnode.name
2786
    self.pnode = pnode
2787
    self.secondaries = []
2788
    # disk template and mirror node verification
2789
    if self.op.disk_template not in constants.DISK_TEMPLATES:
2790
      raise errors.OpPrereqError("Invalid disk template name")
2791

    
2792
    if self.op.disk_template == constants.DT_REMOTE_RAID1:
2793
      if getattr(self.op, "snode", None) is None:
2794
        raise errors.OpPrereqError("The 'remote_raid1' disk template needs"
2795
                                   " a mirror node")
2796

    
2797
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
2798
      if snode_name is None:
2799
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
2800
                                   self.op.snode)
2801
      elif snode_name == pnode.name:
2802
        raise errors.OpPrereqError("The secondary node cannot be"
2803
                                   " the primary node.")
2804
      self.secondaries.append(snode_name)
2805

    
2806
    # Check lv size requirements
2807
    nodenames = [pnode.name] + self.secondaries
2808
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2809

    
2810
    # Required free disk space as a function of disk and swap space
2811
    req_size_dict = {
2812
      constants.DT_DISKLESS: 0,
2813
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2814
      constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2815
      # 256 MB are added for drbd metadata, 128MB for each drbd device
2816
      constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2817
    }
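    # Worked example (hypothetical sizes): a remote_raid1 instance with a
    # 10240 MB disk and 2048 MB swap needs 10240 + 2048 + 256 = 12544 MB
    # of free space in the volume group on each node.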
2818

    
2819
    if self.op.disk_template not in req_size_dict:
2820
      raise errors.ProgrammerError("Disk template '%s' size requirement"
2821
                                   " is unknown" %  self.op.disk_template)
2822

    
2823
    req_size = req_size_dict[self.op.disk_template]
2824

    
2825
    for node in nodenames:
2826
      info = nodeinfo.get(node, None)
2827
      if not info:
2828
        raise errors.OpPrereqError("Cannot get current information"
2829
                                   " from node '%s'" % nodeinfo)
2830
      if req_size > info['vg_free']:
2831
        raise errors.OpPrereqError("Not enough disk space on target node %s."
2832
                                   " %d MB available, %d MB required" %
2833
                                   (node, info['vg_free'], req_size))
2834

    
2835
    # os verification
2836
    os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2837
    if not isinstance(os_obj, objects.OS):
2838
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
2839
                                 " primary node"  % self.op.os_type)
2840

    
2841
    # instance verification
2842
    hostname1 = utils.LookupHostname(self.op.instance_name)
2843
    if not hostname1:
2844
      raise errors.OpPrereqError("Instance name '%s' not found in dns" %
2845
                                 self.op.instance_name)
2846

    
2847
    self.op.instance_name = instance_name = hostname1.name
2848
    instance_list = self.cfg.GetInstanceList()
2849
    if instance_name in instance_list:
2850
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2851
                                 instance_name)
2852

    
2853
    ip = getattr(self.op, "ip", None)
2854
    if ip is None or ip.lower() == "none":
2855
      inst_ip = None
2856
    elif ip.lower() == "auto":
2857
      inst_ip = hostname1.ip
2858
    else:
2859
      if not utils.IsValidIP(ip):
2860
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
2861
                                   " like a valid IP" % ip)
2862
      inst_ip = ip
2863
    self.inst_ip = inst_ip
2864

    
2865
    command = ["fping", "-q", hostname1.ip]
2866
    result = utils.RunCmd(command)
2867
    if not result.failed:
2868
      raise errors.OpPrereqError("IP %s of instance %s already in use" %
2869
                                 (hostname1.ip, instance_name))
2870

    
2871
    # bridge verification
2872
    bridge = getattr(self.op, "bridge", None)
2873
    if bridge is None:
2874
      self.op.bridge = self.cfg.GetDefBridge()
2875
    else:
2876
      self.op.bridge = bridge
2877

    
2878
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
2879
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
2880
                                 " destination node '%s'" %
2881
                                 (self.op.bridge, pnode.name))
2882

    
2883
    if self.op.start:
2884
      self.instance_status = 'up'
2885
    else:
2886
      self.instance_status = 'down'
2887

    
2888
  def Exec(self, feedback_fn):
2889
    """Create and add the instance to the cluster.
2890

2891
    """
2892
    instance = self.op.instance_name
2893
    pnode_name = self.pnode.name
2894

    
2895
    nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
2896
    if self.inst_ip is not None:
2897
      nic.ip = self.inst_ip
2898

    
2899
    disks = _GenerateDiskTemplate(self.cfg,
2900
                                  self.op.disk_template,
2901
                                  instance, pnode_name,
2902
                                  self.secondaries, self.op.disk_size,
2903
                                  self.op.swap_size)
2904

    
2905
    iobj = objects.Instance(name=instance, os=self.op.os_type,
2906
                            primary_node=pnode_name,
2907
                            memory=self.op.mem_size,
2908
                            vcpus=self.op.vcpus,
2909
                            nics=[nic], disks=disks,
2910
                            disk_template=self.op.disk_template,
2911
                            status=self.instance_status,
2912
                            )
2913

    
2914
    feedback_fn("* creating instance disks...")
2915
    if not _CreateDisks(self.cfg, iobj):
2916
      _RemoveDisks(iobj, self.cfg)
2917
      raise errors.OpExecError("Device creation failed, reverting...")
2918

    
2919
    feedback_fn("adding instance %s to cluster config" % instance)
2920

    
2921
    self.cfg.AddInstance(iobj)
2922

    
2923
    if self.op.wait_for_sync:
2924
      disk_abort = not _WaitForSync(self.cfg, iobj)
2925
    elif iobj.disk_template == constants.DT_REMOTE_RAID1:
2926
      # make sure the disks are not degraded (still sync-ing is ok)
2927
      time.sleep(15)
2928
      feedback_fn("* checking mirrors status")
2929
      disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
2930
    else:
2931
      disk_abort = False
2932

    
2933
    if disk_abort:
2934
      _RemoveDisks(iobj, self.cfg)
2935
      self.cfg.RemoveInstance(iobj.name)
2936
      raise errors.OpExecError("There are some degraded disks for"
2937
                               " this instance")
2938

    
2939
    feedback_fn("creating os for instance %s on node %s" %
2940
                (instance, pnode_name))
2941

    
2942
    if iobj.disk_template != constants.DT_DISKLESS:
2943
      if self.op.mode == constants.INSTANCE_CREATE:
2944
        feedback_fn("* running the instance OS create scripts...")
2945
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
2946
          raise errors.OpExecError("could not add os for instance %s"
2947
                                   " on node %s" %
2948
                                   (instance, pnode_name))
2949

    
2950
      elif self.op.mode == constants.INSTANCE_IMPORT:
2951
        feedback_fn("* running the instance OS import scripts...")
2952
        src_node = self.op.src_node
2953
        src_image = self.src_image
2954
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
2955
                                                src_node, src_image):
2956
          raise errors.OpExecError("Could not import os for instance"
2957
                                   " %s on node %s" %
2958
                                   (instance, pnode_name))
2959
      else:
2960
        # also checked in the prereq part
2961
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
2962
                                     % self.op.mode)
2963

    
2964
    if self.op.start:
2965
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
2966
      feedback_fn("* starting instance...")
2967
      if not rpc.call_instance_start(pnode_name, iobj, None):
2968
        raise errors.OpExecError("Could not start instance")


class LUConnectConsole(NoHooksLU):
2972
  """Connect to an instance's console.
2973

2974
  This is somewhat special in that it returns the command line that
2975
  you need to run on the master node in order to connect to the
2976
  console.
2977

2978
  """
2979
  _OP_REQP = ["instance_name"]
2980

    
2981
  def CheckPrereq(self):
2982
    """Check prerequisites.
2983

2984
    This checks that the instance is in the cluster.
2985

2986
    """
2987
    instance = self.cfg.GetInstanceInfo(
2988
      self.cfg.ExpandInstanceName(self.op.instance_name))
2989
    if instance is None:
2990
      raise errors.OpPrereqError("Instance '%s' not known" %
2991
                                 self.op.instance_name)
2992
    self.instance = instance
2993

    
2994
  def Exec(self, feedback_fn):
2995
    """Connect to the console of an instance
2996

2997
    """
2998
    instance = self.instance
2999
    node = instance.primary_node
3000

    
3001
    node_insts = rpc.call_instance_list([node])[node]
3002
    if node_insts is False:
3003
      raise errors.OpExecError("Can't connect to node %s." % node)
3004

    
3005
    if instance.name not in node_insts:
3006
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3007

    
3008
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3009

    
3010
    hyper = hypervisor.GetHypervisor()
3011
    console_cmd = hyper.GetShellCommandForConsole(instance.name)
3012
    # build ssh cmdline
3013
    argv = ["ssh", "-q", "-t"]
3014
    argv.extend(ssh.KNOWN_HOSTS_OPTS)
3015
    argv.extend(ssh.BATCH_MODE_OPTS)
3016
    argv.append(node)
3017
    argv.append(console_cmd)
3018
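    # the return value is a (program, argv) pair; argv is the full command
    # line for reaching the console from the master node and, illustratively,
    # ends up looking roughly like
    #   ["ssh", "-q", "-t", <ssh options>, "node1", "<hypervisor console command>"]
    # (the exact options and console command depend on ssh.py and the
    # configured hypervisor)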
    return "ssh", argv


class LUAddMDDRBDComponent(LogicalUnit):
  """Add a new mirror member to an instance's disk.

  """
  HPATH = "mirror-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "NEW_SECONDARY": self.op.remote_node,
      "DISK_NAME": self.op.disk_name,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.remote_node,] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
    if remote_node is None:
      raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
    self.remote_node = remote_node

    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")
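    # look the disk up by its iv_name; the else branch of the for loop only
    # runs when the loop completes without a break, i.e. when nothing matched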
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
                                 " instance." % self.op.disk_name)
    if len(disk.children) > 1:
      raise errors.OpPrereqError("The device already has two slave"
                                 " devices.\n"
                                 "This would create a 3-disk raid1"
                                 " which we don't allow.")
    self.disk = disk

  def Exec(self, feedback_fn):
    """Add the mirror component

    """
    disk = self.disk
    instance = self.instance

    remote_node = self.remote_node
    lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
    names = _GenerateUniqueNames(self.cfg, lv_names)
    new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
                                     remote_node, disk.size, names)

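    # the order below matters: create the new DRBD device on the secondary
    # first, then on the primary, and only then attach it to the MD array;
    # each failure path removes whatever was created in the earlier steps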
    logger.Info("adding new mirror component on secondary")
    #HARDCODE
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, new_drbd, False,
                                      _GetInstanceInfoText(instance)):
      raise errors.OpExecError("Failed to create new component on secondary"
                               " node %s" % remote_node)

    logger.Info("adding new mirror component on primary")
    #HARDCODE
    if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node, new_drbd,
                                    _GetInstanceInfoText(instance)):
      # remove secondary dev
      self.cfg.SetDiskID(new_drbd, remote_node)
      rpc.call_blockdev_remove(remote_node, new_drbd)
      raise errors.OpExecError("Failed to create volume on primary")

    # the device exists now
    # call the primary node to add the mirror to md
    logger.Info("adding new mirror component to md")
    if not rpc.call_blockdev_addchild(instance.primary_node,
                                      disk, new_drbd):
      logger.Error("Can't add mirror component to md!")
      self.cfg.SetDiskID(new_drbd, remote_node)
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
        logger.Error("Can't rollback on secondary")
      self.cfg.SetDiskID(new_drbd, instance.primary_node)
      if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
        logger.Error("Can't rollback on primary")
      raise errors.OpExecError("Can't add mirror component to md array")

    disk.children.append(new_drbd)

    self.cfg.AddInstance(instance)

    _WaitForSync(self.cfg, instance)

    return 0


class LURemoveMDDRBDComponent(LogicalUnit):
  """Remove a component from a remote_raid1 disk.

  """
  HPATH = "mirror-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK_NAME": self.op.disk_name,
      "DISK_ID": self.op.disk_id,
      "OLD_SECONDARY": self.old_secondary,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
                                 " instance." % self.op.disk_name)
    for child in disk.children:
      if child.dev_type == "drbd" and child.logical_id[2] == self.op.disk_id:
        break
    else:
      raise errors.OpPrereqError("Can't find the device with this port.")

    if len(disk.children) < 2:
      raise errors.OpPrereqError("Cannot remove the last component from"
                                 " a mirror.")
    self.disk = disk
    self.child = child
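    # the child's logical_id is (node_a, node_b, port); whichever of the two
    # nodes is not the primary is the secondary we are detaching from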
    if self.child.logical_id[0] == instance.primary_node:
      oid = 1
    else:
      oid = 0
    self.old_secondary = self.child.logical_id[oid]

  def Exec(self, feedback_fn):
    """Remove the mirror component

    """
    instance = self.instance
    disk = self.disk
    child = self.child
    logger.Info("remove mirror component")
    self.cfg.SetDiskID(disk, instance.primary_node)
    if not rpc.call_blockdev_removechild(instance.primary_node,
                                         disk, child):
      raise errors.OpExecError("Can't remove child from mirror.")

    for node in child.logical_id[:2]:
      self.cfg.SetDiskID(child, node)
      if not rpc.call_blockdev_remove(node, child):
        logger.Error("Warning: failed to remove device from node %s,"
                     " continuing operation." % node)

    disk.children.remove(child)
    self.cfg.AddInstance(instance)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    remote_node = getattr(self.op, "remote_node", None)
    if remote_node is None:
      remote_node = instance.secondary_nodes[0]
    else:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    self.op.remote_node = remote_node

  def Exec(self, feedback_fn):
    """Replace the disks of an instance.

    """
    instance = self.instance
    iv_names = {}
    # start of work
    remote_node = self.op.remote_node
    cfg = self.cfg
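    # for each disk: build a brand new mirror component on the target
    # secondary, attach it to the existing MD array on the primary, and only
    # after everything has synced (checked below) drop the old component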
    for dev in instance.disks:
      size = dev.size
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
                                       remote_node, size, names)
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
      logger.Info("adding new mirror component on secondary for %s" %
                  dev.iv_name)
      #HARDCODE
      if not _CreateBlockDevOnSecondary(cfg, remote_node, new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new component on"
                                 " secondary node %s\n"
                                 "Full abort, cleanup manually!" %
                                 remote_node)

      logger.Info("adding new mirror component on primary")
      #HARDCODE
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, new_drbd,
                                      _GetInstanceInfoText(instance)):
        # remove secondary dev
        cfg.SetDiskID(new_drbd, remote_node)
        rpc.call_blockdev_remove(remote_node, new_drbd)
        raise errors.OpExecError("Failed to create volume on primary!\n"
                                 "Full abort, cleanup manually!!")

      # the device exists now
      # call the primary node to add the mirror to md
      logger.Info("adding new mirror component to md")
      if not rpc.call_blockdev_addchild(instance.primary_node, dev,
                                        new_drbd):
        logger.Error("Can't add mirror component to md!")
        cfg.SetDiskID(new_drbd, remote_node)
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
          logger.Error("Can't rollback on secondary")
        cfg.SetDiskID(new_drbd, instance.primary_node)
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
          logger.Error("Can't rollback on primary")
        raise errors.OpExecError("Full abort, cleanup manually!!")

      dev.children.append(new_drbd)
      cfg.AddInstance(instance)

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    _WaitForSync(cfg, instance, unlock=True)

    # so check manually all the devices
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("MD device %s is degraded!" % name)
      cfg.SetDiskID(new_drbd, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
      if is_degr:
        raise errors.OpExecError("New drbd device %s is degraded!" % name)

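    # all the new devices are healthy, so the old components can now be
    # detached from the MD arrays and their backing devices removed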
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      logger.Info("remove mirror %s component" % name)
      cfg.SetDiskID(dev, instance.primary_node)
      if not rpc.call_blockdev_removechild(instance.primary_node,
                                           dev, child):
        logger.Error("Can't remove child from mirror, aborting"
                     " *this device cleanup*.\nYou need to cleanup manually!!")
        continue

      for node in child.logical_id[:2]:
        logger.Info("remove child device on %s" % node)
        cfg.SetDiskID(child, node)
        if not rpc.call_blockdev_remove(node, child):
          logger.Error("Warning: failed to remove device from node %s,"
                       " continuing operation." % node)

      dev.children.remove(child)

      cfg.AddInstance(instance)


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]
    return


  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type == "drbd":
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

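    # recurse so the caller gets the status of the whole device tree, not
    # just the top-level device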
    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
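      # config_state is what the configuration says about the instance,
      # run_state is what its primary node actually reports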
      remote_info = rpc.call_instance_info(instance.primary_node,
                                           instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        }

      result[instance.name] = idict

    return result


class LUSetInstanceParms(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      args['nics'] = [(ip, bridge)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.bridge = getattr(self.op, "bridge", None)
    if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
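    # the special value "none" (case-insensitive) clears the IP address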
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("No such instance name '%s'" %
                                 self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance
    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.
    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus",  self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))

    self.cfg.AddInstance(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance name is a valid one.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    # shutdown the instance, unless requested not to do so
    if self.op.shutdown:
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
      self.processor.ChainOpCode(op, feedback_fn)

    vgname = self.cfg.GetVGName()

    snap_disks = []

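    # snapshot the instance's sda while it is (optionally) shut down; the
    # finally clause restarts the instance, if we shut it down above, even
    # when snapshotting fails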
    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type="lvm", size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown:
        op = opcodes.OpStartupInstance(instance_name=instance.name,
                                       force=False)
        self.processor.ChainOpCode(op, feedback_fn)

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                      instance):
        logger.Error("could not export block device %s from node"
                     " %s to node %s" %
                     (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from"
                     " node %s" % (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.processor.ChainOpCode(op, feedback_fn)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


3710
  """Generic tags LU.
3711

3712
  This is an abstract class which is the parent of all the other tags LUs.
3713

3714
  """
3715
  def CheckPrereq(self):
3716
    """Check prerequisites.
3717

3718
    """
3719
    if self.op.kind == constants.TAG_CLUSTER:
3720
      self.target = self.cfg.GetClusterInfo()
3721
    elif self.op.kind == constants.TAG_NODE:
3722
      name = self.cfg.ExpandNodeName(self.op.name)
3723
      if name is None:
3724
        raise errors.OpPrereqError("Invalid node name (%s)" %
3725
                                   (self.op.name,))
3726
      self.op.name = name
3727
      self.target = self.cfg.GetNodeInfo(name)
3728
    elif self.op.kind == constants.TAG_INSTANCE:
3729
      name = self.cfg.ExpandInstanceName(name)
3730
      if name is None:
3731
        raise errors.OpPrereqError("Invalid instance name (%s)" %
3732
                                   (self.op.name,))
3733
      self.op.name = name
3734
      self.target = self.cfg.GetInstanceInfo(name)
3735
    else:
3736
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
3737
                                 str(self.op.kind))
3738

    
3739

    
3740
class LUGetTags(TagsLU):
3741
  """Returns the tags of a given object.
3742

3743
  """
3744
  _OP_REQP = ["kind", "name"]
3745

    
3746
  def Exec(self, feedback_fn):
3747
    """Returns the tag list.
3748

3749
    """
3750
    return self.target.GetTags()
3751

    
3752

    
3753
class LUAddTag(TagsLU):
3754
  """Sets a tag on a given object.
3755

3756
  """
3757
  _OP_REQP = ["kind", "name", "tag"]
3758

    
3759
  def CheckPrereq(self):
3760
    """Check prerequisites.
3761

3762
    This checks the type and length of the tag name and value.
3763

3764
    """
3765
    TagsLU.CheckPrereq(self)
3766
    objects.TaggableObject.ValidateTag(self.op.tag)
3767

    
3768
  def Exec(self, feedback_fn):
3769
    """Sets the tag.
3770

3771
    """
3772
    try:
3773
      self.target.AddTag(self.op.tag)
3774
    except errors.TagError, err:
3775
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
3776
    try:
3777
      self.cfg.Update(self.target)
3778
    except errors.ConfigurationError:
3779
      raise errors.OpRetryError("There has been a modification to the"
3780
                                " config file and the operation has been"
3781
                                " aborted. Please retry.")
3782

    
3783

    
3784
class LUDelTag(TagsLU):
3785
  """Delete a tag from a given object.
3786

3787
  """
3788
  _OP_REQP = ["kind", "name", "tag"]
3789

    
3790
  def CheckPrereq(self):
3791
    """Check prerequisites.
3792

3793
    This checks that we have the given tag.
3794

3795
    """
3796
    TagsLU.CheckPrereq(self)
3797
    objects.TaggableObject.ValidateTag(self.op.tag)
3798
    if self.op.tag not in self.target.GetTags():
3799
      raise errors.OpPrereqError("Tag not found")
3800

    
3801
  def Exec(self, feedback_fn):
3802
    """Remove the tag from the object.
3803

3804
    """
3805
    self.target.RemoveTag(self.op.tag)
3806
    try:
3807
      self.cfg.Update(self.target)
3808
    except errors.ConfigurationError:
3809
      raise errors.OpRetryError("There has been a modification to the"
3810
                                " config file and the operation has been"
3811
                                " aborted. Please retry.")