Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib.py @ f27302fa

History | View | Annotate | Download (120.4 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import socket
30
import time
31
import tempfile
32
import re
33
import platform
34

    
35
from ganeti import rpc
36
from ganeti import ssh
37
from ganeti import logger
38
from ganeti import utils
39
from ganeti import errors
40
from ganeti import hypervisor
41
from ganeti import config
42
from ganeti import constants
43
from ganeti import objects
44
from ganeti import opcodes
45
from ganeti import ssconf
46

    
47
class LogicalUnit(object):
48
  """Logical Unit base class.
49

50
  Subclasses must follow these rules:
51
    - implement CheckPrereq which also fills in the opcode instance
52
      with all the fields (even if as None)
53
    - implement Exec
54
    - implement BuildHooksEnv
55
    - redefine HPATH and HTYPE
56
    - optionally redefine their run requirements (REQ_CLUSTER,
57
      REQ_MASTER); note that all commands require root permissions
58

59
  """
60
  HPATH = None
61
  HTYPE = None
62
  _OP_REQP = []
63
  REQ_CLUSTER = True
64
  REQ_MASTER = True
65

    
66
  def __init__(self, processor, op, cfg, sstore):
67
    """Constructor for LogicalUnit.
68

69
    This needs to be overriden in derived classes in order to check op
70
    validity.
71

72
    """
73
    self.processor = processor
74
    self.op = op
75
    self.cfg = cfg
76
    self.sstore = sstore
77
    for attr_name in self._OP_REQP:
78
      attr_val = getattr(op, attr_name, None)
79
      if attr_val is None:
80
        raise errors.OpPrereqError("Required parameter '%s' missing" %
81
                                   attr_name)
82
    if self.REQ_CLUSTER:
83
      if not cfg.IsCluster():
84
        raise errors.OpPrereqError("Cluster not initialized yet,"
85
                                   " use 'gnt-cluster init' first.")
86
      if self.REQ_MASTER:
87
        master = sstore.GetMasterNode()
88
        if master != utils.HostInfo().name:
89
          raise errors.OpPrereqError("Commands must be run on the master"
90
                                     " node %s" % master)
91

    
92
  def CheckPrereq(self):
93
    """Check prerequisites for this LU.
94

95
    This method should check that the prerequisites for the execution
96
    of this LU are fulfilled. It can do internode communication, but
97
    it should be idempotent - no cluster or system changes are
98
    allowed.
99

100
    The method should raise errors.OpPrereqError in case something is
101
    not fulfilled. Its return value is ignored.
102

103
    This method should also update all the parameters of the opcode to
104
    their canonical form; e.g. a short node name must be fully
105
    expanded after this method has successfully completed (so that
106
    hooks, logging, etc. work correctly).
107

108
    """
109
    raise NotImplementedError
110

    
111
  def Exec(self, feedback_fn):
112
    """Execute the LU.
113

114
    This method should implement the actual work. It should raise
115
    errors.OpExecError for failures that are somewhat dealt with in
116
    code, or expected.
117

118
    """
119
    raise NotImplementedError
120

    
121
  def BuildHooksEnv(self):
122
    """Build hooks environment for this LU.
123

124
    This method should return a three-node tuple consisting of: a dict
125
    containing the environment that will be used for running the
126
    specific hook for this LU, a list of node names on which the hook
127
    should run before the execution, and a list of node names on which
128
    the hook should run after the execution.
129

130
    The keys of the dict must not have 'GANETI_' prefixed as this will
131
    be handled in the hooks runner. Also note additional keys will be
132
    added by the hooks runner. If the LU doesn't define any
133
    environment, an empty dict (and not None) should be returned.
134

135
    As for the node lists, the master should not be included in the
136
    them, as it will be added by the hooks runner in case this LU
137
    requires a cluster to run on (otherwise we don't have a node
138
    list). No nodes should be returned as an empty list (and not
139
    None).
140

141
    Note that if the HPATH for a LU class is None, this function will
142
    not be called.
143

144
    """
145
    raise NotImplementedError
146

    
147

    
148
class NoHooksLU(LogicalUnit):
149
  """Simple LU which runs no hooks.
150

151
  This LU is intended as a parent for other LogicalUnits which will
152
  run no hooks, in order to reduce duplicate code.
153

154
  """
155
  HPATH = None
156
  HTYPE = None
157

    
158
  def BuildHooksEnv(self):
159
    """Build hooks env.
160

161
    This is a no-op, since we don't run hooks.
162

163
    """
164
    return
165

    
166

    
167
def _GetWantedNodes(lu, nodes):
168
  """Returns list of checked and expanded node names.
169

170
  Args:
171
    nodes: List of nodes (strings) or None for all
172

173
  """
174
  if not isinstance(nodes, list):
175
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
176

    
177
  if nodes:
178
    wanted = []
179

    
180
    for name in nodes:
181
      node = lu.cfg.ExpandNodeName(name)
182
      if node is None:
183
        raise errors.OpPrereqError("No such node name '%s'" % name)
184
      wanted.append(node)
185

    
186
  else:
187
    wanted = lu.cfg.GetNodeList()
188
  return utils.NiceSort(wanted)
189

    
190

    
191
def _GetWantedInstances(lu, instances):
192
  """Returns list of checked and expanded instance names.
193

194
  Args:
195
    instances: List of instances (strings) or None for all
196

197
  """
198
  if not isinstance(instances, list):
199
    raise errors.OpPrereqError("Invalid argument type 'instances'")
200

    
201
  if instances:
202
    wanted = []
203

    
204
    for name in instances:
205
      instance = lu.cfg.ExpandInstanceName(name)
206
      if instance is None:
207
        raise errors.OpPrereqError("No such instance name '%s'" % name)
208
      wanted.append(instance)
209

    
210
  else:
211
    wanted = lu.cfg.GetInstanceList()
212
  return utils.NiceSort(wanted)
213

    
214

    
215
def _CheckOutputFields(static, dynamic, selected):
216
  """Checks whether all selected fields are valid.
217

218
  Args:
219
    static: Static fields
220
    dynamic: Dynamic fields
221

222
  """
223
  static_fields = frozenset(static)
224
  dynamic_fields = frozenset(dynamic)
225

    
226
  all_fields = static_fields | dynamic_fields
227

    
228
  if not all_fields.issuperset(selected):
229
    raise errors.OpPrereqError("Unknown output fields selected: %s"
230
                               % ",".join(frozenset(selected).
231
                                          difference(all_fields)))
232

    
233

    
234
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
235
                          memory, vcpus, nics):
236
  """Builds instance related env variables for hooks from single variables.
237

238
  Args:
239
    secondary_nodes: List of secondary nodes as strings
240
  """
241
  env = {
242
    "INSTANCE_NAME": name,
243
    "INSTANCE_PRIMARY": primary_node,
244
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
245
    "INSTANCE_OS_TYPE": os_type,
246
    "INSTANCE_STATUS": status,
247
    "INSTANCE_MEMORY": memory,
248
    "INSTANCE_VCPUS": vcpus,
249
  }
250

    
251
  if nics:
252
    nic_count = len(nics)
253
    for idx, (ip, bridge) in enumerate(nics):
254
      if ip is None:
255
        ip = ""
256
      env["INSTANCE_NIC%d_IP" % idx] = ip
257
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
258
  else:
259
    nic_count = 0
260

    
261
  env["INSTANCE_NIC_COUNT"] = nic_count
262

    
263
  return env
264

    
265

    
266
def _BuildInstanceHookEnvByObject(instance, override=None):
267
  """Builds instance related env variables for hooks from an object.
268

269
  Args:
270
    instance: objects.Instance object of instance
271
    override: dict of values to override
272
  """
273
  args = {
274
    'name': instance.name,
275
    'primary_node': instance.primary_node,
276
    'secondary_nodes': instance.secondary_nodes,
277
    'os_type': instance.os,
278
    'status': instance.os,
279
    'memory': instance.memory,
280
    'vcpus': instance.vcpus,
281
    'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
282
  }
283
  if override:
284
    args.update(override)
285
  return _BuildInstanceHookEnv(**args)
286

    
287

    
288
def _UpdateEtcHosts(fullnode, ip):
289
  """Ensure a node has a correct entry in /etc/hosts.
290

291
  Args:
292
    fullnode - Fully qualified domain name of host. (str)
293
    ip       - IPv4 address of host (str)
294

295
  """
296
  node = fullnode.split(".", 1)[0]
297

    
298
  f = open('/etc/hosts', 'r+')
299

    
300
  inthere = False
301

    
302
  save_lines = []
303
  add_lines = []
304
  removed = False
305

    
306
  while True:
307
    rawline = f.readline()
308

    
309
    if not rawline:
310
      # End of file
311
      break
312

    
313
    line = rawline.split('\n')[0]
314

    
315
    # Strip off comments
316
    line = line.split('#')[0]
317

    
318
    if not line:
319
      # Entire line was comment, skip
320
      save_lines.append(rawline)
321
      continue
322

    
323
    fields = line.split()
324

    
325
    haveall = True
326
    havesome = False
327
    for spec in [ ip, fullnode, node ]:
328
      if spec not in fields:
329
        haveall = False
330
      if spec in fields:
331
        havesome = True
332

    
333
    if haveall:
334
      inthere = True
335
      save_lines.append(rawline)
336
      continue
337

    
338
    if havesome and not haveall:
339
      # Line (old, or manual?) which is missing some.  Remove.
340
      removed = True
341
      continue
342

    
343
    save_lines.append(rawline)
344

    
345
  if not inthere:
346
    add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))
347

    
348
  if removed:
349
    if add_lines:
350
      save_lines = save_lines + add_lines
351

    
352
    # We removed a line, write a new file and replace old.
353
    fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
354
    newfile = os.fdopen(fd, 'w')
355
    newfile.write(''.join(save_lines))
356
    newfile.close()
357
    os.rename(tmpname, '/etc/hosts')
358

    
359
  elif add_lines:
360
    # Simply appending a new line will do the trick.
361
    f.seek(0, 2)
362
    for add in add_lines:
363
      f.write(add)
364

    
365
  f.close()
366

    
367

    
368
def _UpdateKnownHosts(fullnode, ip, pubkey):
369
  """Ensure a node has a correct known_hosts entry.
370

371
  Args:
372
    fullnode - Fully qualified domain name of host. (str)
373
    ip       - IPv4 address of host (str)
374
    pubkey   - the public key of the cluster
375

376
  """
377
  if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
378
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
379
  else:
380
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
381

    
382
  inthere = False
383

    
384
  save_lines = []
385
  add_lines = []
386
  removed = False
387

    
388
  while True:
389
    rawline = f.readline()
390
    logger.Debug('read %s' % (repr(rawline),))
391

    
392
    if not rawline:
393
      # End of file
394
      break
395

    
396
    line = rawline.split('\n')[0]
397

    
398
    parts = line.split(' ')
399
    fields = parts[0].split(',')
400
    key = parts[2]
401

    
402
    haveall = True
403
    havesome = False
404
    for spec in [ ip, fullnode ]:
405
      if spec not in fields:
406
        haveall = False
407
      if spec in fields:
408
        havesome = True
409

    
410
    logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
411
    if haveall and key == pubkey:
412
      inthere = True
413
      save_lines.append(rawline)
414
      logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
415
      continue
416

    
417
    if havesome and (not haveall or key != pubkey):
418
      removed = True
419
      logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
420
      continue
421

    
422
    save_lines.append(rawline)
423

    
424
  if not inthere:
425
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
426
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
427

    
428
  if removed:
429
    save_lines = save_lines + add_lines
430

    
431
    # Write a new file and replace old.
432
    fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
433
                                   constants.DATA_DIR)
434
    newfile = os.fdopen(fd, 'w')
435
    try:
436
      newfile.write(''.join(save_lines))
437
    finally:
438
      newfile.close()
439
    logger.Debug("Wrote new known_hosts.")
440
    os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
441

    
442
  elif add_lines:
443
    # Simply appending a new line will do the trick.
444
    f.seek(0, 2)
445
    for add in add_lines:
446
      f.write(add)
447

    
448
  f.close()
449

    
450

    
451
def _HasValidVG(vglist, vgname):
452
  """Checks if the volume group list is valid.
453

454
  A non-None return value means there's an error, and the return value
455
  is the error message.
456

457
  """
458
  vgsize = vglist.get(vgname, None)
459
  if vgsize is None:
460
    return "volume group '%s' missing" % vgname
461
  elif vgsize < 20480:
462
    return ("volume group '%s' too small (20480MiB required, %dMib found)" %
463
            (vgname, vgsize))
464
  return None
465

    
466

    
467
def _InitSSHSetup(node):
468
  """Setup the SSH configuration for the cluster.
469

470

471
  This generates a dsa keypair for root, adds the pub key to the
472
  permitted hosts and adds the hostkey to its own known hosts.
473

474
  Args:
475
    node: the name of this host as a fqdn
476

477
  """
478
  if os.path.exists('/root/.ssh/id_dsa'):
479
    utils.CreateBackup('/root/.ssh/id_dsa')
480
  if os.path.exists('/root/.ssh/id_dsa.pub'):
481
    utils.CreateBackup('/root/.ssh/id_dsa.pub')
482

    
483
  utils.RemoveFile('/root/.ssh/id_dsa')
484
  utils.RemoveFile('/root/.ssh/id_dsa.pub')
485

    
486
  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
487
                         "-f", "/root/.ssh/id_dsa",
488
                         "-q", "-N", ""])
489
  if result.failed:
490
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
491
                             result.output)
492

    
493
  f = open('/root/.ssh/id_dsa.pub', 'r')
494
  try:
495
    utils.AddAuthorizedKey('/root/.ssh/authorized_keys', f.read(8192))
496
  finally:
497
    f.close()
498

    
499

    
500
def _InitGanetiServerSetup(ss):
501
  """Setup the necessary configuration for the initial node daemon.
502

503
  This creates the nodepass file containing the shared password for
504
  the cluster and also generates the SSL certificate.
505

506
  """
507
  # Create pseudo random password
508
  randpass = sha.new(os.urandom(64)).hexdigest()
509
  # and write it into sstore
510
  ss.SetKey(ss.SS_NODED_PASS, randpass)
511

    
512
  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
513
                         "-days", str(365*5), "-nodes", "-x509",
514
                         "-keyout", constants.SSL_CERT_FILE,
515
                         "-out", constants.SSL_CERT_FILE, "-batch"])
516
  if result.failed:
517
    raise errors.OpExecError("could not generate server ssl cert, command"
518
                             " %s had exitcode %s and error message %s" %
519
                             (result.cmd, result.exit_code, result.output))
520

    
521
  os.chmod(constants.SSL_CERT_FILE, 0400)
522

    
523
  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
524

    
525
  if result.failed:
526
    raise errors.OpExecError("Could not start the node daemon, command %s"
527
                             " had exitcode %s and error %s" %
528
                             (result.cmd, result.exit_code, result.output))
529

    
530

    
531
class LUInitCluster(LogicalUnit):
532
  """Initialise the cluster.
533

534
  """
535
  HPATH = "cluster-init"
536
  HTYPE = constants.HTYPE_CLUSTER
537
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
538
              "def_bridge", "master_netdev"]
539
  REQ_CLUSTER = False
540

    
541
  def BuildHooksEnv(self):
542
    """Build hooks env.
543

544
    Notes: Since we don't require a cluster, we must manually add
545
    ourselves in the post-run node list.
546

547
    """
548
    env = {
549
      "CLUSTER": self.op.cluster_name,
550
      "MASTER": self.hostname.name,
551
      }
552
    return env, [], [self.hostname.name]
553

    
554
  def CheckPrereq(self):
555
    """Verify that the passed name is a valid one.
556

557
    """
558
    if config.ConfigWriter.IsCluster():
559
      raise errors.OpPrereqError("Cluster is already initialised")
560

    
561
    self.hostname = hostname = utils.HostInfo()
562

    
563
    if hostname.ip.startswith("127."):
564
      raise errors.OpPrereqError("This host's IP resolves to the private"
565
                                 " range (%s). Please fix DNS or /etc/hosts." %
566
                                 (hostname.ip,))
567

    
568
    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
569

    
570
    result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", hostname.ip])
571
    if result.failed:
572
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
573
                                 " to %s,\nbut this ip address does not"
574
                                 " belong to this host."
575
                                 " Aborting." % hostname.ip)
576

    
577
    secondary_ip = getattr(self.op, "secondary_ip", None)
578
    if secondary_ip and not utils.IsValidIP(secondary_ip):
579
      raise errors.OpPrereqError("Invalid secondary ip given")
580
    if secondary_ip and secondary_ip != hostname.ip:
581
      result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", secondary_ip])
582
      if result.failed:
583
        raise errors.OpPrereqError("You gave %s as secondary IP,\n"
584
                                   "but it does not belong to this host." %
585
                                   secondary_ip)
586
    self.secondary_ip = secondary_ip
587

    
588
    # checks presence of the volume group given
589
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
590

    
591
    if vgstatus:
592
      raise errors.OpPrereqError("Error: %s" % vgstatus)
593

    
594
    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
595
                    self.op.mac_prefix):
596
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
597
                                 self.op.mac_prefix)
598

    
599
    if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
600
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
601
                                 self.op.hypervisor_type)
602

    
603
    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
604
    if result.failed:
605
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
606
                                 (self.op.master_netdev,
607
                                  result.output.strip()))
608

    
609
  def Exec(self, feedback_fn):
610
    """Initialize the cluster.
611

612
    """
613
    clustername = self.clustername
614
    hostname = self.hostname
615

    
616
    # set up the simple store
617
    ss = ssconf.SimpleStore()
618
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
619
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
620
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
621
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
622
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
623

    
624
    # set up the inter-node password and certificate
625
    _InitGanetiServerSetup(ss)
626

    
627
    # start the master ip
628
    rpc.call_node_start_master(hostname.name)
629

    
630
    # set up ssh config and /etc/hosts
631
    f = open('/etc/ssh/ssh_host_rsa_key.pub', 'r')
632
    try:
633
      sshline = f.read()
634
    finally:
635
      f.close()
636
    sshkey = sshline.split(" ")[1]
637

    
638
    _UpdateEtcHosts(hostname.name, hostname.ip)
639

    
640
    _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
641

    
642
    _InitSSHSetup(hostname.name)
643

    
644
    # init of cluster config file
645
    cfgw = config.ConfigWriter()
646
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
647
                    sshkey, self.op.mac_prefix,
648
                    self.op.vg_name, self.op.def_bridge)
649

    
650

    
651
class LUDestroyCluster(NoHooksLU):
652
  """Logical unit for destroying the cluster.
653

654
  """
655
  _OP_REQP = []
656

    
657
  def CheckPrereq(self):
658
    """Check prerequisites.
659

660
    This checks whether the cluster is empty.
661

662
    Any errors are signalled by raising errors.OpPrereqError.
663

664
    """
665
    master = self.sstore.GetMasterNode()
666

    
667
    nodelist = self.cfg.GetNodeList()
668
    if len(nodelist) != 1 or nodelist[0] != master:
669
      raise errors.OpPrereqError("There are still %d node(s) in"
670
                                 " this cluster." % (len(nodelist) - 1))
671
    instancelist = self.cfg.GetInstanceList()
672
    if instancelist:
673
      raise errors.OpPrereqError("There are still %d instance(s) in"
674
                                 " this cluster." % len(instancelist))
675

    
676
  def Exec(self, feedback_fn):
677
    """Destroys the cluster.
678

679
    """
680
    utils.CreateBackup('/root/.ssh/id_dsa')
681
    utils.CreateBackup('/root/.ssh/id_dsa.pub')
682
    rpc.call_node_leave_cluster(self.sstore.GetMasterNode())
683

    
684

    
685
class LUVerifyCluster(NoHooksLU):
686
  """Verifies the cluster status.
687

688
  """
689
  _OP_REQP = []
690

    
691
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
692
                  remote_version, feedback_fn):
693
    """Run multiple tests against a node.
694

695
    Test list:
696
      - compares ganeti version
697
      - checks vg existance and size > 20G
698
      - checks config file checksum
699
      - checks ssh to other nodes
700

701
    Args:
702
      node: name of the node to check
703
      file_list: required list of files
704
      local_cksum: dictionary of local files and their checksums
705

706
    """
707
    # compares ganeti version
708
    local_version = constants.PROTOCOL_VERSION
709
    if not remote_version:
710
      feedback_fn(" - ERROR: connection to %s failed" % (node))
711
      return True
712

    
713
    if local_version != remote_version:
714
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
715
                      (local_version, node, remote_version))
716
      return True
717

    
718
    # checks vg existance and size > 20G
719

    
720
    bad = False
721
    if not vglist:
722
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
723
                      (node,))
724
      bad = True
725
    else:
726
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
727
      if vgstatus:
728
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
729
        bad = True
730

    
731
    # checks config file checksum
732
    # checks ssh to any
733

    
734
    if 'filelist' not in node_result:
735
      bad = True
736
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
737
    else:
738
      remote_cksum = node_result['filelist']
739
      for file_name in file_list:
740
        if file_name not in remote_cksum:
741
          bad = True
742
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
743
        elif remote_cksum[file_name] != local_cksum[file_name]:
744
          bad = True
745
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
746

    
747
    if 'nodelist' not in node_result:
748
      bad = True
749
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
750
    else:
751
      if node_result['nodelist']:
752
        bad = True
753
        for node in node_result['nodelist']:
754
          feedback_fn("  - ERROR: communication with node '%s': %s" %
755
                          (node, node_result['nodelist'][node]))
756
    hyp_result = node_result.get('hypervisor', None)
757
    if hyp_result is not None:
758
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
759
    return bad
760

    
761
  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
762
    """Verify an instance.
763

764
    This function checks to see if the required block devices are
765
    available on the instance's node.
766

767
    """
768
    bad = False
769

    
770
    instancelist = self.cfg.GetInstanceList()
771
    if not instance in instancelist:
772
      feedback_fn("  - ERROR: instance %s not in instance list %s" %
773
                      (instance, instancelist))
774
      bad = True
775

    
776
    instanceconfig = self.cfg.GetInstanceInfo(instance)
777
    node_current = instanceconfig.primary_node
778

    
779
    node_vol_should = {}
780
    instanceconfig.MapLVsByNode(node_vol_should)
781

    
782
    for node in node_vol_should:
783
      for volume in node_vol_should[node]:
784
        if node not in node_vol_is or volume not in node_vol_is[node]:
785
          feedback_fn("  - ERROR: volume %s missing on node %s" %
786
                          (volume, node))
787
          bad = True
788

    
789
    if not instanceconfig.status == 'down':
790
      if not instance in node_instance[node_current]:
791
        feedback_fn("  - ERROR: instance %s not running on node %s" %
792
                        (instance, node_current))
793
        bad = True
794

    
795
    for node in node_instance:
796
      if (not node == node_current):
797
        if instance in node_instance[node]:
798
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
799
                          (instance, node))
800
          bad = True
801

    
802
    return not bad
803

    
804
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
805
    """Verify if there are any unknown volumes in the cluster.
806

807
    The .os, .swap and backup volumes are ignored. All other volumes are
808
    reported as unknown.
809

810
    """
811
    bad = False
812

    
813
    for node in node_vol_is:
814
      for volume in node_vol_is[node]:
815
        if node not in node_vol_should or volume not in node_vol_should[node]:
816
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
817
                      (volume, node))
818
          bad = True
819
    return bad
820

    
821
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
822
    """Verify the list of running instances.
823

824
    This checks what instances are running but unknown to the cluster.
825

826
    """
827
    bad = False
828
    for node in node_instance:
829
      for runninginstance in node_instance[node]:
830
        if runninginstance not in instancelist:
831
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
832
                          (runninginstance, node))
833
          bad = True
834
    return bad
835

    
836
  def CheckPrereq(self):
837
    """Check prerequisites.
838

839
    This has no prerequisites.
840

841
    """
842
    pass
843

    
844
  def Exec(self, feedback_fn):
845
    """Verify integrity of cluster, performing various test on nodes.
846

847
    """
848
    bad = False
849
    feedback_fn("* Verifying global settings")
850
    self.cfg.VerifyConfig()
851

    
852
    master = self.sstore.GetMasterNode()
853
    vg_name = self.cfg.GetVGName()
854
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
855
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
856
    node_volume = {}
857
    node_instance = {}
858

    
859
    # FIXME: verify OS list
860
    # do local checksums
861
    file_names = list(self.sstore.GetFileList())
862
    file_names.append(constants.SSL_CERT_FILE)
863
    file_names.append(constants.CLUSTER_CONF_FILE)
864
    local_checksums = utils.FingerprintFiles(file_names)
865

    
866
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
867
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
868
    all_instanceinfo = rpc.call_instance_list(nodelist)
869
    all_vglist = rpc.call_vg_list(nodelist)
870
    node_verify_param = {
871
      'filelist': file_names,
872
      'nodelist': nodelist,
873
      'hypervisor': None,
874
      }
875
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
876
    all_rversion = rpc.call_version(nodelist)
877

    
878
    for node in nodelist:
879
      feedback_fn("* Verifying node %s" % node)
880
      result = self._VerifyNode(node, file_names, local_checksums,
881
                                all_vglist[node], all_nvinfo[node],
882
                                all_rversion[node], feedback_fn)
883
      bad = bad or result
884

    
885
      # node_volume
886
      volumeinfo = all_volumeinfo[node]
887

    
888
      if type(volumeinfo) != dict:
889
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
890
        bad = True
891
        continue
892

    
893
      node_volume[node] = volumeinfo
894

    
895
      # node_instance
896
      nodeinstance = all_instanceinfo[node]
897
      if type(nodeinstance) != list:
898
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
899
        bad = True
900
        continue
901

    
902
      node_instance[node] = nodeinstance
903

    
904
    node_vol_should = {}
905

    
906
    for instance in instancelist:
907
      feedback_fn("* Verifying instance %s" % instance)
908
      result =  self._VerifyInstance(instance, node_volume, node_instance,
909
                                     feedback_fn)
910
      bad = bad or result
911

    
912
      inst_config = self.cfg.GetInstanceInfo(instance)
913

    
914
      inst_config.MapLVsByNode(node_vol_should)
915

    
916
    feedback_fn("* Verifying orphan volumes")
917
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
918
                                       feedback_fn)
919
    bad = bad or result
920

    
921
    feedback_fn("* Verifying remaining instances")
922
    result = self._VerifyOrphanInstances(instancelist, node_instance,
923
                                         feedback_fn)
924
    bad = bad or result
925

    
926
    return int(bad)
927

    
928

    
929
class LURenameCluster(LogicalUnit):
930
  """Rename the cluster.
931

932
  """
933
  HPATH = "cluster-rename"
934
  HTYPE = constants.HTYPE_CLUSTER
935
  _OP_REQP = ["name"]
936

    
937
  def BuildHooksEnv(self):
938
    """Build hooks env.
939

940
    """
941
    env = {
942
      "NEW_NAME": self.op.name,
943
      }
944
    mn = self.sstore.GetMasterNode()
945
    return env, [mn], [mn]
946

    
947
  def CheckPrereq(self):
948
    """Verify that the passed name is a valid one.
949

950
    """
951
    hostname = utils.HostInfo(self.op.name)
952

    
953
    new_name = hostname.name
954
    self.ip = new_ip = hostname.ip
955
    old_name = self.sstore.GetClusterName()
956
    old_ip = self.sstore.GetMasterIP()
957
    if new_name == old_name and new_ip == old_ip:
958
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
959
                                 " cluster has changed")
960
    if new_ip != old_ip:
961
      result = utils.RunCmd(["fping", "-q", new_ip])
962
      if not result.failed:
963
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
964
                                   " reachable on the network. Aborting." %
965
                                   new_ip)
966

    
967
    self.op.name = new_name
968

    
969
  def Exec(self, feedback_fn):
970
    """Rename the cluster.
971

972
    """
973
    clustername = self.op.name
974
    ip = self.ip
975
    ss = self.sstore
976

    
977
    # shutdown the master IP
978
    master = ss.GetMasterNode()
979
    if not rpc.call_node_stop_master(master):
980
      raise errors.OpExecError("Could not disable the master role")
981

    
982
    try:
983
      # modify the sstore
984
      ss.SetKey(ss.SS_MASTER_IP, ip)
985
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
986

    
987
      # Distribute updated ss config to all nodes
988
      myself = self.cfg.GetNodeInfo(master)
989
      dist_nodes = self.cfg.GetNodeList()
990
      if myself.name in dist_nodes:
991
        dist_nodes.remove(myself.name)
992

    
993
      logger.Debug("Copying updated ssconf data to all nodes")
994
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
995
        fname = ss.KeyToFilename(keyname)
996
        result = rpc.call_upload_file(dist_nodes, fname)
997
        for to_node in dist_nodes:
998
          if not result[to_node]:
999
            logger.Error("copy of file %s to node %s failed" %
1000
                         (fname, to_node))
1001
    finally:
1002
      if not rpc.call_node_start_master(master):
1003
        logger.Error("Could not re-enable the master role on the master,\n"
1004
                     "please restart manually.")
1005

    
1006

    
1007
def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
1008
  """Sleep and poll for an instance's disk to sync.
1009

1010
  """
1011
  if not instance.disks:
1012
    return True
1013

    
1014
  if not oneshot:
1015
    logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)
1016

    
1017
  node = instance.primary_node
1018

    
1019
  for dev in instance.disks:
1020
    cfgw.SetDiskID(dev, node)
1021

    
1022
  retries = 0
1023
  while True:
1024
    max_time = 0
1025
    done = True
1026
    cumul_degraded = False
1027
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1028
    if not rstats:
1029
      logger.ToStderr("Can't get any data from node %s" % node)
1030
      retries += 1
1031
      if retries >= 10:
1032
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1033
                                 " aborting." % node)
1034
      time.sleep(6)
1035
      continue
1036
    retries = 0
1037
    for i in range(len(rstats)):
1038
      mstat = rstats[i]
1039
      if mstat is None:
1040
        logger.ToStderr("Can't compute data for node %s/%s" %
1041
                        (node, instance.disks[i].iv_name))
1042
        continue
1043
      perc_done, est_time, is_degraded = mstat
1044
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1045
      if perc_done is not None:
1046
        done = False
1047
        if est_time is not None:
1048
          rem_time = "%d estimated seconds remaining" % est_time
1049
          max_time = est_time
1050
        else:
1051
          rem_time = "no time estimate"
1052
        logger.ToStdout("- device %s: %5.2f%% done, %s" %
1053
                        (instance.disks[i].iv_name, perc_done, rem_time))
1054
    if done or oneshot:
1055
      break
1056

    
1057
    if unlock:
1058
      utils.Unlock('cmd')
1059
    try:
1060
      time.sleep(min(60, max_time))
1061
    finally:
1062
      if unlock:
1063
        utils.Lock('cmd')
1064

    
1065
  if done:
1066
    logger.ToStdout("Instance %s's disks are in sync." % instance.name)
1067
  return not cumul_degraded
1068

    
1069

    
1070
def _CheckDiskConsistency(cfgw, dev, node, on_primary):
1071
  """Check that mirrors are not degraded.
1072

1073
  """
1074
  cfgw.SetDiskID(dev, node)
1075

    
1076
  result = True
1077
  if on_primary or dev.AssembleOnSecondary():
1078
    rstats = rpc.call_blockdev_find(node, dev)
1079
    if not rstats:
1080
      logger.ToStderr("Can't get any data from node %s" % node)
1081
      result = False
1082
    else:
1083
      result = result and (not rstats[5])
1084
  if dev.children:
1085
    for child in dev.children:
1086
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1087

    
1088
  return result
1089

    
1090

    
1091
class LUDiagnoseOS(NoHooksLU):
1092
  """Logical unit for OS diagnose/query.
1093

1094
  """
1095
  _OP_REQP = []
1096

    
1097
  def CheckPrereq(self):
1098
    """Check prerequisites.
1099

1100
    This always succeeds, since this is a pure query LU.
1101

1102
    """
1103
    return
1104

    
1105
  def Exec(self, feedback_fn):
1106
    """Compute the list of OSes.
1107

1108
    """
1109
    node_list = self.cfg.GetNodeList()
1110
    node_data = rpc.call_os_diagnose(node_list)
1111
    if node_data == False:
1112
      raise errors.OpExecError("Can't gather the list of OSes")
1113
    return node_data
1114

    
1115

    
1116
class LURemoveNode(LogicalUnit):
1117
  """Logical unit for removing a node.
1118

1119
  """
1120
  HPATH = "node-remove"
1121
  HTYPE = constants.HTYPE_NODE
1122
  _OP_REQP = ["node_name"]
1123

    
1124
  def BuildHooksEnv(self):
1125
    """Build hooks env.
1126

1127
    This doesn't run on the target node in the pre phase as a failed
1128
    node would not allows itself to run.
1129

1130
    """
1131
    env = {
1132
      "NODE_NAME": self.op.node_name,
1133
      }
1134
    all_nodes = self.cfg.GetNodeList()
1135
    all_nodes.remove(self.op.node_name)
1136
    return env, all_nodes, all_nodes
1137

    
1138
  def CheckPrereq(self):
1139
    """Check prerequisites.
1140

1141
    This checks:
1142
     - the node exists in the configuration
1143
     - it does not have primary or secondary instances
1144
     - it's not the master
1145

1146
    Any errors are signalled by raising errors.OpPrereqError.
1147

1148
    """
1149
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1150
    if node is None:
1151
      raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1152

    
1153
    instance_list = self.cfg.GetInstanceList()
1154

    
1155
    masternode = self.sstore.GetMasterNode()
1156
    if node.name == masternode:
1157
      raise errors.OpPrereqError("Node is the master node,"
1158
                                 " you need to failover first.")
1159

    
1160
    for instance_name in instance_list:
1161
      instance = self.cfg.GetInstanceInfo(instance_name)
1162
      if node.name == instance.primary_node:
1163
        raise errors.OpPrereqError("Instance %s still running on the node,"
1164
                                   " please remove first." % instance_name)
1165
      if node.name in instance.secondary_nodes:
1166
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1167
                                   " please remove first." % instance_name)
1168
    self.op.node_name = node.name
1169
    self.node = node
1170

    
1171
  def Exec(self, feedback_fn):
1172
    """Removes the node from the cluster.
1173

1174
    """
1175
    node = self.node
1176
    logger.Info("stopping the node daemon and removing configs from node %s" %
1177
                node.name)
1178

    
1179
    rpc.call_node_leave_cluster(node.name)
1180

    
1181
    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1182

    
1183
    logger.Info("Removing node %s from config" % node.name)
1184

    
1185
    self.cfg.RemoveNode(node.name)
1186

    
1187

    
1188
class LUQueryNodes(NoHooksLU):
1189
  """Logical unit for querying nodes.
1190

1191
  """
1192
  _OP_REQP = ["output_fields", "names"]
1193

    
1194
  def CheckPrereq(self):
1195
    """Check prerequisites.
1196

1197
    This checks that the fields required are valid output fields.
1198

1199
    """
1200
    self.dynamic_fields = frozenset(["dtotal", "dfree",
1201
                                     "mtotal", "mnode", "mfree"])
1202

    
1203
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1204
                               "pinst_list", "sinst_list",
1205
                               "pip", "sip"],
1206
                       dynamic=self.dynamic_fields,
1207
                       selected=self.op.output_fields)
1208

    
1209
    self.wanted = _GetWantedNodes(self, self.op.names)
1210

    
1211
  def Exec(self, feedback_fn):
1212
    """Computes the list of nodes and their attributes.
1213

1214
    """
1215
    nodenames = self.wanted
1216
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1217

    
1218
    # begin data gathering
1219

    
1220
    if self.dynamic_fields.intersection(self.op.output_fields):
1221
      live_data = {}
1222
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1223
      for name in nodenames:
1224
        nodeinfo = node_data.get(name, None)
1225
        if nodeinfo:
1226
          live_data[name] = {
1227
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1228
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1229
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1230
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1231
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1232
            }
1233
        else:
1234
          live_data[name] = {}
1235
    else:
1236
      live_data = dict.fromkeys(nodenames, {})
1237

    
1238
    node_to_primary = dict([(name, set()) for name in nodenames])
1239
    node_to_secondary = dict([(name, set()) for name in nodenames])
1240

    
1241
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1242
                             "sinst_cnt", "sinst_list"))
1243
    if inst_fields & frozenset(self.op.output_fields):
1244
      instancelist = self.cfg.GetInstanceList()
1245

    
1246
      for instance_name in instancelist:
1247
        inst = self.cfg.GetInstanceInfo(instance_name)
1248
        if inst.primary_node in node_to_primary:
1249
          node_to_primary[inst.primary_node].add(inst.name)
1250
        for secnode in inst.secondary_nodes:
1251
          if secnode in node_to_secondary:
1252
            node_to_secondary[secnode].add(inst.name)
1253

    
1254
    # end data gathering
1255

    
1256
    output = []
1257
    for node in nodelist:
1258
      node_output = []
1259
      for field in self.op.output_fields:
1260
        if field == "name":
1261
          val = node.name
1262
        elif field == "pinst_list":
1263
          val = list(node_to_primary[node.name])
1264
        elif field == "sinst_list":
1265
          val = list(node_to_secondary[node.name])
1266
        elif field == "pinst_cnt":
1267
          val = len(node_to_primary[node.name])
1268
        elif field == "sinst_cnt":
1269
          val = len(node_to_secondary[node.name])
1270
        elif field == "pip":
1271
          val = node.primary_ip
1272
        elif field == "sip":
1273
          val = node.secondary_ip
1274
        elif field in self.dynamic_fields:
1275
          val = live_data[node.name].get(field, None)
1276
        else:
1277
          raise errors.ParameterError(field)
1278
        node_output.append(val)
1279
      output.append(node_output)
1280

    
1281
    return output
1282

    
1283

    
1284
class LUQueryNodeVolumes(NoHooksLU):
1285
  """Logical unit for getting volumes on node(s).
1286

1287
  """
1288
  _OP_REQP = ["nodes", "output_fields"]
1289

    
1290
  def CheckPrereq(self):
1291
    """Check prerequisites.
1292

1293
    This checks that the fields required are valid output fields.
1294

1295
    """
1296
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1297

    
1298
    _CheckOutputFields(static=["node"],
1299
                       dynamic=["phys", "vg", "name", "size", "instance"],
1300
                       selected=self.op.output_fields)
1301

    
1302

    
1303
  def Exec(self, feedback_fn):
1304
    """Computes the list of nodes and their attributes.
1305

1306
    """
1307
    nodenames = self.nodes
1308
    volumes = rpc.call_node_volumes(nodenames)
1309

    
1310
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1311
             in self.cfg.GetInstanceList()]
1312

    
1313
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1314

    
1315
    output = []
1316
    for node in nodenames:
1317
      if node not in volumes or not volumes[node]:
1318
        continue
1319

    
1320
      node_vols = volumes[node][:]
1321
      node_vols.sort(key=lambda vol: vol['dev'])
1322

    
1323
      for vol in node_vols:
1324
        node_output = []
1325
        for field in self.op.output_fields:
1326
          if field == "node":
1327
            val = node
1328
          elif field == "phys":
1329
            val = vol['dev']
1330
          elif field == "vg":
1331
            val = vol['vg']
1332
          elif field == "name":
1333
            val = vol['name']
1334
          elif field == "size":
1335
            val = int(float(vol['size']))
1336
          elif field == "instance":
1337
            for inst in ilist:
1338
              if node not in lv_by_node[inst]:
1339
                continue
1340
              if vol['name'] in lv_by_node[inst][node]:
1341
                val = inst.name
1342
                break
1343
            else:
1344
              val = '-'
1345
          else:
1346
            raise errors.ParameterError(field)
1347
          node_output.append(str(val))
1348

    
1349
        output.append(node_output)
1350

    
1351
    return output
1352

    
1353

    
1354
class LUAddNode(LogicalUnit):
1355
  """Logical unit for adding node to the cluster.
1356

1357
  """
1358
  HPATH = "node-add"
1359
  HTYPE = constants.HTYPE_NODE
1360
  _OP_REQP = ["node_name"]
1361

    
1362
  def BuildHooksEnv(self):
1363
    """Build hooks env.
1364

1365
    This will run on all nodes before, and on all nodes + the new node after.
1366

1367
    """
1368
    env = {
1369
      "NODE_NAME": self.op.node_name,
1370
      "NODE_PIP": self.op.primary_ip,
1371
      "NODE_SIP": self.op.secondary_ip,
1372
      }
1373
    nodes_0 = self.cfg.GetNodeList()
1374
    nodes_1 = nodes_0 + [self.op.node_name, ]
1375
    return env, nodes_0, nodes_1
1376

    
1377
  def CheckPrereq(self):
1378
    """Check prerequisites.
1379

1380
    This checks:
1381
     - the new node is not already in the config
1382
     - it is resolvable
1383
     - its parameters (single/dual homed) matches the cluster
1384

1385
    Any errors are signalled by raising errors.OpPrereqError.
1386

1387
    """
1388
    node_name = self.op.node_name
1389
    cfg = self.cfg
1390

    
1391
    dns_data = utils.HostInfo(node_name)
1392

    
1393
    node = dns_data.name
1394
    primary_ip = self.op.primary_ip = dns_data.ip
1395
    secondary_ip = getattr(self.op, "secondary_ip", None)
1396
    if secondary_ip is None:
1397
      secondary_ip = primary_ip
1398
    if not utils.IsValidIP(secondary_ip):
1399
      raise errors.OpPrereqError("Invalid secondary IP given")
1400
    self.op.secondary_ip = secondary_ip
1401
    node_list = cfg.GetNodeList()
1402
    if node in node_list:
1403
      raise errors.OpPrereqError("Node %s is already in the configuration"
1404
                                 % node)
1405

    
1406
    for existing_node_name in node_list:
1407
      existing_node = cfg.GetNodeInfo(existing_node_name)
1408
      if (existing_node.primary_ip == primary_ip or
1409
          existing_node.secondary_ip == primary_ip or
1410
          existing_node.primary_ip == secondary_ip or
1411
          existing_node.secondary_ip == secondary_ip):
1412
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1413
                                   " existing node %s" % existing_node.name)
1414

    
1415
    # check that the type of the node (single versus dual homed) is the
1416
    # same as for the master
1417
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1418
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1419
    newbie_singlehomed = secondary_ip == primary_ip
1420
    if master_singlehomed != newbie_singlehomed:
1421
      if master_singlehomed:
1422
        raise errors.OpPrereqError("The master has no private ip but the"
1423
                                   " new node has one")
1424
      else:
1425
        raise errors.OpPrereqError("The master has a private ip but the"
1426
                                   " new node doesn't have one")
1427

    
1428
    # checks reachablity
1429
    command = ["fping", "-q", primary_ip]
1430
    result = utils.RunCmd(command)
1431
    if result.failed:
1432
      raise errors.OpPrereqError("Node not reachable by ping")
1433

    
1434
    if not newbie_singlehomed:
1435
      # check reachability from my secondary ip to newbie's secondary ip
1436
      command = ["fping", "-S%s" % myself.secondary_ip, "-q", secondary_ip]
1437
      result = utils.RunCmd(command)
1438
      if result.failed:
1439
        raise errors.OpPrereqError("Node secondary ip not reachable by ping")
1440

    
1441
    self.new_node = objects.Node(name=node,
1442
                                 primary_ip=primary_ip,
1443
                                 secondary_ip=secondary_ip)
1444

    
1445
  def Exec(self, feedback_fn):
1446
    """Adds the new node to the cluster.
1447

1448
    """
1449
    new_node = self.new_node
1450
    node = new_node.name
1451

    
1452
    # set up inter-node password and certificate and restarts the node daemon
1453
    gntpass = self.sstore.GetNodeDaemonPassword()
1454
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1455
      raise errors.OpExecError("ganeti password corruption detected")
1456
    f = open(constants.SSL_CERT_FILE)
1457
    try:
1458
      gntpem = f.read(8192)
1459
    finally:
1460
      f.close()
1461
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1462
    # so we use this to detect an invalid certificate; as long as the
1463
    # cert doesn't contain this, the here-document will be correctly
1464
    # parsed by the shell sequence below
1465
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
1466
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1467
    if not gntpem.endswith("\n"):
1468
      raise errors.OpExecError("PEM must end with newline")
1469
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1470

    
1471
    # and then connect with ssh to set password and start ganeti-noded
1472
    # note that all the below variables are sanitized at this point,
1473
    # either by being constants or by the checks above
1474
    ss = self.sstore
1475
    mycommand = ("umask 077 && "
1476
                 "echo '%s' > '%s' && "
1477
                 "cat > '%s' << '!EOF.' && \n"
1478
                 "%s!EOF.\n%s restart" %
1479
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1480
                  constants.SSL_CERT_FILE, gntpem,
1481
                  constants.NODE_INITD_SCRIPT))
1482

    
1483
    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1484
    if result.failed:
1485
      raise errors.OpExecError("Remote command on node %s, error: %s,"
1486
                               " output: %s" %
1487
                               (node, result.fail_reason, result.output))
1488

    
1489
    # check connectivity
1490
    time.sleep(4)
1491

    
1492
    result = rpc.call_version([node])[node]
1493
    if result:
1494
      if constants.PROTOCOL_VERSION == result:
1495
        logger.Info("communication to node %s fine, sw version %s match" %
1496
                    (node, result))
1497
      else:
1498
        raise errors.OpExecError("Version mismatch master version %s,"
1499
                                 " node version %s" %
1500
                                 (constants.PROTOCOL_VERSION, result))
1501
    else:
1502
      raise errors.OpExecError("Cannot get version from the new node")
1503

    
1504
    # setup ssh on node
1505
    logger.Info("copy ssh key to node %s" % node)
1506
    keyarray = []
1507
    keyfiles = ["/etc/ssh/ssh_host_dsa_key", "/etc/ssh/ssh_host_dsa_key.pub",
1508
                "/etc/ssh/ssh_host_rsa_key", "/etc/ssh/ssh_host_rsa_key.pub",
1509
                "/root/.ssh/id_dsa", "/root/.ssh/id_dsa.pub"]
1510

    
1511
    for i in keyfiles:
1512
      f = open(i, 'r')
1513
      try:
1514
        keyarray.append(f.read())
1515
      finally:
1516
        f.close()
1517

    
1518
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1519
                               keyarray[3], keyarray[4], keyarray[5])
1520

    
1521
    if not result:
1522
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1523

    
1524
    # Add node to our /etc/hosts, and add key to known_hosts
1525
    _UpdateEtcHosts(new_node.name, new_node.primary_ip)
1526
    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1527
                      self.cfg.GetHostKey())
1528

    
1529
    if new_node.secondary_ip != new_node.primary_ip:
1530
      result = ssh.SSHCall(node, "root",
1531
                           "fping -S 127.0.0.1 -q %s" % new_node.secondary_ip)
1532
      if result.failed:
1533
        raise errors.OpExecError("Node claims it doesn't have the"
1534
                                 " secondary ip you gave (%s).\n"
1535
                                 "Please fix and re-run this command." %
1536
                                 new_node.secondary_ip)
1537

    
1538
    success, msg = ssh.VerifyNodeHostname(node)
1539
    if not success:
1540
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
1541
                               " than the one the resolver gives: %s.\n"
1542
                               "Please fix and re-run this command." %
1543
                               (node, msg))
1544

    
1545
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1546
    # including the node just added
1547
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1548
    dist_nodes = self.cfg.GetNodeList() + [node]
1549
    if myself.name in dist_nodes:
1550
      dist_nodes.remove(myself.name)
1551

    
1552
    logger.Debug("Copying hosts and known_hosts to all nodes")
1553
    for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
1554
      result = rpc.call_upload_file(dist_nodes, fname)
1555
      for to_node in dist_nodes:
1556
        if not result[to_node]:
1557
          logger.Error("copy of file %s to node %s failed" %
1558
                       (fname, to_node))
1559

    
1560
    to_copy = ss.GetFileList()
1561
    for fname in to_copy:
1562
      if not ssh.CopyFileToNode(node, fname):
1563
        logger.Error("could not copy file %s to node %s" % (fname, node))
1564

    
1565
    logger.Info("adding node %s to cluster.conf" % node)
1566
    self.cfg.AddNode(new_node)
1567

    
1568

    
1569
class LUMasterFailover(LogicalUnit):
1570
  """Failover the master node to the current node.
1571

1572
  This is a special LU in that it must run on a non-master node.
1573

1574
  """
1575
  HPATH = "master-failover"
1576
  HTYPE = constants.HTYPE_CLUSTER
1577
  REQ_MASTER = False
1578
  _OP_REQP = []
1579

    
1580
  def BuildHooksEnv(self):
1581
    """Build hooks env.
1582

1583
    This will run on the new master only in the pre phase, and on all
1584
    the nodes in the post phase.
1585

1586
    """
1587
    env = {
1588
      "NEW_MASTER": self.new_master,
1589
      "OLD_MASTER": self.old_master,
1590
      }
1591
    return env, [self.new_master], self.cfg.GetNodeList()
1592

    
1593
  def CheckPrereq(self):
1594
    """Check prerequisites.
1595

1596
    This checks that we are not already the master.
1597

1598
    """
1599
    self.new_master = utils.HostInfo().name
1600
    self.old_master = self.sstore.GetMasterNode()
1601

    
1602
    if self.old_master == self.new_master:
1603
      raise errors.OpPrereqError("This commands must be run on the node"
1604
                                 " where you want the new master to be.\n"
1605
                                 "%s is already the master" %
1606
                                 self.old_master)
1607

    
1608
  def Exec(self, feedback_fn):
1609
    """Failover the master node.
1610

1611
    This command, when run on a non-master node, will cause the current
1612
    master to cease being master, and the non-master to become new
1613
    master.
1614

1615
    """
1616
    #TODO: do not rely on gethostname returning the FQDN
1617
    logger.Info("setting master to %s, old master: %s" %
1618
                (self.new_master, self.old_master))
1619

    
1620
    if not rpc.call_node_stop_master(self.old_master):
1621
      logger.Error("could disable the master role on the old master"
1622
                   " %s, please disable manually" % self.old_master)
1623

    
1624
    ss = self.sstore
1625
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1626
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
1627
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
1628
      logger.Error("could not distribute the new simple store master file"
1629
                   " to the other nodes, please check.")
1630

    
1631
    if not rpc.call_node_start_master(self.new_master):
1632
      logger.Error("could not start the master role on the new master"
1633
                   " %s, please check" % self.new_master)
1634
      feedback_fn("Error in activating the master IP on the new master,\n"
1635
                  "please fix manually.")
1636

    
1637

    
1638

    
1639
class LUQueryClusterInfo(NoHooksLU):
1640
  """Query cluster configuration.
1641

1642
  """
1643
  _OP_REQP = []
1644
  REQ_MASTER = False
1645

    
1646
  def CheckPrereq(self):
1647
    """No prerequsites needed for this LU.
1648

1649
    """
1650
    pass
1651

    
1652
  def Exec(self, feedback_fn):
1653
    """Return cluster config.
1654

1655
    """
1656
    result = {
1657
      "name": self.sstore.GetClusterName(),
1658
      "software_version": constants.RELEASE_VERSION,
1659
      "protocol_version": constants.PROTOCOL_VERSION,
1660
      "config_version": constants.CONFIG_VERSION,
1661
      "os_api_version": constants.OS_API_VERSION,
1662
      "export_version": constants.EXPORT_VERSION,
1663
      "master": self.sstore.GetMasterNode(),
1664
      "architecture": (platform.architecture()[0], platform.machine()),
1665
      }
1666

    
1667
    return result
1668

    
1669

    
1670
class LUClusterCopyFile(NoHooksLU):
1671
  """Copy file to cluster.
1672

1673
  """
1674
  _OP_REQP = ["nodes", "filename"]
1675

    
1676
  def CheckPrereq(self):
1677
    """Check prerequisites.
1678

1679
    It should check that the named file exists and that the given list
1680
    of nodes is valid.
1681

1682
    """
1683
    if not os.path.exists(self.op.filename):
1684
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1685

    
1686
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1687

    
1688
  def Exec(self, feedback_fn):
1689
    """Copy a file from master to some nodes.
1690

1691
    Args:
1692
      opts - class with options as members
1693
      args - list containing a single element, the file name
1694
    Opts used:
1695
      nodes - list containing the name of target nodes; if empty, all nodes
1696

1697
    """
1698
    filename = self.op.filename
1699

    
1700
    myname = utils.HostInfo().name
1701

    
1702
    for node in self.nodes:
1703
      if node == myname:
1704
        continue
1705
      if not ssh.CopyFileToNode(node, filename):
1706
        logger.Error("Copy of file %s to node %s failed" % (filename, node))
1707

    
1708

    
1709
class LUDumpClusterConfig(NoHooksLU):
1710
  """Return a text-representation of the cluster-config.
1711

1712
  """
1713
  _OP_REQP = []
1714

    
1715
  def CheckPrereq(self):
1716
    """No prerequisites.
1717

1718
    """
1719
    pass
1720

    
1721
  def Exec(self, feedback_fn):
1722
    """Dump a representation of the cluster config to the standard output.
1723

1724
    """
1725
    return self.cfg.DumpConfig()
1726

    
1727

    
1728
class LURunClusterCommand(NoHooksLU):
1729
  """Run a command on some nodes.
1730

1731
  """
1732
  _OP_REQP = ["command", "nodes"]
1733

    
1734
  def CheckPrereq(self):
1735
    """Check prerequisites.
1736

1737
    It checks that the given list of nodes is valid.
1738

1739
    """
1740
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1741

    
1742
  def Exec(self, feedback_fn):
1743
    """Run a command on some nodes.
1744

1745
    """
1746
    data = []
1747
    for node in self.nodes:
1748
      result = ssh.SSHCall(node, "root", self.op.command)
1749
      data.append((node, result.output, result.exit_code))
1750

    
1751
    return data
1752

    
1753

    
1754
class LUActivateInstanceDisks(NoHooksLU):
1755
  """Bring up an instance's disks.
1756

1757
  """
1758
  _OP_REQP = ["instance_name"]
1759

    
1760
  def CheckPrereq(self):
1761
    """Check prerequisites.
1762

1763
    This checks that the instance is in the cluster.
1764

1765
    """
1766
    instance = self.cfg.GetInstanceInfo(
1767
      self.cfg.ExpandInstanceName(self.op.instance_name))
1768
    if instance is None:
1769
      raise errors.OpPrereqError("Instance '%s' not known" %
1770
                                 self.op.instance_name)
1771
    self.instance = instance
1772

    
1773

    
1774
  def Exec(self, feedback_fn):
1775
    """Activate the disks.
1776

1777
    """
1778
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1779
    if not disks_ok:
1780
      raise errors.OpExecError("Cannot activate block devices")
1781

    
1782
    return disks_info
1783

    
1784

    
1785
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1786
  """Prepare the block devices for an instance.
1787

1788
  This sets up the block devices on all nodes.
1789

1790
  Args:
1791
    instance: a ganeti.objects.Instance object
1792
    ignore_secondaries: if true, errors on secondary nodes won't result
1793
                        in an error return from the function
1794

1795
  Returns:
1796
    false if the operation failed
1797
    list of (host, instance_visible_name, node_visible_name) if the operation
1798
         suceeded with the mapping from node devices to instance devices
1799
  """
1800
  device_info = []
1801
  disks_ok = True
1802
  for inst_disk in instance.disks:
1803
    master_result = None
1804
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1805
      cfg.SetDiskID(node_disk, node)
1806
      is_primary = node == instance.primary_node
1807
      result = rpc.call_blockdev_assemble(node, node_disk, is_primary)
1808
      if not result:
1809
        logger.Error("could not prepare block device %s on node %s (is_pri"
1810
                     "mary=%s)" % (inst_disk.iv_name, node, is_primary))
1811
        if is_primary or not ignore_secondaries:
1812
          disks_ok = False
1813
      if is_primary:
1814
        master_result = result
1815
    device_info.append((instance.primary_node, inst_disk.iv_name,
1816
                        master_result))
1817

    
1818
  return disks_ok, device_info
1819

    
1820

    
1821
def _StartInstanceDisks(cfg, instance, force):
1822
  """Start the disks of an instance.
1823

1824
  """
1825
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1826
                                           ignore_secondaries=force)
1827
  if not disks_ok:
1828
    _ShutdownInstanceDisks(instance, cfg)
1829
    if force is not None and not force:
1830
      logger.Error("If the message above refers to a secondary node,"
1831
                   " you can retry the operation using '--force'.")
1832
    raise errors.OpExecError("Disk consistency error")
1833

    
1834

    
1835
class LUDeactivateInstanceDisks(NoHooksLU):
1836
  """Shutdown an instance's disks.
1837

1838
  """
1839
  _OP_REQP = ["instance_name"]
1840

    
1841
  def CheckPrereq(self):
1842
    """Check prerequisites.
1843

1844
    This checks that the instance is in the cluster.
1845

1846
    """
1847
    instance = self.cfg.GetInstanceInfo(
1848
      self.cfg.ExpandInstanceName(self.op.instance_name))
1849
    if instance is None:
1850
      raise errors.OpPrereqError("Instance '%s' not known" %
1851
                                 self.op.instance_name)
1852
    self.instance = instance
1853

    
1854
  def Exec(self, feedback_fn):
1855
    """Deactivate the disks
1856

1857
    """
1858
    instance = self.instance
1859
    ins_l = rpc.call_instance_list([instance.primary_node])
1860
    ins_l = ins_l[instance.primary_node]
1861
    if not type(ins_l) is list:
1862
      raise errors.OpExecError("Can't contact node '%s'" %
1863
                               instance.primary_node)
1864

    
1865
    if self.instance.name in ins_l:
1866
      raise errors.OpExecError("Instance is running, can't shutdown"
1867
                               " block devices.")
1868

    
1869
    _ShutdownInstanceDisks(instance, self.cfg)
1870

    
1871

    
1872
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1873
  """Shutdown block devices of an instance.
1874

1875
  This does the shutdown on all nodes of the instance.
1876

1877
  If the ignore_primary is false, errors on the primary node are
1878
  ignored.
1879

1880
  """
1881
  result = True
1882
  for disk in instance.disks:
1883
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1884
      cfg.SetDiskID(top_disk, node)
1885
      if not rpc.call_blockdev_shutdown(node, top_disk):
1886
        logger.Error("could not shutdown block device %s on node %s" %
1887
                     (disk.iv_name, node))
1888
        if not ignore_primary or node != instance.primary_node:
1889
          result = False
1890
  return result
1891

    
1892

    
1893
class LUStartupInstance(LogicalUnit):
1894
  """Starts an instance.
1895

1896
  """
1897
  HPATH = "instance-start"
1898
  HTYPE = constants.HTYPE_INSTANCE
1899
  _OP_REQP = ["instance_name", "force"]
1900

    
1901
  def BuildHooksEnv(self):
1902
    """Build hooks env.
1903

1904
    This runs on master, primary and secondary nodes of the instance.
1905

1906
    """
1907
    env = {
1908
      "FORCE": self.op.force,
1909
      }
1910
    env.update(_BuildInstanceHookEnvByObject(self.instance))
1911
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1912
          list(self.instance.secondary_nodes))
1913
    return env, nl, nl
1914

    
1915
  def CheckPrereq(self):
1916
    """Check prerequisites.
1917

1918
    This checks that the instance is in the cluster.
1919

1920
    """
1921
    instance = self.cfg.GetInstanceInfo(
1922
      self.cfg.ExpandInstanceName(self.op.instance_name))
1923
    if instance is None:
1924
      raise errors.OpPrereqError("Instance '%s' not known" %
1925
                                 self.op.instance_name)
1926

    
1927
    # check bridges existance
1928
    brlist = [nic.bridge for nic in instance.nics]
1929
    if not rpc.call_bridges_exist(instance.primary_node, brlist):
1930
      raise errors.OpPrereqError("one or more target bridges %s does not"
1931
                                 " exist on destination node '%s'" %
1932
                                 (brlist, instance.primary_node))
1933

    
1934
    self.instance = instance
1935
    self.op.instance_name = instance.name
1936

    
1937
  def Exec(self, feedback_fn):
1938
    """Start the instance.
1939

1940
    """
1941
    instance = self.instance
1942
    force = self.op.force
1943
    extra_args = getattr(self.op, "extra_args", "")
1944

    
1945
    node_current = instance.primary_node
1946

    
1947
    nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1948
    if not nodeinfo:
1949
      raise errors.OpExecError("Could not contact node %s for infos" %
1950
                               (node_current))
1951

    
1952
    freememory = nodeinfo[node_current]['memory_free']
1953
    memory = instance.memory
1954
    if memory > freememory:
1955
      raise errors.OpExecError("Not enough memory to start instance"
1956
                               " %s on node %s"
1957
                               " needed %s MiB, available %s MiB" %
1958
                               (instance.name, node_current, memory,
1959
                                freememory))
1960

    
1961
    _StartInstanceDisks(self.cfg, instance, force)
1962

    
1963
    if not rpc.call_instance_start(node_current, instance, extra_args):
1964
      _ShutdownInstanceDisks(instance, self.cfg)
1965
      raise errors.OpExecError("Could not start instance")
1966

    
1967
    self.cfg.MarkInstanceUp(instance.name)
1968

    
1969

    
1970
class LUShutdownInstance(LogicalUnit):
1971
  """Shutdown an instance.
1972

1973
  """
1974
  HPATH = "instance-stop"
1975
  HTYPE = constants.HTYPE_INSTANCE
1976
  _OP_REQP = ["instance_name"]
1977

    
1978
  def BuildHooksEnv(self):
1979
    """Build hooks env.
1980

1981
    This runs on master, primary and secondary nodes of the instance.
1982

1983
    """
1984
    env = _BuildInstanceHookEnvByObject(self.instance)
1985
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1986
          list(self.instance.secondary_nodes))
1987
    return env, nl, nl
1988

    
1989
  def CheckPrereq(self):
1990
    """Check prerequisites.
1991

1992
    This checks that the instance is in the cluster.
1993

1994
    """
1995
    instance = self.cfg.GetInstanceInfo(
1996
      self.cfg.ExpandInstanceName(self.op.instance_name))
1997
    if instance is None:
1998
      raise errors.OpPrereqError("Instance '%s' not known" %
1999
                                 self.op.instance_name)
2000
    self.instance = instance
2001

    
2002
  def Exec(self, feedback_fn):
2003
    """Shutdown the instance.
2004

2005
    """
2006
    instance = self.instance
2007
    node_current = instance.primary_node
2008
    if not rpc.call_instance_shutdown(node_current, instance):
2009
      logger.Error("could not shutdown instance")
2010

    
2011
    self.cfg.MarkInstanceDown(instance.name)
2012
    _ShutdownInstanceDisks(instance, self.cfg)
2013

    
2014

    
2015
class LUReinstallInstance(LogicalUnit):
2016
  """Reinstall an instance.
2017

2018
  """
2019
  HPATH = "instance-reinstall"
2020
  HTYPE = constants.HTYPE_INSTANCE
2021
  _OP_REQP = ["instance_name"]
2022

    
2023
  def BuildHooksEnv(self):
2024
    """Build hooks env.
2025

2026
    This runs on master, primary and secondary nodes of the instance.
2027

2028
    """
2029
    env = _BuildInstanceHookEnvByObject(self.instance)
2030
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2031
          list(self.instance.secondary_nodes))
2032
    return env, nl, nl
2033

    
2034
  def CheckPrereq(self):
2035
    """Check prerequisites.
2036

2037
    This checks that the instance is in the cluster and is not running.
2038

2039
    """
2040
    instance = self.cfg.GetInstanceInfo(
2041
      self.cfg.ExpandInstanceName(self.op.instance_name))
2042
    if instance is None:
2043
      raise errors.OpPrereqError("Instance '%s' not known" %
2044
                                 self.op.instance_name)
2045
    if instance.disk_template == constants.DT_DISKLESS:
2046
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2047
                                 self.op.instance_name)
2048
    if instance.status != "down":
2049
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2050
                                 self.op.instance_name)
2051
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2052
    if remote_info:
2053
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2054
                                 (self.op.instance_name,
2055
                                  instance.primary_node))
2056

    
2057
    self.op.os_type = getattr(self.op, "os_type", None)
2058
    if self.op.os_type is not None:
2059
      # OS verification
2060
      pnode = self.cfg.GetNodeInfo(
2061
        self.cfg.ExpandNodeName(instance.primary_node))
2062
      if pnode is None:
2063
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2064
                                   self.op.pnode)
2065
      os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2066
      if not isinstance(os_obj, objects.OS):
2067
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2068
                                   " primary node"  % self.op.os_type)
2069

    
2070
    self.instance = instance
2071

    
2072
  def Exec(self, feedback_fn):
2073
    """Reinstall the instance.
2074

2075
    """
2076
    inst = self.instance
2077

    
2078
    if self.op.os_type is not None:
2079
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2080
      inst.os = self.op.os_type
2081
      self.cfg.AddInstance(inst)
2082

    
2083
    _StartInstanceDisks(self.cfg, inst, None)
2084
    try:
2085
      feedback_fn("Running the instance OS create scripts...")
2086
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2087
        raise errors.OpExecError("Could not install OS for instance %s "
2088
                                 "on node %s" %
2089
                                 (inst.name, inst.primary_node))
2090
    finally:
2091
      _ShutdownInstanceDisks(inst, self.cfg)
2092

    
2093

    
2094
class LURenameInstance(LogicalUnit):
2095
  """Rename an instance.
2096

2097
  """
2098
  HPATH = "instance-rename"
2099
  HTYPE = constants.HTYPE_INSTANCE
2100
  _OP_REQP = ["instance_name", "new_name"]
2101

    
2102
  def BuildHooksEnv(self):
2103
    """Build hooks env.
2104

2105
    This runs on master, primary and secondary nodes of the instance.
2106

2107
    """
2108
    env = _BuildInstanceHookEnvByObject(self.instance)
2109
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2110
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2111
          list(self.instance.secondary_nodes))
2112
    return env, nl, nl
2113

    
2114
  def CheckPrereq(self):
2115
    """Check prerequisites.
2116

2117
    This checks that the instance is in the cluster and is not running.
2118

2119
    """
2120
    instance = self.cfg.GetInstanceInfo(
2121
      self.cfg.ExpandInstanceName(self.op.instance_name))
2122
    if instance is None:
2123
      raise errors.OpPrereqError("Instance '%s' not known" %
2124
                                 self.op.instance_name)
2125
    if instance.status != "down":
2126
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2127
                                 self.op.instance_name)
2128
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2129
    if remote_info:
2130
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2131
                                 (self.op.instance_name,
2132
                                  instance.primary_node))
2133
    self.instance = instance
2134

    
2135
    # new name verification
2136
    name_info = utils.HostInfo(self.op.new_name)
2137

    
2138
    self.op.new_name = new_name = name_info.name
2139
    if not getattr(self.op, "ignore_ip", False):
2140
      command = ["fping", "-q", name_info.ip]
2141
      result = utils.RunCmd(command)
2142
      if not result.failed:
2143
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2144
                                   (name_info.ip, new_name))
2145

    
2146

    
2147
  def Exec(self, feedback_fn):
2148
    """Reinstall the instance.
2149

2150
    """
2151
    inst = self.instance
2152
    old_name = inst.name
2153

    
2154
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2155

    
2156
    # re-read the instance from the configuration after rename
2157
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2158

    
2159
    _StartInstanceDisks(self.cfg, inst, None)
2160
    try:
2161
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2162
                                          "sda", "sdb"):
2163
        msg = ("Could run OS rename script for instance %s\n"
2164
               "on node %s\n"
2165
               "(but the instance has been renamed in Ganeti)" %
2166
               (inst.name, inst.primary_node))
2167
        logger.Error(msg)
2168
    finally:
2169
      _ShutdownInstanceDisks(inst, self.cfg)
2170

    
2171

    
2172
class LURemoveInstance(LogicalUnit):
2173
  """Remove an instance.
2174

2175
  """
2176
  HPATH = "instance-remove"
2177
  HTYPE = constants.HTYPE_INSTANCE
2178
  _OP_REQP = ["instance_name"]
2179

    
2180
  def BuildHooksEnv(self):
2181
    """Build hooks env.
2182

2183
    This runs on master, primary and secondary nodes of the instance.
2184

2185
    """
2186
    env = _BuildInstanceHookEnvByObject(self.instance)
2187
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2188
          list(self.instance.secondary_nodes))
2189
    return env, nl, nl
2190

    
2191
  def CheckPrereq(self):
2192
    """Check prerequisites.
2193

2194
    This checks that the instance is in the cluster.
2195

2196
    """
2197
    instance = self.cfg.GetInstanceInfo(
2198
      self.cfg.ExpandInstanceName(self.op.instance_name))
2199
    if instance is None:
2200
      raise errors.OpPrereqError("Instance '%s' not known" %
2201
                                 self.op.instance_name)
2202
    self.instance = instance
2203

    
2204
  def Exec(self, feedback_fn):
2205
    """Remove the instance.
2206

2207
    """
2208
    instance = self.instance
2209
    logger.Info("shutting down instance %s on node %s" %
2210
                (instance.name, instance.primary_node))
2211

    
2212
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2213
      raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2214
                               (instance.name, instance.primary_node))
2215

    
2216
    logger.Info("removing block devices for instance %s" % instance.name)
2217

    
2218
    _RemoveDisks(instance, self.cfg)
2219

    
2220
    logger.Info("removing instance %s out of cluster config" % instance.name)
2221

    
2222
    self.cfg.RemoveInstance(instance.name)
2223

    
2224

    
2225
class LUQueryInstances(NoHooksLU):
2226
  """Logical unit for querying instances.
2227

2228
  """
2229
  _OP_REQP = ["output_fields", "names"]
2230

    
2231
  def CheckPrereq(self):
2232
    """Check prerequisites.
2233

2234
    This checks that the fields required are valid output fields.
2235

2236
    """
2237
    self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2238
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2239
                               "admin_state", "admin_ram",
2240
                               "disk_template", "ip", "mac", "bridge",
2241
                               "sda_size", "sdb_size"],
2242
                       dynamic=self.dynamic_fields,
2243
                       selected=self.op.output_fields)
2244

    
2245
    self.wanted = _GetWantedInstances(self, self.op.names)
2246

    
2247
  def Exec(self, feedback_fn):
2248
    """Computes the list of nodes and their attributes.
2249

2250
    """
2251
    instance_names = self.wanted
2252
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2253
                     in instance_names]
2254

    
2255
    # begin data gathering
2256

    
2257
    nodes = frozenset([inst.primary_node for inst in instance_list])
2258

    
2259
    bad_nodes = []
2260
    if self.dynamic_fields.intersection(self.op.output_fields):
2261
      live_data = {}
2262
      node_data = rpc.call_all_instances_info(nodes)
2263
      for name in nodes:
2264
        result = node_data[name]
2265
        if result:
2266
          live_data.update(result)
2267
        elif result == False:
2268
          bad_nodes.append(name)
2269
        # else no instance is alive
2270
    else:
2271
      live_data = dict([(name, {}) for name in instance_names])
2272

    
2273
    # end data gathering
2274

    
2275
    output = []
2276
    for instance in instance_list:
2277
      iout = []
2278
      for field in self.op.output_fields:
2279
        if field == "name":
2280
          val = instance.name
2281
        elif field == "os":
2282
          val = instance.os
2283
        elif field == "pnode":
2284
          val = instance.primary_node
2285
        elif field == "snodes":
2286
          val = list(instance.secondary_nodes)
2287
        elif field == "admin_state":
2288
          val = (instance.status != "down")
2289
        elif field == "oper_state":
2290
          if instance.primary_node in bad_nodes:
2291
            val = None
2292
          else:
2293
            val = bool(live_data.get(instance.name))
2294
        elif field == "admin_ram":
2295
          val = instance.memory
2296
        elif field == "oper_ram":
2297
          if instance.primary_node in bad_nodes:
2298
            val = None
2299
          elif instance.name in live_data:
2300
            val = live_data[instance.name].get("memory", "?")
2301
          else:
2302
            val = "-"
2303
        elif field == "disk_template":
2304
          val = instance.disk_template
2305
        elif field == "ip":
2306
          val = instance.nics[0].ip
2307
        elif field == "bridge":
2308
          val = instance.nics[0].bridge
2309
        elif field == "mac":
2310
          val = instance.nics[0].mac
2311
        elif field == "sda_size" or field == "sdb_size":
2312
          disk = instance.FindDisk(field[:3])
2313
          if disk is None:
2314
            val = None
2315
          else:
2316
            val = disk.size
2317
        else:
2318
          raise errors.ParameterError(field)
2319
        iout.append(val)
2320
      output.append(iout)
2321

    
2322
    return output
2323

    
2324

    
2325
class LUFailoverInstance(LogicalUnit):
2326
  """Failover an instance.
2327

2328
  """
2329
  HPATH = "instance-failover"
2330
  HTYPE = constants.HTYPE_INSTANCE
2331
  _OP_REQP = ["instance_name", "ignore_consistency"]
2332

    
2333
  def BuildHooksEnv(self):
2334
    """Build hooks env.
2335

2336
    This runs on master, primary and secondary nodes of the instance.
2337

2338
    """
2339
    env = {
2340
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2341
      }
2342
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2343
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2344
    return env, nl, nl
2345

    
2346
  def CheckPrereq(self):
2347
    """Check prerequisites.
2348

2349
    This checks that the instance is in the cluster.
2350

2351
    """
2352
    instance = self.cfg.GetInstanceInfo(
2353
      self.cfg.ExpandInstanceName(self.op.instance_name))
2354
    if instance is None:
2355
      raise errors.OpPrereqError("Instance '%s' not known" %
2356
                                 self.op.instance_name)
2357

    
2358
    if instance.disk_template != constants.DT_REMOTE_RAID1:
2359
      raise errors.OpPrereqError("Instance's disk layout is not"
2360
                                 " remote_raid1.")
2361

    
2362
    secondary_nodes = instance.secondary_nodes
2363
    if not secondary_nodes:
2364
      raise errors.ProgrammerError("no secondary node but using "
2365
                                   "DT_REMOTE_RAID1 template")
2366

    
2367
    # check memory requirements on the secondary node
2368
    target_node = secondary_nodes[0]
2369
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2370
    info = nodeinfo.get(target_node, None)
2371
    if not info:
2372
      raise errors.OpPrereqError("Cannot get current information"
2373
                                 " from node '%s'" % nodeinfo)
2374
    if instance.memory > info['memory_free']:
2375
      raise errors.OpPrereqError("Not enough memory on target node %s."
2376
                                 " %d MB available, %d MB required" %
2377
                                 (target_node, info['memory_free'],
2378
                                  instance.memory))
2379

    
2380
    # check bridge existance
2381
    brlist = [nic.bridge for nic in instance.nics]
2382
    if not rpc.call_bridges_exist(instance.primary_node, brlist):
2383
      raise errors.OpPrereqError("One or more target bridges %s does not"
2384
                                 " exist on destination node '%s'" %
2385
                                 (brlist, instance.primary_node))
2386

    
2387
    self.instance = instance
2388

    
2389
  def Exec(self, feedback_fn):
2390
    """Failover an instance.
2391

2392
    The failover is done by shutting it down on its present node and
2393
    starting it on the secondary.
2394

2395
    """
2396
    instance = self.instance
2397

    
2398
    source_node = instance.primary_node
2399
    target_node = instance.secondary_nodes[0]
2400

    
2401
    feedback_fn("* checking disk consistency between source and target")
2402
    for dev in instance.disks:
2403
      # for remote_raid1, these are md over drbd
2404
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2405
        if not self.op.ignore_consistency:
2406
          raise errors.OpExecError("Disk %s is degraded on target node,"
2407
                                   " aborting failover." % dev.iv_name)
2408

    
2409
    feedback_fn("* checking target node resource availability")
2410
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2411

    
2412
    if not nodeinfo:
2413
      raise errors.OpExecError("Could not contact target node %s." %
2414
                               target_node)
2415

    
2416
    free_memory = int(nodeinfo[target_node]['memory_free'])
2417
    memory = instance.memory
2418
    if memory > free_memory:
2419
      raise errors.OpExecError("Not enough memory to create instance %s on"
2420
                               " node %s. needed %s MiB, available %s MiB" %
2421
                               (instance.name, target_node, memory,
2422
                                free_memory))
2423

    
2424
    feedback_fn("* shutting down instance on source node")
2425
    logger.Info("Shutting down instance %s on node %s" %
2426
                (instance.name, source_node))
2427

    
2428
    if not rpc.call_instance_shutdown(source_node, instance):
2429
      logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2430
                   " anyway. Please make sure node %s is down"  %
2431
                   (instance.name, source_node, source_node))
2432

    
2433
    feedback_fn("* deactivating the instance's disks on source node")
2434
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2435
      raise errors.OpExecError("Can't shut down the instance's disks.")
2436

    
2437
    instance.primary_node = target_node
2438
    # distribute new instance config to the other nodes
2439
    self.cfg.AddInstance(instance)
2440

    
2441
    feedback_fn("* activating the instance's disks on target node")
2442
    logger.Info("Starting instance %s on node %s" %
2443
                (instance.name, target_node))
2444

    
2445
    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2446
                                             ignore_secondaries=True)
2447
    if not disks_ok:
2448
      _ShutdownInstanceDisks(instance, self.cfg)
2449
      raise errors.OpExecError("Can't activate the instance's disks")
2450

    
2451
    feedback_fn("* starting the instance on the target node")
2452
    if not rpc.call_instance_start(target_node, instance, None):
2453
      _ShutdownInstanceDisks(instance, self.cfg)
2454
      raise errors.OpExecError("Could not start instance %s on node %s." %
2455
                               (instance.name, target_node))
2456

    
2457

    
2458
def _CreateBlockDevOnPrimary(cfg, node, device, info):
2459
  """Create a tree of block devices on the primary node.
2460

2461
  This always creates all devices.
2462

2463
  """
2464
  if device.children:
2465
    for child in device.children:
2466
      if not _CreateBlockDevOnPrimary(cfg, node, child, info):
2467
        return False
2468

    
2469
  cfg.SetDiskID(device, node)
2470
  new_id = rpc.call_blockdev_create(node, device, device.size, True, info)
2471
  if not new_id:
2472
    return False
2473
  if device.physical_id is None:
2474
    device.physical_id = new_id
2475
  return True
2476

    
2477

    
2478
def _CreateBlockDevOnSecondary(cfg, node, device, force, info):
2479
  """Create a tree of block devices on a secondary node.
2480

2481
  If this device type has to be created on secondaries, create it and
2482
  all its children.
2483

2484
  If not, just recurse to children keeping the same 'force' value.
2485

2486
  """
2487
  if device.CreateOnSecondary():
2488
    force = True
2489
  if device.children:
2490
    for child in device.children:
2491
      if not _CreateBlockDevOnSecondary(cfg, node, child, force, info):
2492
        return False
2493

    
2494
  if not force:
2495
    return True
2496
  cfg.SetDiskID(device, node)
2497
  new_id = rpc.call_blockdev_create(node, device, device.size, False, info)
2498
  if not new_id:
2499
    return False
2500
  if device.physical_id is None:
2501
    device.physical_id = new_id
2502
  return True
2503

    
2504

    
2505
def _GenerateUniqueNames(cfg, exts):
2506
  """Generate a suitable LV name.
2507

2508
  This will generate a logical volume name for the given instance.
2509

2510
  """
2511
  results = []
2512
  for val in exts:
2513
    new_id = cfg.GenerateUniqueID()
2514
    results.append("%s%s" % (new_id, val))
2515
  return results
2516

    
2517

    
2518
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2519
  """Generate a drbd device complete with its children.
2520

2521
  """
2522
  port = cfg.AllocatePort()
2523
  vgname = cfg.GetVGName()
2524
  dev_data = objects.Disk(dev_type="lvm", size=size,
2525
                          logical_id=(vgname, names[0]))
2526
  dev_meta = objects.Disk(dev_type="lvm", size=128,
2527
                          logical_id=(vgname, names[1]))
2528
  drbd_dev = objects.Disk(dev_type="drbd", size=size,
2529
                          logical_id = (primary, secondary, port),
2530
                          children = [dev_data, dev_meta])
2531
  return drbd_dev
2532

    
2533

    
2534
def _GenerateDiskTemplate(cfg, template_name,
2535
                          instance_name, primary_node,
2536
                          secondary_nodes, disk_sz, swap_sz):
2537
  """Generate the entire disk layout for a given template type.
2538

2539
  """
2540
  #TODO: compute space requirements
2541

    
2542
  vgname = cfg.GetVGName()
2543
  if template_name == "diskless":
2544
    disks = []
2545
  elif template_name == "plain":
2546
    if len(secondary_nodes) != 0:
2547
      raise errors.ProgrammerError("Wrong template configuration")
2548

    
2549
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2550
    sda_dev = objects.Disk(dev_type="lvm", size=disk_sz,
2551
                           logical_id=(vgname, names[0]),
2552
                           iv_name = "sda")
2553
    sdb_dev = objects.Disk(dev_type="lvm", size=swap_sz,
2554
                           logical_id=(vgname, names[1]),
2555
                           iv_name = "sdb")
2556
    disks = [sda_dev, sdb_dev]
2557
  elif template_name == "local_raid1":
2558
    if len(secondary_nodes) != 0:
2559
      raise errors.ProgrammerError("Wrong template configuration")
2560

    
2561

    
2562
    names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2563
                                       ".sdb_m1", ".sdb_m2"])
2564
    sda_dev_m1 = objects.Disk(dev_type="lvm", size=disk_sz,
2565
                              logical_id=(vgname, names[0]))
2566
    sda_dev_m2 = objects.Disk(dev_type="lvm", size=disk_sz,
2567
                              logical_id=(vgname, names[1]))
2568
    md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name = "sda",
2569
                              size=disk_sz,
2570
                              children = [sda_dev_m1, sda_dev_m2])
2571
    sdb_dev_m1 = objects.Disk(dev_type="lvm", size=swap_sz,
2572
                              logical_id=(vgname, names[2]))
2573
    sdb_dev_m2 = objects.Disk(dev_type="lvm", size=swap_sz,
2574
                              logical_id=(vgname, names[3]))
2575
    md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name = "sdb",
2576
                              size=swap_sz,
2577
                              children = [sdb_dev_m1, sdb_dev_m2])
2578
    disks = [md_sda_dev, md_sdb_dev]
2579
  elif template_name == constants.DT_REMOTE_RAID1:
2580
    if len(secondary_nodes) != 1:
2581
      raise errors.ProgrammerError("Wrong template configuration")
2582
    remote_node = secondary_nodes[0]
2583
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2584
                                       ".sdb_data", ".sdb_meta"])
2585
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2586
                                         disk_sz, names[0:2])
2587
    md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name="sda",
2588
                              children = [drbd_sda_dev], size=disk_sz)
2589
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2590
                                         swap_sz, names[2:4])
2591
    md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name="sdb",
2592
                              children = [drbd_sdb_dev], size=swap_sz)
2593
    disks = [md_sda_dev, md_sdb_dev]
2594
  else:
2595
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2596
  return disks
2597

    
2598

    
2599
def _GetInstanceInfoText(instance):
2600
  """Compute that text that should be added to the disk's metadata.
2601

2602
  """
2603
  return "originstname+%s" % instance.name
2604

    
2605

    
2606
def _CreateDisks(cfg, instance):
2607
  """Create all disks for an instance.
2608

2609
  This abstracts away some work from AddInstance.
2610

2611
  Args:
2612
    instance: the instance object
2613

2614
  Returns:
2615
    True or False showing the success of the creation process
2616

2617
  """
2618
  info = _GetInstanceInfoText(instance)
2619

    
2620
  for device in instance.disks:
2621
    logger.Info("creating volume %s for instance %s" %
2622
              (device.iv_name, instance.name))
2623
    #HARDCODE
2624
    for secondary_node in instance.secondary_nodes:
2625
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, device, False,
2626
                                        info):
2627
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2628
                     (device.iv_name, device, secondary_node))
2629
        return False
2630
    #HARDCODE
2631
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, device, info):
2632
      logger.Error("failed to create volume %s on primary!" %
2633
                   device.iv_name)
2634
      return False
2635
  return True
2636

    
2637

    
2638
def _RemoveDisks(instance, cfg):
2639
  """Remove all disks for an instance.
2640

2641
  This abstracts away some work from `AddInstance()` and
2642
  `RemoveInstance()`. Note that in case some of the devices couldn't
2643
  be remove, the removal will continue with the other ones (compare
2644
  with `_CreateDisks()`).
2645

2646
  Args:
2647
    instance: the instance object
2648

2649
  Returns:
2650
    True or False showing the success of the removal proces
2651

2652
  """
2653
  logger.Info("removing block devices for instance %s" % instance.name)
2654

    
2655
  result = True
2656
  for device in instance.disks:
2657
    for node, disk in device.ComputeNodeTree(instance.primary_node):
2658
      cfg.SetDiskID(disk, node)
2659
      if not rpc.call_blockdev_remove(node, disk):
2660
        logger.Error("could not remove block device %s on node %s,"
2661
                     " continuing anyway" %
2662
                     (device.iv_name, node))
2663
        result = False
2664
  return result
2665

    
2666

    
2667
class LUCreateInstance(LogicalUnit):
2668
  """Create an instance.
2669

2670
  """
2671
  HPATH = "instance-add"
2672
  HTYPE = constants.HTYPE_INSTANCE
2673
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2674
              "disk_template", "swap_size", "mode", "start", "vcpus",
2675
              "wait_for_sync", "ip_check"]
2676

    
2677
  def BuildHooksEnv(self):
2678
    """Build hooks env.
2679

2680
    This runs on master, primary and secondary nodes of the instance.
2681

2682
    """
2683
    env = {
2684
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2685
      "INSTANCE_DISK_SIZE": self.op.disk_size,
2686
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
2687
      "INSTANCE_ADD_MODE": self.op.mode,
2688
      }
2689
    if self.op.mode == constants.INSTANCE_IMPORT:
2690
      env["INSTANCE_SRC_NODE"] = self.op.src_node
2691
      env["INSTANCE_SRC_PATH"] = self.op.src_path
2692
      env["INSTANCE_SRC_IMAGE"] = self.src_image
2693

    
2694
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2695
      primary_node=self.op.pnode,
2696
      secondary_nodes=self.secondaries,
2697
      status=self.instance_status,
2698
      os_type=self.op.os_type,
2699
      memory=self.op.mem_size,
2700
      vcpus=self.op.vcpus,
2701
      nics=[(self.inst_ip, self.op.bridge)],
2702
    ))
2703

    
2704
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2705
          self.secondaries)
2706
    return env, nl, nl
2707

    
2708

    
2709
  def CheckPrereq(self):
2710
    """Check prerequisites.
2711

2712
    """
2713
    if self.op.mode not in (constants.INSTANCE_CREATE,
2714
                            constants.INSTANCE_IMPORT):
2715
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2716
                                 self.op.mode)
2717

    
2718
    if self.op.mode == constants.INSTANCE_IMPORT:
2719
      src_node = getattr(self.op, "src_node", None)
2720
      src_path = getattr(self.op, "src_path", None)
2721
      if src_node is None or src_path is None:
2722
        raise errors.OpPrereqError("Importing an instance requires source"
2723
                                   " node and path options")
2724
      src_node_full = self.cfg.ExpandNodeName(src_node)
2725
      if src_node_full is None:
2726
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2727
      self.op.src_node = src_node = src_node_full
2728

    
2729
      if not os.path.isabs(src_path):
2730
        raise errors.OpPrereqError("The source path must be absolute")
2731

    
2732
      export_info = rpc.call_export_info(src_node, src_path)
2733

    
2734
      if not export_info:
2735
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
2736

    
2737
      if not export_info.has_section(constants.INISECT_EXP):
2738
        raise errors.ProgrammerError("Corrupted export config")
2739

    
2740
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
2741
      if (int(ei_version) != constants.EXPORT_VERSION):
2742
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2743
                                   (ei_version, constants.EXPORT_VERSION))
2744

    
2745
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2746
        raise errors.OpPrereqError("Can't import instance with more than"
2747
                                   " one data disk")
2748

    
2749
      # FIXME: are the old os-es, disk sizes, etc. useful?
2750
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2751
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2752
                                                         'disk0_dump'))
2753
      self.src_image = diskimage
2754
    else: # INSTANCE_CREATE
2755
      if getattr(self.op, "os_type", None) is None:
2756
        raise errors.OpPrereqError("No guest OS specified")
2757

    
2758
    # check primary node
2759
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2760
    if pnode is None:
2761
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
2762
                                 self.op.pnode)
2763
    self.op.pnode = pnode.name
2764
    self.pnode = pnode
2765
    self.secondaries = []
2766
    # disk template and mirror node verification
2767
    if self.op.disk_template not in constants.DISK_TEMPLATES:
2768
      raise errors.OpPrereqError("Invalid disk template name")
2769

    
2770
    if self.op.disk_template == constants.DT_REMOTE_RAID1:
2771
      if getattr(self.op, "snode", None) is None:
2772
        raise errors.OpPrereqError("The 'remote_raid1' disk template needs"
2773
                                   " a mirror node")
2774

    
2775
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
2776
      if snode_name is None:
2777
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
2778
                                   self.op.snode)
2779
      elif snode_name == pnode.name:
2780
        raise errors.OpPrereqError("The secondary node cannot be"
2781
                                   " the primary node.")
2782
      self.secondaries.append(snode_name)
2783

    
2784
    # Check lv size requirements
2785
    nodenames = [pnode.name] + self.secondaries
2786
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2787

    
2788
    # Required free disk space as a function of disk and swap space
2789
    req_size_dict = {
2790
      constants.DT_DISKLESS: 0,
2791
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2792
      constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2793
      # 256 MB are added for drbd metadata, 128MB for each drbd device
2794
      constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2795
    }
2796

    
2797
    if self.op.disk_template not in req_size_dict:
2798
      raise errors.ProgrammerError("Disk template '%s' size requirement"
2799
                                   " is unknown" %  self.op.disk_template)
2800

    
2801
    req_size = req_size_dict[self.op.disk_template]
2802

    
2803
    for node in nodenames:
2804
      info = nodeinfo.get(node, None)
2805
      if not info:
2806
        raise errors.OpPrereqError("Cannot get current information"
2807
                                   " from node '%s'" % nodeinfo)
2808
      if req_size > info['vg_free']:
2809
        raise errors.OpPrereqError("Not enough disk space on target node %s."
2810
                                   " %d MB available, %d MB required" %
2811
                                   (node, info['vg_free'], req_size))
2812

    
2813
    # os verification
2814
    os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2815
    if not isinstance(os_obj, objects.OS):
2816
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
2817
                                 " primary node"  % self.op.os_type)
2818

    
2819
    # instance verification
2820
    hostname1 = utils.HostInfo(self.op.instance_name)
2821

    
2822
    self.op.instance_name = instance_name = hostname1.name
2823
    instance_list = self.cfg.GetInstanceList()
2824
    if instance_name in instance_list:
2825
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2826
                                 instance_name)
2827

    
2828
    ip = getattr(self.op, "ip", None)
2829
    if ip is None or ip.lower() == "none":
2830
      inst_ip = None
2831
    elif ip.lower() == "auto":
2832
      inst_ip = hostname1.ip
2833
    else:
2834
      if not utils.IsValidIP(ip):
2835
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
2836
                                   " like a valid IP" % ip)
2837
      inst_ip = ip
2838
    self.inst_ip = inst_ip
2839

    
2840
    if self.op.start and not self.op.ip_check:
2841
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
2842
                                 " adding an instance in start mode")
2843

    
2844
    if self.op.ip_check:
2845
      command = ["fping", "-q", hostname1.ip]
2846
      result = utils.RunCmd(command)
2847
      if not result.failed:
2848
        raise errors.OpPrereqError("IP address %s of instance %s already"
2849
                                   " in use" % (hostname1.ip, instance_name))
2850

    
2851
    # bridge verification
2852
    bridge = getattr(self.op, "bridge", None)
2853
    if bridge is None:
2854
      self.op.bridge = self.cfg.GetDefBridge()
2855
    else:
2856
      self.op.bridge = bridge
2857

    
2858
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
2859
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
2860
                                 " destination node '%s'" %
2861
                                 (self.op.bridge, pnode.name))
2862

    
2863
    if self.op.start:
2864
      self.instance_status = 'up'
2865
    else:
2866
      self.instance_status = 'down'
2867

    
2868
  def Exec(self, feedback_fn):
2869
    """Create and add the instance to the cluster.
2870

2871
    """
2872
    instance = self.op.instance_name
2873
    pnode_name = self.pnode.name
2874

    
2875
    nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
2876
    if self.inst_ip is not None:
2877
      nic.ip = self.inst_ip
2878

    
2879
    disks = _GenerateDiskTemplate(self.cfg,
2880
                                  self.op.disk_template,
2881
                                  instance, pnode_name,
2882
                                  self.secondaries, self.op.disk_size,
2883
                                  self.op.swap_size)
2884

    
2885
    iobj = objects.Instance(name=instance, os=self.op.os_type,
2886
                            primary_node=pnode_name,
2887
                            memory=self.op.mem_size,
2888
                            vcpus=self.op.vcpus,
2889
                            nics=[nic], disks=disks,
2890
                            disk_template=self.op.disk_template,
2891
                            status=self.instance_status,
2892
                            )
2893

    
2894
    feedback_fn("* creating instance disks...")
2895
    if not _CreateDisks(self.cfg, iobj):
2896
      _RemoveDisks(iobj, self.cfg)
2897
      raise errors.OpExecError("Device creation failed, reverting...")
2898

    
2899
    feedback_fn("adding instance %s to cluster config" % instance)
2900

    
2901
    self.cfg.AddInstance(iobj)
2902

    
2903
    if self.op.wait_for_sync:
2904
      disk_abort = not _WaitForSync(self.cfg, iobj)
2905
    elif iobj.disk_template == constants.DT_REMOTE_RAID1:
2906
      # make sure the disks are not degraded (still sync-ing is ok)
2907
      time.sleep(15)
2908
      feedback_fn("* checking mirrors status")
2909
      disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
2910
    else:
2911
      disk_abort = False
2912

    
2913
    if disk_abort:
2914
      _RemoveDisks(iobj, self.cfg)
2915
      self.cfg.RemoveInstance(iobj.name)
2916
      raise errors.OpExecError("There are some degraded disks for"
2917
                               " this instance")
2918

    
2919
    feedback_fn("creating os for instance %s on node %s" %
2920
                (instance, pnode_name))
2921

    
2922
    if iobj.disk_template != constants.DT_DISKLESS:
2923
      if self.op.mode == constants.INSTANCE_CREATE:
2924
        feedback_fn("* running the instance OS create scripts...")
2925
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
2926
          raise errors.OpExecError("could not add os for instance %s"
2927
                                   " on node %s" %
2928
                                   (instance, pnode_name))
2929

    
2930
      elif self.op.mode == constants.INSTANCE_IMPORT:
2931
        feedback_fn("* running the instance OS import scripts...")
2932
        src_node = self.op.src_node
2933
        src_image = self.src_image
2934
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
2935
                                                src_node, src_image):
2936
          raise errors.OpExecError("Could not import os for instance"
2937
                                   " %s on node %s" %
2938
                                   (instance, pnode_name))
2939
      else:
2940
        # also checked in the prereq part
2941
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
2942
                                     % self.op.mode)
2943

    
2944
    if self.op.start:
2945
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
2946
      feedback_fn("* starting instance...")
2947
      if not rpc.call_instance_start(pnode_name, iobj, None):
2948
        raise errors.OpExecError("Could not start instance")
2949

    
2950

    
2951
class LUConnectConsole(NoHooksLU):
2952
  """Connect to an instance's console.
2953

2954
  This is somewhat special in that it returns the command line that
2955
  you need to run on the master node in order to connect to the
2956
  console.
2957

2958
  """
2959
  _OP_REQP = ["instance_name"]
2960

    
2961
  def CheckPrereq(self):
2962
    """Check prerequisites.
2963

2964
    This checks that the instance is in the cluster.
2965

2966
    """
2967
    instance = self.cfg.GetInstanceInfo(
2968
      self.cfg.ExpandInstanceName(self.op.instance_name))
2969
    if instance is None:
2970
      raise errors.OpPrereqError("Instance '%s' not known" %
2971
                                 self.op.instance_name)
2972
    self.instance = instance
2973

    
2974
  def Exec(self, feedback_fn):
2975
    """Connect to the console of an instance
2976

2977
    """
2978
    instance = self.instance
2979
    node = instance.primary_node
2980

    
2981
    node_insts = rpc.call_instance_list([node])[node]
2982
    if node_insts is False:
2983
      raise errors.OpExecError("Can't connect to node %s." % node)
2984

    
2985
    if instance.name not in node_insts:
2986
      raise errors.OpExecError("Instance %s is not running." % instance.name)
2987

    
2988
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
2989

    
2990
    hyper = hypervisor.GetHypervisor()
2991
    console_cmd = hyper.GetShellCommandForConsole(instance.name)
2992
    # build ssh cmdline
2993
    argv = ["ssh", "-q", "-t"]
2994
    argv.extend(ssh.KNOWN_HOSTS_OPTS)
2995
    argv.extend(ssh.BATCH_MODE_OPTS)
2996
    argv.append(node)
2997
    argv.append(console_cmd)
2998
    return "ssh", argv
2999

    
3000

    
3001
class LUAddMDDRBDComponent(LogicalUnit):
3002
  """Adda new mirror member to an instance's disk.
3003

3004
  """
3005
  HPATH = "mirror-add"
3006
  HTYPE = constants.HTYPE_INSTANCE
3007
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3008

    
3009
  def BuildHooksEnv(self):
3010
    """Build hooks env.
3011

3012
    This runs on the master, the primary and all the secondaries.
3013

3014
    """
3015
    env = {
3016
      "NEW_SECONDARY": self.op.remote_node,
3017
      "DISK_NAME": self.op.disk_name,
3018
      }
3019
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3020
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3021
          self.op.remote_node,] + list(self.instance.secondary_nodes)
3022
    return env, nl, nl
3023

    
3024
  def CheckPrereq(self):
3025
    """Check prerequisites.
3026

3027
    This checks that the instance is in the cluster.
3028

3029
    """
3030
    instance = self.cfg.GetInstanceInfo(
3031
      self.cfg.ExpandInstanceName(self.op.instance_name))
3032
    if instance is None:
3033
      raise errors.OpPrereqError("Instance '%s' not known" %
3034
                                 self.op.instance_name)
3035
    self.instance = instance
3036

    
3037
    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3038
    if remote_node is None:
3039
      raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3040
    self.remote_node = remote_node
3041

    
3042
    if remote_node == instance.primary_node:
3043
      raise errors.OpPrereqError("The specified node is the primary node of"
3044
                                 " the instance.")
3045

    
3046
    if instance.disk_template != constants.DT_REMOTE_RAID1:
3047
      raise errors.OpPrereqError("Instance's disk layout is not"
3048
                                 " remote_raid1.")
3049
    for disk in instance.disks:
3050
      if disk.iv_name == self.op.disk_name:
3051
        break
3052
    else:
3053
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
3054
                                 " instance." % self.op.disk_name)
3055
    if len(disk.children) > 1:
3056
      raise errors.OpPrereqError("The device already has two slave"
3057
                                 " devices.\n"
3058
                                 "This would create a 3-disk raid1"
3059
                                 " which we don't allow.")
3060
    self.disk = disk
3061

    
3062
  def Exec(self, feedback_fn):
3063
    """Add the mirror component
3064

3065
    """
3066
    disk = self.disk
3067
    instance = self.instance
3068

    
3069
    remote_node = self.remote_node
3070
    lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3071
    names = _GenerateUniqueNames(self.cfg, lv_names)
3072
    new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3073
                                     remote_node, disk.size, names)
3074

    
3075
    logger.Info("adding new mirror component on secondary")
3076
    #HARDCODE
3077
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, new_drbd, False,
3078
                                      _GetInstanceInfoText(instance)):
3079
      raise errors.OpExecError("Failed to create new component on secondary"
3080
                               " node %s" % remote_node)
3081

    
3082
    logger.Info("adding new mirror component on primary")
3083
    #HARDCODE
3084
    if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node, new_drbd,
3085
                                    _GetInstanceInfoText(instance)):
3086
      # remove secondary dev
3087
      self.cfg.SetDiskID(new_drbd, remote_node)
3088
      rpc.call_blockdev_remove(remote_node, new_drbd)
3089
      raise errors.OpExecError("Failed to create volume on primary")
3090

    
3091
    # the device exists now
3092
    # call the primary node to add the mirror to md
3093
    logger.Info("adding new mirror component to md")
3094
    if not rpc.call_blockdev_addchild(instance.primary_node,
3095
                                           disk, new_drbd):
3096
      logger.Error("Can't add mirror compoment to md!")
3097
      self.cfg.SetDiskID(new_drbd, remote_node)
3098
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
3099
        logger.Error("Can't rollback on secondary")
3100
      self.cfg.SetDiskID(new_drbd, instance.primary_node)
3101
      if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3102
        logger.Error("Can't rollback on primary")
3103
      raise errors.OpExecError("Can't add mirror component to md array")
3104

    
3105
    disk.children.append(new_drbd)
3106

    
3107
    self.cfg.AddInstance(instance)
3108

    
3109
    _WaitForSync(self.cfg, instance)
3110

    
3111
    return 0
3112

    
3113

    
3114
class LURemoveMDDRBDComponent(LogicalUnit):
3115
  """Remove a component from a remote_raid1 disk.
3116

3117
  """
3118
  HPATH = "mirror-remove"
3119
  HTYPE = constants.HTYPE_INSTANCE
3120
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3121

    
3122
  def BuildHooksEnv(self):
3123
    """Build hooks env.
3124

3125
    This runs on the master, the primary and all the secondaries.
3126

3127
    """
3128
    env = {
3129
      "DISK_NAME": self.op.disk_name,
3130
      "DISK_ID": self.op.disk_id,
3131
      "OLD_SECONDARY": self.old_secondary,
3132
      }
3133
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3134
    nl = [self.sstore.GetMasterNode(),
3135
          self.instance.primary_node] + list(self.instance.secondary_nodes)
3136
    return env, nl, nl
3137

    
3138
  def CheckPrereq(self):
3139
    """Check prerequisites.
3140

3141
    This checks that the instance is in the cluster.
3142

3143
    """
3144
    instance = self.cfg.GetInstanceInfo(
3145
      self.cfg.ExpandInstanceName(self.op.instance_name))
3146
    if instance is None:
3147
      raise errors.OpPrereqError("Instance '%s' not known" %
3148
                                 self.op.instance_name)
3149
    self.instance = instance
3150

    
3151
    if instance.disk_template != constants.DT_REMOTE_RAID1:
3152
      raise errors.OpPrereqError("Instance's disk layout is not"
3153
                                 " remote_raid1.")
3154
    for disk in instance.disks:
3155
      if disk.iv_name == self.op.disk_name:
3156
        break
3157
    else:
3158
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
3159
                                 " instance." % self.op.disk_name)
3160
    for child in disk.children:
3161
      if child.dev_type == "drbd" and child.logical_id[2] == self.op.disk_id:
3162
        break
3163
    else:
3164
      raise errors.OpPrereqError("Can't find the device with this port.")
3165

    
3166
    if len(disk.children) < 2:
3167
      raise errors.OpPrereqError("Cannot remove the last component from"
3168
                                 " a mirror.")
3169
    self.disk = disk
3170
    self.child = child
3171
    if self.child.logical_id[0] == instance.primary_node:
3172
      oid = 1
3173
    else:
3174
      oid = 0
3175
    self.old_secondary = self.child.logical_id[oid]
3176

    
3177
  def Exec(self, feedback_fn):
3178
    """Remove the mirror component
3179

3180
    """
3181
    instance = self.instance
3182
    disk = self.disk
3183
    child = self.child
3184
    logger.Info("remove mirror component")
3185
    self.cfg.SetDiskID(disk, instance.primary_node)
3186
    if not rpc.call_blockdev_removechild(instance.primary_node,
3187
                                              disk, child):
3188
      raise errors.OpExecError("Can't remove child from mirror.")
3189

    
3190
    for node in child.logical_id[:2]:
3191
      self.cfg.SetDiskID(child, node)
3192
      if not rpc.call_blockdev_remove(node, child):
3193
        logger.Error("Warning: failed to remove device from node %s,"
3194
                     " continuing operation." % node)
3195

    
3196
    disk.children.remove(child)
3197
    self.cfg.AddInstance(instance)
3198

    
3199

    
3200
class LUReplaceDisks(LogicalUnit):
3201
  """Replace the disks of an instance.
3202

3203
  """
3204
  HPATH = "mirrors-replace"
3205
  HTYPE = constants.HTYPE_INSTANCE
3206
  _OP_REQP = ["instance_name"]
3207

    
3208
  def BuildHooksEnv(self):
3209
    """Build hooks env.
3210

3211
    This runs on the master, the primary and all the secondaries.
3212

3213
    """
3214
    env = {
3215
      "NEW_SECONDARY": self.op.remote_node,
3216
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3217
      }
3218
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3219
    nl = [self.sstore.GetMasterNode(),
3220
          self.instance.primary_node] + list(self.instance.secondary_nodes)
3221
    return env, nl, nl
3222

    
3223
  def CheckPrereq(self):
3224
    """Check prerequisites.
3225

3226
    This checks that the instance is in the cluster.
3227

3228
    """
3229
    instance = self.cfg.GetInstanceInfo(
3230
      self.cfg.ExpandInstanceName(self.op.instance_name))
3231
    if instance is None:
3232
      raise errors.OpPrereqError("Instance '%s' not known" %
3233
                                 self.op.instance_name)
3234
    self.instance = instance
3235

    
3236
    if instance.disk_template != constants.DT_REMOTE_RAID1:
3237
      raise errors.OpPrereqError("Instance's disk layout is not"
3238
                                 " remote_raid1.")
3239

    
3240
    if len(instance.secondary_nodes) != 1:
3241
      raise errors.OpPrereqError("The instance has a strange layout,"
3242
                                 " expected one secondary but found %d" %
3243
                                 len(instance.secondary_nodes))
3244

    
3245
    remote_node = getattr(self.op, "remote_node", None)
3246
    if remote_node is None:
3247
      remote_node = instance.secondary_nodes[0]
3248
    else:
3249
      remote_node = self.cfg.ExpandNodeName(remote_node)
3250
      if remote_node is None:
3251
        raise errors.OpPrereqError("Node '%s' not known" %
3252
                                   self.op.remote_node)
3253
    if remote_node == instance.primary_node:
3254
      raise errors.OpPrereqError("The specified node is the primary node of"
3255
                                 " the instance.")
3256
    self.op.remote_node = remote_node
3257

    
3258
  def Exec(self, feedback_fn):
3259
    """Replace the disks of an instance.
3260

3261
    """
3262
    instance = self.instance
3263
    iv_names = {}
3264
    # start of work
3265
    remote_node = self.op.remote_node
3266
    cfg = self.cfg
3267
    for dev in instance.disks:
3268
      size = dev.size
3269
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3270
      names = _GenerateUniqueNames(cfg, lv_names)
3271
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3272
                                       remote_node, size, names)
3273
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3274
      logger.Info("adding new mirror component on secondary for %s" %
3275
                  dev.iv_name)
3276
      #HARDCODE
3277
      if not _CreateBlockDevOnSecondary(cfg, remote_node, new_drbd, False,
3278
                                        _GetInstanceInfoText(instance)):
3279
        raise errors.OpExecError("Failed to create new component on"
3280
                                 " secondary node %s\n"
3281
                                 "Full abort, cleanup manually!" %
3282
                                 remote_node)
3283

    
3284
      logger.Info("adding new mirror component on primary")
3285
      #HARDCODE
3286
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, new_drbd,
3287
                                      _GetInstanceInfoText(instance)):
3288
        # remove secondary dev
3289
        cfg.SetDiskID(new_drbd, remote_node)
3290
        rpc.call_blockdev_remove(remote_node, new_drbd)
3291
        raise errors.OpExecError("Failed to create volume on primary!\n"
3292
                                 "Full abort, cleanup manually!!")
3293

    
3294
      # the device exists now
3295
      # call the primary node to add the mirror to md
3296
      logger.Info("adding new mirror component to md")
3297
      if not rpc.call_blockdev_addchild(instance.primary_node, dev,
3298
                                        new_drbd):
3299
        logger.Error("Can't add mirror compoment to md!")
3300
        cfg.SetDiskID(new_drbd, remote_node)
3301
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
3302
          logger.Error("Can't rollback on secondary")
3303
        cfg.SetDiskID(new_drbd, instance.primary_node)
3304
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3305
          logger.Error("Can't rollback on primary")
3306
        raise errors.OpExecError("Full abort, cleanup manually!!")
3307

    
3308
      dev.children.append(new_drbd)
3309
      cfg.AddInstance(instance)
3310

    
3311
    # this can fail as the old devices are degraded and _WaitForSync
3312
    # does a combined result over all disks, so we don't check its
3313
    # return value
3314
    _WaitForSync(cfg, instance, unlock=True)
3315

    
3316
    # so check manually all the devices
3317
    for name in iv_names:
3318
      dev, child, new_drbd = iv_names[name]
3319
      cfg.SetDiskID(dev, instance.primary_node)
3320
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3321
      if is_degr:
3322
        raise errors.OpExecError("MD device %s is degraded!" % name)
3323
      cfg.SetDiskID(new_drbd, instance.primary_node)
3324
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3325
      if is_degr:
3326
        raise errors.OpExecError("New drbd device %s is degraded!" % name)
3327

    
3328
    for name in iv_names:
3329
      dev, child, new_drbd = iv_names[name]
3330
      logger.Info("remove mirror %s component" % name)
3331
      cfg.SetDiskID(dev, instance.primary_node)
3332
      if not rpc.call_blockdev_removechild(instance.primary_node,
3333
                                                dev, child):
3334
        logger.Error("Can't remove child from mirror, aborting"
3335
                     " *this device cleanup*.\nYou need to cleanup manually!!")
3336
        continue
3337

    
3338
      for node in child.logical_id[:2]:
3339
        logger.Info("remove child device on %s" % node)
3340
        cfg.SetDiskID(child, node)
3341
        if not rpc.call_blockdev_remove(node, child):
3342
          logger.Error("Warning: failed to remove device from node %s,"
3343
                       " continuing operation." % node)
3344

    
3345
      dev.children.remove(child)
3346

    
3347
      cfg.AddInstance(instance)
3348

    
3349

    
3350
class LUQueryInstanceData(NoHooksLU):
3351
  """Query runtime instance data.
3352

3353
  """
3354
  _OP_REQP = ["instances"]
3355

    
3356
  def CheckPrereq(self):
3357
    """Check prerequisites.
3358

3359
    This only checks the optional instance list against the existing names.
3360

3361
    """
3362
    if not isinstance(self.op.instances, list):
3363
      raise errors.OpPrereqError("Invalid argument type 'instances'")
3364
    if self.op.instances:
3365
      self.wanted_instances = []
3366
      names = self.op.instances
3367
      for name in names:
3368
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3369
        if instance is None:
3370
          raise errors.OpPrereqError("No such instance name '%s'" % name)
3371
      self.wanted_instances.append(instance)
3372
    else:
3373
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3374
                               in self.cfg.GetInstanceList()]
3375
    return
3376

    
3377

    
3378
  def _ComputeDiskStatus(self, instance, snode, dev):
3379
    """Compute block device status.
3380

3381
    """
3382
    self.cfg.SetDiskID(dev, instance.primary_node)
3383
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3384
    if dev.dev_type == "drbd":
3385
      # we change the snode then (otherwise we use the one passed in)
3386
      if dev.logical_id[0] == instance.primary_node:
3387
        snode = dev.logical_id[1]
3388
      else:
3389
        snode = dev.logical_id[0]
3390

    
3391
    if snode:
3392
      self.cfg.SetDiskID(dev, snode)
3393
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
3394
    else:
3395
      dev_sstatus = None
3396

    
3397
    if dev.children:
3398
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
3399
                      for child in dev.children]
3400
    else:
3401
      dev_children = []
3402

    
3403
    data = {
3404
      "iv_name": dev.iv_name,
3405
      "dev_type": dev.dev_type,
3406
      "logical_id": dev.logical_id,
3407
      "physical_id": dev.physical_id,
3408
      "pstatus": dev_pstatus,
3409
      "sstatus": dev_sstatus,
3410
      "children": dev_children,
3411
      }
3412

    
3413
    return data
3414

    
3415
  def Exec(self, feedback_fn):
3416
    """Gather and return data"""
3417
    result = {}
3418
    for instance in self.wanted_instances:
3419
      remote_info = rpc.call_instance_info(instance.primary_node,
3420
                                                instance.name)
3421
      if remote_info and "state" in remote_info:
3422
        remote_state = "up"
3423
      else:
3424
        remote_state = "down"
3425
      if instance.status == "down":
3426
        config_state = "down"
3427
      else:
3428
        config_state = "up"
3429

    
3430
      disks = [self._ComputeDiskStatus(instance, None, device)
3431
               for device in instance.disks]
3432

    
3433
      idict = {
3434
        "name": instance.name,
3435
        "config_state": config_state,
3436
        "run_state": remote_state,
3437
        "pnode": instance.primary_node,
3438
        "snodes": instance.secondary_nodes,
3439
        "os": instance.os,
3440
        "memory": instance.memory,
3441
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
3442
        "disks": disks,
3443
        }
3444

    
3445
      result[instance.name] = idict
3446

    
3447
    return result
3448

    
3449

    
3450
class LUSetInstanceParms(LogicalUnit):
3451
  """Modifies an instances's parameters.
3452

3453
  """
3454
  HPATH = "instance-modify"
3455
  HTYPE = constants.HTYPE_INSTANCE
3456
  _OP_REQP = ["instance_name"]
3457

    
3458
  def BuildHooksEnv(self):
3459
    """Build hooks env.
3460

3461
    This runs on the master, primary and secondaries.
3462

3463
    """
3464
    args = dict()
3465
    if self.mem:
3466
      args['memory'] = self.mem
3467
    if self.vcpus:
3468
      args['vcpus'] = self.vcpus
3469
    if self.do_ip or self.do_bridge:
3470
      if self.do_ip:
3471
        ip = self.ip
3472
      else:
3473
        ip = self.instance.nics[0].ip
3474
      if self.bridge:
3475
        bridge = self.bridge
3476
      else:
3477
        bridge = self.instance.nics[0].bridge
3478
      args['nics'] = [(ip, bridge)]
3479
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
3480
    nl = [self.sstore.GetMasterNode(),
3481
          self.instance.primary_node] + list(self.instance.secondary_nodes)
3482
    return env, nl, nl
3483

    
3484
  def CheckPrereq(self):
3485
    """Check prerequisites.
3486

3487
    This only checks the instance list against the existing names.
3488

3489
    """
3490
    self.mem = getattr(self.op, "mem", None)
3491
    self.vcpus = getattr(self.op, "vcpus", None)
3492
    self.ip = getattr(self.op, "ip", None)
3493
    self.bridge = getattr(self.op, "bridge", None)
3494
    if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
3495
      raise errors.OpPrereqError("No changes submitted")
3496
    if self.mem is not None:
3497
      try:
3498
        self.mem = int(self.mem)
3499
      except ValueError, err:
3500
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
3501
    if self.vcpus is not None:
3502
      try:
3503
        self.vcpus = int(self.vcpus)
3504
      except ValueError, err:
3505
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
3506
    if self.ip is not None:
3507
      self.do_ip = True
3508
      if self.ip.lower() == "none":
3509
        self.ip = None
3510
      else:
3511
        if not utils.IsValidIP(self.ip):
3512
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
3513
    else:
3514
      self.do_ip = False
3515
    self.do_bridge = (self.bridge is not None)
3516

    
3517
    instance = self.cfg.GetInstanceInfo(
3518
      self.cfg.ExpandInstanceName(self.op.instance_name))
3519
    if instance is None:
3520
      raise errors.OpPrereqError("No such instance name '%s'" %
3521
                                 self.op.instance_name)
3522
    self.op.instance_name = instance.name
3523
    self.instance = instance
3524
    return
3525

    
3526
  def Exec(self, feedback_fn):
3527
    """Modifies an instance.
3528

3529
    All parameters take effect only at the next restart of the instance.
3530
    """
3531
    result = []
3532
    instance = self.instance
3533
    if self.mem:
3534
      instance.memory = self.mem
3535
      result.append(("mem", self.mem))
3536
    if self.vcpus:
3537
      instance.vcpus = self.vcpus
3538
      result.append(("vcpus",  self.vcpus))
3539
    if self.do_ip:
3540
      instance.nics[0].ip = self.ip
3541
      result.append(("ip", self.ip))
3542
    if self.bridge:
3543
      instance.nics[0].bridge = self.bridge
3544
      result.append(("bridge", self.bridge))
3545

    
3546
    self.cfg.AddInstance(instance)
3547

    
3548
    return result
3549

    
3550

    
3551
class LUQueryExports(NoHooksLU):
3552
  """Query the exports list
3553

3554
  """
3555
  _OP_REQP = []
3556

    
3557
  def CheckPrereq(self):
3558
    """Check that the nodelist contains only existing nodes.
3559

3560
    """
3561
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
3562

    
3563
  def Exec(self, feedback_fn):
3564
    """Compute the list of all the exported system images.
3565

3566
    Returns:
3567
      a dictionary with the structure node->(export-list)
3568
      where export-list is a list of the instances exported on
3569
      that node.
3570

3571
    """
3572
    return rpc.call_export_list(self.nodes)
3573

    
3574

    
3575
class LUExportInstance(LogicalUnit):
3576
  """Export an instance to an image in the cluster.
3577

3578
  """
3579
  HPATH = "instance-export"
3580
  HTYPE = constants.HTYPE_INSTANCE
3581
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
3582

    
3583
  def BuildHooksEnv(self):
3584
    """Build hooks env.
3585

3586
    This will run on the master, primary node and target node.
3587

3588
    """
3589
    env = {
3590
      "EXPORT_NODE": self.op.target_node,
3591
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
3592
      }
3593
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3594
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3595
          self.op.target_node]
3596
    return env, nl, nl
3597

    
3598
  def CheckPrereq(self):
3599
    """Check prerequisites.
3600

3601
    This checks that the instance name is a valid one.
3602

3603
    """
3604
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
3605
    self.instance = self.cfg.GetInstanceInfo(instance_name)
3606
    if self.instance is None:
3607
      raise errors.OpPrereqError("Instance '%s' not found" %
3608
                                 self.op.instance_name)
3609

    
3610
    # node verification
3611
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
3612
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
3613

    
3614
    if self.dst_node is None:
3615
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
3616
                                 self.op.target_node)
3617
    self.op.target_node = self.dst_node.name
3618

    
3619
  def Exec(self, feedback_fn):
3620
    """Export an instance to an image in the cluster.
3621

3622
    """
3623
    instance = self.instance
3624
    dst_node = self.dst_node
3625
    src_node = instance.primary_node
3626
    # shutdown the instance, unless requested not to do so
3627
    if self.op.shutdown:
3628
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
3629
      self.processor.ChainOpCode(op, feedback_fn)
3630

    
3631
    vgname = self.cfg.GetVGName()
3632

    
3633
    snap_disks = []
3634

    
3635
    try:
3636
      for disk in instance.disks:
3637
        if disk.iv_name == "sda":
3638
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
3639
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
3640

    
3641
          if not new_dev_name:
3642
            logger.Error("could not snapshot block device %s on node %s" %
3643
                         (disk.logical_id[1], src_node))
3644
          else:
3645
            new_dev = objects.Disk(dev_type="lvm", size=disk.size,
3646
                                      logical_id=(vgname, new_dev_name),
3647
                                      physical_id=(vgname, new_dev_name),
3648
                                      iv_name=disk.iv_name)
3649
            snap_disks.append(new_dev)
3650

    
3651
    finally:
3652
      if self.op.shutdown:
3653
        op = opcodes.OpStartupInstance(instance_name=instance.name,
3654
                                       force=False)
3655
        self.processor.ChainOpCode(op, feedback_fn)
3656

    
3657
    # TODO: check for size
3658

    
3659
    for dev in snap_disks:
3660
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
3661
                                           instance):
3662
        logger.Error("could not export block device %s from node"
3663
                     " %s to node %s" %
3664
                     (dev.logical_id[1], src_node, dst_node.name))
3665
      if not rpc.call_blockdev_remove(src_node, dev):
3666
        logger.Error("could not remove snapshot block device %s from"
3667
                     " node %s" % (dev.logical_id[1], src_node))
3668

    
3669
    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
3670
      logger.Error("could not finalize export for instance %s on node %s" %
3671
                   (instance.name, dst_node.name))
3672

    
3673
    nodelist = self.cfg.GetNodeList()
3674
    nodelist.remove(dst_node.name)
3675

    
3676
    # on one-node clusters nodelist will be empty after the removal
3677
    # if we proceed the backup would be removed because OpQueryExports
3678
    # substitutes an empty list with the full cluster node list.
3679
    if nodelist:
3680
      op = opcodes.OpQueryExports(nodes=nodelist)
3681
      exportlist = self.processor.ChainOpCode(op, feedback_fn)
3682
      for node in exportlist:
3683
        if instance.name in exportlist[node]:
3684
          if not rpc.call_export_remove(node, instance.name):
3685
            logger.Error("could not remove older export for instance %s"
3686
                         " on node %s" % (instance.name, node))
3687

    
3688

    
3689
class TagsLU(NoHooksLU):
3690
  """Generic tags LU.
3691

3692
  This is an abstract class which is the parent of all the other tags LUs.
3693

3694
  """
3695
  def CheckPrereq(self):
3696
    """Check prerequisites.
3697

3698
    """
3699
    if self.op.kind == constants.TAG_CLUSTER:
3700
      self.target = self.cfg.GetClusterInfo()
3701
    elif self.op.kind == constants.TAG_NODE:
3702
      name = self.cfg.ExpandNodeName(self.op.name)
3703
      if name is None:
3704
        raise errors.OpPrereqError("Invalid node name (%s)" %
3705
                                   (self.op.name,))
3706
      self.op.name = name
3707
      self.target = self.cfg.GetNodeInfo(name)
3708
    elif self.op.kind == constants.TAG_INSTANCE:
3709
      name = self.cfg.ExpandInstanceName(self.op.name)
3710
      if name is None:
3711
        raise errors.OpPrereqError("Invalid instance name (%s)" %
3712
                                   (self.op.name,))
3713
      self.op.name = name
3714
      self.target = self.cfg.GetInstanceInfo(name)
3715
    else:
3716
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
3717
                                 str(self.op.kind))
3718

    
3719

    
3720
class LUGetTags(TagsLU):
3721
  """Returns the tags of a given object.
3722

3723
  """
3724
  _OP_REQP = ["kind", "name"]
3725

    
3726
  def Exec(self, feedback_fn):
3727
    """Returns the tag list.
3728

3729
    """
3730
    return self.target.GetTags()
3731

    
3732

    
3733
class LUAddTags(TagsLU):
3734
  """Sets a tag on a given object.
3735

3736
  """
3737
  _OP_REQP = ["kind", "name", "tags"]
3738

    
3739
  def CheckPrereq(self):
3740
    """Check prerequisites.
3741

3742
    This checks the type and length of the tag name and value.
3743

3744
    """
3745
    TagsLU.CheckPrereq(self)
3746
    for tag in self.op.tags:
3747
      objects.TaggableObject.ValidateTag(tag)
3748

    
3749
  def Exec(self, feedback_fn):
3750
    """Sets the tag.
3751

3752
    """
3753
    try:
3754
      for tag in self.op.tags:
3755
        self.target.AddTag(tag)
3756
    except errors.TagError, err:
3757
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
3758
    try:
3759
      self.cfg.Update(self.target)
3760
    except errors.ConfigurationError:
3761
      raise errors.OpRetryError("There has been a modification to the"
3762
                                " config file and the operation has been"
3763
                                " aborted. Please retry.")
3764

    
3765

    
3766
class LUDelTags(TagsLU):
3767
  """Delete a list of tags from a given object.
3768

3769
  """
3770
  _OP_REQP = ["kind", "name", "tags"]
3771

    
3772
  def CheckPrereq(self):
3773
    """Check prerequisites.
3774

3775
    This checks that we have the given tag.
3776

3777
    """
3778
    TagsLU.CheckPrereq(self)
3779
    for tag in self.op.tags:
3780
      objects.TaggableObject.ValidateTag(tag)
3781
    del_tags = frozenset(self.op.tags)
3782
    cur_tags = self.target.GetTags()
3783
    if not del_tags <= cur_tags:
3784
      diff_tags = del_tags - cur_tags
3785
      diff_names = ["'%s'" % tag for tag in diff_tags]
3786
      diff_names.sort()
3787
      raise errors.OpPrereqError("Tag(s) %s not found" %
3788
                                 (",".join(diff_names)))
3789

    
3790
  def Exec(self, feedback_fn):
3791
    """Remove the tag from the object.
3792

3793
    """
3794
    for tag in self.op.tags:
3795
      self.target.RemoveTag(tag)
3796
    try:
3797
      self.cfg.Update(self.target)
3798
    except errors.ConfigurationError:
3799
      raise errors.OpRetryError("There has been a modification to the"
3800
                                " config file and the operation has been"
3801
                                " aborted. Please retry.")