#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import socket
import time
import tempfile
import re
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import ssconf

class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements (REQ_CLUSTER,
      REQ_MASTER); note that all commands require root permissions

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_CLUSTER = True
  REQ_MASTER = True

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.processor = processor
    self.op = op
    self.cfg = cfg
    self.sstore = sstore
    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    if self.REQ_CLUSTER:
      if not cfg.IsCluster():
        raise errors.OpPrereqError("Cluster not initialized yet,"
                                   " use 'gnt-cluster init' first.")
      if self.REQ_MASTER:
        master = sstore.GetMasterNode()
        if master != utils.HostInfo().name:
          raise errors.OpPrereqError("Commands must be run on the master"
                                     " node %s" % master)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    As for the node lists, the master should not be included in
    them, as it will be added by the hooks runner in case this LU
    requires a cluster to run on (otherwise we don't have a node
    list). No nodes should be returned as an empty list (and not
    None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError


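# Illustrative sketch (assumed example, not an LU defined by this module):
# a minimal LogicalUnit subclass following the contract above could look
# roughly like this; the opcode field "node_name" is only a hypothetical
# parameter.
#
#   class LUExampleQuery(LogicalUnit):
#     HPATH = "example-query"
#     HTYPE = constants.HTYPE_NODE
#     _OP_REQP = ["node_name"]
#
#     def CheckPrereq(self):
#       node = self.cfg.ExpandNodeName(self.op.node_name)
#       if node is None:
#         raise errors.OpPrereqError("No such node name '%s'" %
#                                    self.op.node_name)
#       self.op.node_name = node
#
#     def BuildHooksEnv(self):
#       return {"NODE_NAME": self.op.node_name}, [], [self.op.node_name]
#
#     def Exec(self, feedback_fn):
#       feedback_fn("Would operate on %s" % self.op.node_name)
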
class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Build hooks env.

    This is a no-op, since we don't run hooks.

    """
    return


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if nodes:
    wanted = []

    for name in nodes:
      node = lu.cfg.ExpandNodeName(name)
      if node is None:
        raise errors.OpPrereqError("No such node name '%s'" % name)
      wanted.append(node)

  else:
    wanted = lu.cfg.GetNodeList()
  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings) or None for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))


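# Illustrative usage (assumed example, not a call made at this point):
# _CheckOutputFields only raises OpPrereqError when a requested field is
# neither static nor dynamic, e.g.
#
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=["name", "mfree"])  # passes silently
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=["bogus"])          # raises OpPrereqError
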
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings
  """
  env = {
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env


def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override
  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


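# Illustrative sketch (the concrete values below are made up): for an
# instance with one NIC, _BuildInstanceHookEnvByObject returns an
# environment dict roughly like
#
#   {
#     "INSTANCE_NAME": "inst1.example.com",
#     "INSTANCE_PRIMARY": "node1.example.com",
#     "INSTANCE_SECONDARIES": "node2.example.com",
#     "INSTANCE_OS_TYPE": "debian-etch",
#     "INSTANCE_STATUS": "up",
#     "INSTANCE_MEMORY": 128,
#     "INSTANCE_VCPUS": 1,
#     "INSTANCE_NIC_COUNT": 1,
#     "INSTANCE_NIC0_IP": "",
#     "INSTANCE_NIC0_BRIDGE": "xen-br0",
#   }
#
# the hooks runner later prefixes each key with GANETI_ before exporting it
# to the hook scripts.
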
def _UpdateEtcHosts(fullnode, ip):
  """Ensure a node has a correct entry in /etc/hosts.

  Args:
    fullnode - Fully qualified domain name of host. (str)
    ip       - IPv4 address of host (str)

  """
  node = fullnode.split(".", 1)[0]

  f = open('/etc/hosts', 'r+')

  inthere = False

  save_lines = []
  add_lines = []
  removed = False

  while True:
    rawline = f.readline()

    if not rawline:
      # End of file
      break

    line = rawline.split('\n')[0]

    # Strip off comments
    line = line.split('#')[0]

    if not line:
      # Entire line was comment, skip
      save_lines.append(rawline)
      continue

    fields = line.split()

    haveall = True
    havesome = False
    for spec in [ ip, fullnode, node ]:
      if spec not in fields:
        haveall = False
      if spec in fields:
        havesome = True

    if haveall:
      inthere = True
      save_lines.append(rawline)
      continue

    if havesome and not haveall:
      # Line (old, or manual?) which is missing some.  Remove.
      removed = True
      continue

    save_lines.append(rawline)

  if not inthere:
    add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))

  if removed:
    if add_lines:
      save_lines = save_lines + add_lines

    # We removed a line, write a new file and replace old.
    fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
    newfile = os.fdopen(fd, 'w')
    newfile.write(''.join(save_lines))
    newfile.close()
    os.rename(tmpname, '/etc/hosts')

  elif add_lines:
    # Simply appending a new line will do the trick.
    f.seek(0, 2)
    for add in add_lines:
      f.write(add)

  f.close()


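# Example of the entry format used above (the host values are made up): for
# fullnode "node1.example.com" and ip "192.0.2.1" the appended line is
#
#   192.0.2.1<TAB>node1.example.com node1
#
# i.e. IP, FQDN and short name, which is exactly what the "haveall" check
# looks for when deciding whether an existing entry is already complete.
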
def _UpdateKnownHosts(fullnode, ip, pubkey):
  """Ensure a node has a correct known_hosts entry.

  Args:
    fullnode - Fully qualified domain name of host. (str)
    ip       - IPv4 address of host (str)
    pubkey   - the public key of the cluster

  """
  if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
  else:
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')

  inthere = False

  save_lines = []
  add_lines = []
  removed = False

  while True:
    rawline = f.readline()
    logger.Debug('read %s' % (repr(rawline),))

    if not rawline:
      # End of file
      break

    line = rawline.split('\n')[0]

    parts = line.split(' ')
    fields = parts[0].split(',')
    key = parts[2]

    haveall = True
    havesome = False
    for spec in [ ip, fullnode ]:
      if spec not in fields:
        haveall = False
      if spec in fields:
        havesome = True

    logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
    if haveall and key == pubkey:
      inthere = True
      save_lines.append(rawline)
      logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
      continue

    if havesome and (not haveall or key != pubkey):
      removed = True
      logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
      continue

    save_lines.append(rawline)

  if not inthere:
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))

  if removed:
    save_lines = save_lines + add_lines

    # Write a new file and replace old.
    fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
                                   constants.DATA_DIR)
    newfile = os.fdopen(fd, 'w')
    try:
      newfile.write(''.join(save_lines))
    finally:
      newfile.close()
    logger.Debug("Wrote new known_hosts.")
    os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)

  elif add_lines:
    # Simply appending a new line will do the trick.
    f.seek(0, 2)
    for add in add_lines:
      f.write(add)

  f.close()


def _HasValidVG(vglist, vgname):
  """Checks if the volume group list is valid.

  A non-None return value means there's an error, and the return value
  is the error message.

  """
  vgsize = vglist.get(vgname, None)
  if vgsize is None:
    return "volume group '%s' missing" % vgname
  elif vgsize < 20480:
    return ("volume group '%s' too small (20480MiB required, %dMiB found)" %
            (vgname, vgsize))
  return None


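# Typical call pattern (mirrors LUInitCluster.CheckPrereq below): the caller
# passes the local volume group listing plus the configured VG name, e.g.
#
#   vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
#   if vgstatus:
#     raise errors.OpPrereqError("Error: %s" % vgstatus)
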
def _InitSSHSetup(node):
  """Setup the SSH configuration for the cluster.


  This generates a dsa keypair for root, adds the pub key to the
  permitted hosts and adds the hostkey to its own known hosts.

  Args:
    node: the name of this host as a fqdn

  """
  if os.path.exists('/root/.ssh/id_dsa'):
    utils.CreateBackup('/root/.ssh/id_dsa')
  if os.path.exists('/root/.ssh/id_dsa.pub'):
    utils.CreateBackup('/root/.ssh/id_dsa.pub')

  utils.RemoveFile('/root/.ssh/id_dsa')
  utils.RemoveFile('/root/.ssh/id_dsa.pub')

  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
                         "-f", "/root/.ssh/id_dsa",
                         "-q", "-N", ""])
  if result.failed:
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
                             result.output)

  f = open('/root/.ssh/id_dsa.pub', 'r')
  try:
    utils.AddAuthorizedKey('/root/.ssh/authorized_keys', f.read(8192))
  finally:
    f.close()


def _InitGanetiServerSetup(ss):
  """Setup the necessary configuration for the initial node daemon.

  This creates the nodepass file containing the shared password for
  the cluster and also generates the SSL certificate.

  """
  # Create pseudo random password
  randpass = sha.new(os.urandom(64)).hexdigest()
  # and write it into sstore
  ss.SetKey(ss.SS_NODED_PASS, randpass)

  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
                         "-days", str(365*5), "-nodes", "-x509",
                         "-keyout", constants.SSL_CERT_FILE,
                         "-out", constants.SSL_CERT_FILE, "-batch"])
  if result.failed:
    raise errors.OpExecError("could not generate server ssl cert, command"
                             " %s had exitcode %s and error message %s" %
                             (result.cmd, result.exit_code, result.output))

  os.chmod(constants.SSL_CERT_FILE, 0400)

  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])

  if result.failed:
    raise errors.OpExecError("Could not start the node daemon, command %s"
                             " had exitcode %s and error %s" %
                             (result.cmd, result.exit_code, result.output))


class LUInitCluster(LogicalUnit):
  """Initialise the cluster.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
              "def_bridge", "master_netdev"]
  REQ_CLUSTER = False

  def BuildHooksEnv(self):
    """Build hooks env.

    Notes: Since we don't require a cluster, we must manually add
    ourselves in the post-run node list.

    """
    env = {
      "CLUSTER": self.op.cluster_name,
      "MASTER": self.hostname.name,
      }
    return env, [], [self.hostname.name]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    if config.ConfigWriter.IsCluster():
      raise errors.OpPrereqError("Cluster is already initialised")

    self.hostname = hostname = utils.HostInfo()

    if hostname.ip.startswith("127."):
      raise errors.OpPrereqError("This host's IP resolves to the private"
                                 " range (%s). Please fix DNS or /etc/hosts." %
                                 (hostname.ip,))

    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)

    result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", hostname.ip])
    if result.failed:
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
                                 " to %s,\nbut this ip address does not"
                                 " belong to this host."
                                 " Aborting." % hostname.ip)

    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip and not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary ip given")
    if secondary_ip and secondary_ip != hostname.ip:
      result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", secondary_ip])
      if result.failed:
        raise errors.OpPrereqError("You gave %s as secondary IP,\n"
                                   "but it does not belong to this host." %
                                   secondary_ip)
    self.secondary_ip = secondary_ip

    # checks presence of the volume group given
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)

    if vgstatus:
      raise errors.OpPrereqError("Error: %s" % vgstatus)

    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
                    self.op.mac_prefix):
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
                                 self.op.mac_prefix)

    if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
                                 self.op.hypervisor_type)

    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
    if result.failed:
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
                                 (self.op.master_netdev,
                                  result.output.strip()))

  def Exec(self, feedback_fn):
    """Initialize the cluster.

    """
    clustername = self.clustername
    hostname = self.hostname

    # set up the simple store
    ss = ssconf.SimpleStore()
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)

    # set up the inter-node password and certificate
    _InitGanetiServerSetup(ss)

    # start the master ip
    rpc.call_node_start_master(hostname.name)

    # set up ssh config and /etc/hosts
    f = open('/etc/ssh/ssh_host_rsa_key.pub', 'r')
    try:
      sshline = f.read()
    finally:
      f.close()
    sshkey = sshline.split(" ")[1]

    _UpdateEtcHosts(hostname.name, hostname.ip)

    _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)

    _InitSSHSetup(hostname.name)

    # init of cluster config file
    cfgw = config.ConfigWriter()
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
                    sshkey, self.op.mac_prefix,
                    self.op.vg_name, self.op.def_bridge)


652
  """Logical unit for destroying the cluster.
653

654
  """
655
  _OP_REQP = []
656

    
657
  def CheckPrereq(self):
658
    """Check prerequisites.
659

660
    This checks whether the cluster is empty.
661

662
    Any errors are signalled by raising errors.OpPrereqError.
663

664
    """
665
    master = self.sstore.GetMasterNode()
666

    
667
    nodelist = self.cfg.GetNodeList()
668
    if len(nodelist) != 1 or nodelist[0] != master:
669
      raise errors.OpPrereqError("There are still %d node(s) in"
670
                                 " this cluster." % (len(nodelist) - 1))
671
    instancelist = self.cfg.GetInstanceList()
672
    if instancelist:
673
      raise errors.OpPrereqError("There are still %d instance(s) in"
674
                                 " this cluster." % len(instancelist))
675

    
676
  def Exec(self, feedback_fn):
677
    """Destroys the cluster.
678

679
    """
680
    utils.CreateBackup('/root/.ssh/id_dsa')
681
    utils.CreateBackup('/root/.ssh/id_dsa.pub')
682
    rpc.call_node_leave_cluster(self.sstore.GetMasterNode())
683

    
684

    
685
class LUVerifyCluster(NoHooksLU):
686
  """Verifies the cluster status.
687

688
  """
689
  _OP_REQP = []
690

    
691
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
692
                  remote_version, feedback_fn):
693
    """Run multiple tests against a node.
694

695
    Test list:
696
      - compares ganeti version
697
      - checks vg existance and size > 20G
698
      - checks config file checksum
699
      - checks ssh to other nodes
700

701
    Args:
702
      node: name of the node to check
703
      file_list: required list of files
704
      local_cksum: dictionary of local files and their checksums
705

706
    """
707
    # compares ganeti version
708
    local_version = constants.PROTOCOL_VERSION
709
    if not remote_version:
710
      feedback_fn(" - ERROR: connection to %s failed" % (node))
711
      return True
712

    
713
    if local_version != remote_version:
714
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
715
                      (local_version, node, remote_version))
716
      return True
717

    
718
    # checks vg existance and size > 20G
719

    
720
    bad = False
721
    if not vglist:
722
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
723
                      (node,))
724
      bad = True
725
    else:
726
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
727
      if vgstatus:
728
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
729
        bad = True
730

    
731
    # checks config file checksum
732
    # checks ssh to any
733

    
734
    if 'filelist' not in node_result:
735
      bad = True
736
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
737
    else:
738
      remote_cksum = node_result['filelist']
739
      for file_name in file_list:
740
        if file_name not in remote_cksum:
741
          bad = True
742
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
743
        elif remote_cksum[file_name] != local_cksum[file_name]:
744
          bad = True
745
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
746

    
747
    if 'nodelist' not in node_result:
748
      bad = True
749
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
750
    else:
751
      if node_result['nodelist']:
752
        bad = True
753
        for node in node_result['nodelist']:
754
          feedback_fn("  - ERROR: communication with node '%s': %s" %
755
                          (node, node_result['nodelist'][node]))
756
    hyp_result = node_result.get('hypervisor', None)
757
    if hyp_result is not None:
758
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
759
    return bad
760

    
761
  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    instancelist = self.cfg.GetInstanceList()
    if not instance in instancelist:
      feedback_fn("  - ERROR: instance %s not in instance list %s" %
                      (instance, instancelist))
      bad = True

    instanceconfig = self.cfg.GetInstanceInfo(instance)
    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if not instanceconfig.status == 'down':
      if not instance in node_instance[node_current]:
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return not bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    self.cfg.VerifyConfig()

    master = self.sstore.GetMasterNode()
    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    node_volume = {}
    node_instance = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if type(volumeinfo) != dict:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      result = self._VerifyInstance(instance, node_volume, node_instance,
                                    feedback_fn)
      bad = bad or result

      inst_config = self.cfg.GetInstanceInfo(instance)

      inst_config.MapLVsByNode(node_vol_should)

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    return int(bad)


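# Shape of the per-node result consumed by _VerifyNode above (a sketch
# inferred from the checks, not a formal spec):
#
#   {
#     'filelist': {"/path/to/file": "checksum", ...},
#     'nodelist': {"unreachable-node": "error message", ...},
#     'hypervisor': None or an error string,
#   }
#
# mirroring the node_verify_param dict built in Exec; an empty 'nodelist'
# means all inter-node connectivity checks passed.
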
class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      result = utils.RunCmd(["fping", "-q", new_ip])
      if not result.failed:
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      if not rpc.call_node_start_master(master):
        logger.Error("Could not re-enable the master role on the master,\n"
                     "please restart manually.")


def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      logger.ToStderr("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        logger.ToStderr("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      perc_done, est_time, is_degraded = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        logger.ToStdout("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    if unlock:
      utils.Unlock('cmd')
    try:
      time.sleep(min(60, max_time))
    finally:
      if unlock:
        utils.Lock('cmd')

  if done:
    logger.ToStdout("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


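# Each mirror status entry is unpacked above as (perc_done, est_time,
# is_degraded); as an assumed example, (45.5, 120, True) would mean a
# degraded device 45.5% synced with about 120 seconds to go, while
# (None, None, False) means the device needs no further syncing.
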
def _CheckDiskConsistency(cfgw, dev, node, on_primary):
  """Check that mirrors are not degraded.

  """
  cfgw.SetDiskID(dev, node)

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Can't get any data from node %s" % node)
      result = False
    else:
      result = result and (not rstats[5])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    return

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    return node_data


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would not allow itself to run.

    """
    env = {
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    rpc.call_node_leave_cluster(node.name)

    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["dtotal", "dfree",
                                     "mtotal", "mnode", "mfree",
                                     "bootid"])

    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                               "pip", "sip"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedNodes(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


1287
  """Logical unit for getting volumes on node(s).
1288

1289
  """
1290
  _OP_REQP = ["nodes", "output_fields"]
1291

    
1292
  def CheckPrereq(self):
1293
    """Check prerequisites.
1294

1295
    This checks that the fields required are valid output fields.
1296

1297
    """
1298
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1299

    
1300
    _CheckOutputFields(static=["node"],
1301
                       dynamic=["phys", "vg", "name", "size", "instance"],
1302
                       selected=self.op.output_fields)
1303

    
1304

    
1305
  def Exec(self, feedback_fn):
1306
    """Computes the list of nodes and their attributes.
1307

1308
    """
1309
    nodenames = self.nodes
1310
    volumes = rpc.call_node_volumes(nodenames)
1311

    
1312
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1313
             in self.cfg.GetInstanceList()]
1314

    
1315
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1316

    
1317
    output = []
1318
    for node in nodenames:
1319
      if node not in volumes or not volumes[node]:
1320
        continue
1321

    
1322
      node_vols = volumes[node][:]
1323
      node_vols.sort(key=lambda vol: vol['dev'])
1324

    
1325
      for vol in node_vols:
1326
        node_output = []
1327
        for field in self.op.output_fields:
1328
          if field == "node":
1329
            val = node
1330
          elif field == "phys":
1331
            val = vol['dev']
1332
          elif field == "vg":
1333
            val = vol['vg']
1334
          elif field == "name":
1335
            val = vol['name']
1336
          elif field == "size":
1337
            val = int(float(vol['size']))
1338
          elif field == "instance":
1339
            for inst in ilist:
1340
              if node not in lv_by_node[inst]:
1341
                continue
1342
              if vol['name'] in lv_by_node[inst][node]:
1343
                val = inst.name
1344
                break
1345
            else:
1346
              val = '-'
1347
          else:
1348
            raise errors.ParameterError(field)
1349
          node_output.append(str(val))
1350

    
1351
        output.append(node_output)
1352

    
1353
    return output
1354

    
1355

    
1356
class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) matches the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip
    node_list = cfg.GetNodeList()
    if node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration"
                                 % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)
      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachability
    command = ["fping", "-q", primary_ip]
    result = utils.RunCmd(command)
    if result.failed:
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      command = ["fping", "-S%s" % myself.secondary_ip, "-q", secondary_ip]
      result = utils.RunCmd(command)
      if result.failed:
        raise errors.OpPrereqError("Node secondary ip not reachable by ping")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # set up inter-node password and certificate and restarts the node daemon
    gntpass = self.sstore.GetNodeDaemonPassword()
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
      raise errors.OpExecError("ganeti password corruption detected")
    f = open(constants.SSL_CERT_FILE)
    try:
      gntpem = f.read(8192)
    finally:
      f.close()
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
    # so we use this to detect an invalid certificate; as long as the
    # cert doesn't contain this, the here-document will be correctly
    # parsed by the shell sequence below
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
    if not gntpem.endswith("\n"):
      raise errors.OpExecError("PEM must end with newline")
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)

    # and then connect with ssh to set password and start ganeti-noded
    # note that all the below variables are sanitized at this point,
    # either by being constants or by the checks above
    ss = self.sstore
    mycommand = ("umask 077 && "
                 "echo '%s' > '%s' && "
                 "cat > '%s' << '!EOF.' && \n"
                 "%s!EOF.\n%s restart" %
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
                  constants.SSL_CERT_FILE, gntpem,
                  constants.NODE_INITD_SCRIPT))

    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
    if result.failed:
      raise errors.OpExecError("Remote command on node %s, error: %s,"
                               " output: %s" %
                               (node, result.fail_reason, result.output))

    # check connectivity
    time.sleep(4)

    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    keyarray = []
    keyfiles = ["/etc/ssh/ssh_host_dsa_key", "/etc/ssh/ssh_host_dsa_key.pub",
                "/etc/ssh/ssh_host_rsa_key", "/etc/ssh/ssh_host_rsa_key.pub",
                "/root/.ssh/id_dsa", "/root/.ssh/id_dsa.pub"]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    _UpdateEtcHosts(new_node.name, new_node.primary_ip)
    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
                      self.cfg.GetHostKey())

    if new_node.secondary_ip != new_node.primary_ip:
      result = ssh.SSHCall(node, "root",
                           "fping -S 127.0.0.1 -q %s" % new_node.secondary_ip)
      if result.failed:
        raise errors.OpExecError("Node claims it doesn't have the"
                                 " secondary ip you gave (%s).\n"
                                 "Please fix and re-run this command." %
                                 new_node.secondary_ip)

    success, msg = ssh.VerifyNodeHostname(node)
    if not success:
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
                               " than the one the resolver gives: %s.\n"
                               "Please fix and re-run this command." %
                               (node, msg))

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList() + [node]
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = ss.GetFileList()
    for fname in to_copy:
      if not ssh.CopyFileToNode(node, fname):
        logger.Error("could not copy file %s to node %s" % (fname, node))

    logger.Info("adding node %s to cluster.conf" % node)
    self.cfg.AddNode(new_node)


class LUMasterFailover(LogicalUnit):
1572
  """Failover the master node to the current node.
1573

1574
  This is a special LU in that it must run on a non-master node.
1575

1576
  """
1577
  HPATH = "master-failover"
1578
  HTYPE = constants.HTYPE_CLUSTER
1579
  REQ_MASTER = False
1580
  _OP_REQP = []
1581

    
1582
  def BuildHooksEnv(self):
1583
    """Build hooks env.
1584

1585
    This will run on the new master only in the pre phase, and on all
1586
    the nodes in the post phase.
1587

1588
    """
1589
    env = {
1590
      "NEW_MASTER": self.new_master,
1591
      "OLD_MASTER": self.old_master,
1592
      }
1593
    return env, [self.new_master], self.cfg.GetNodeList()
1594

    
1595
  def CheckPrereq(self):
1596
    """Check prerequisites.
1597

1598
    This checks that we are not already the master.
1599

1600
    """
1601
    self.new_master = utils.HostInfo().name
1602
    self.old_master = self.sstore.GetMasterNode()
1603

    
1604
    if self.old_master == self.new_master:
1605
      raise errors.OpPrereqError("This commands must be run on the node"
1606
                                 " where you want the new master to be.\n"
1607
                                 "%s is already the master" %
1608
                                 self.old_master)
1609

    
1610
  def Exec(self, feedback_fn):
1611
    """Failover the master node.
1612

1613
    This command, when run on a non-master node, will cause the current
1614
    master to cease being master, and the non-master to become new
1615
    master.
1616

1617
    """
1618
    #TODO: do not rely on gethostname returning the FQDN
1619
    logger.Info("setting master to %s, old master: %s" %
1620
                (self.new_master, self.old_master))
1621

    
1622
    if not rpc.call_node_stop_master(self.old_master):
1623
      logger.Error("could disable the master role on the old master"
1624
                   " %s, please disable manually" % self.old_master)
1625

    
1626
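    # record the new master in the simple store and distribute the updated
    # file to all nodes so the whole cluster agrees on who the master is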
    ss = self.sstore
1627
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1628
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
1629
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
1630
      logger.Error("could not distribute the new simple store master file"
1631
                   " to the other nodes, please check.")
1632

    
1633
    if not rpc.call_node_start_master(self.new_master):
1634
      logger.Error("could not start the master role on the new master"
1635
                   " %s, please check" % self.new_master)
1636
      feedback_fn("Error in activating the master IP on the new master,\n"
1637
                  "please fix manually.")
1638

    
1639

    
1640

    
1641
class LUQueryClusterInfo(NoHooksLU):
1642
  """Query cluster configuration.
1643

1644
  """
1645
  _OP_REQP = []
1646
  REQ_MASTER = False
1647

    
1648
  def CheckPrereq(self):
1649
    """No prerequsites needed for this LU.
1650

1651
    """
1652
    pass
1653

    
1654
  def Exec(self, feedback_fn):
1655
    """Return cluster config.
1656

1657
    """
1658
    result = {
1659
      "name": self.sstore.GetClusterName(),
1660
      "software_version": constants.RELEASE_VERSION,
1661
      "protocol_version": constants.PROTOCOL_VERSION,
1662
      "config_version": constants.CONFIG_VERSION,
1663
      "os_api_version": constants.OS_API_VERSION,
1664
      "export_version": constants.EXPORT_VERSION,
1665
      "master": self.sstore.GetMasterNode(),
1666
      "architecture": (platform.architecture()[0], platform.machine()),
1667
      }
1668

    
1669
    return result
1670

    
1671

    
1672
class LUClusterCopyFile(NoHooksLU):
1673
  """Copy file to cluster.
1674

1675
  """
1676
  _OP_REQP = ["nodes", "filename"]
1677

    
1678
  def CheckPrereq(self):
1679
    """Check prerequisites.
1680

1681
    It should check that the named file exists and that the given list
1682
    of nodes is valid.
1683

1684
    """
1685
    if not os.path.exists(self.op.filename):
1686
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1687

    
1688
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1689

    
1690
  def Exec(self, feedback_fn):
1691
    """Copy a file from master to some nodes.
1692

1693
    Args:
      self.op.filename - the file to copy
      self.nodes - the list of target node names, as computed in CheckPrereq
                   (the master node is skipped since it already has the file)
1698

1699
    """
1700
    filename = self.op.filename
1701

    
1702
    myname = utils.HostInfo().name
1703

    
1704
    for node in self.nodes:
1705
      if node == myname:
1706
        continue
1707
      if not ssh.CopyFileToNode(node, filename):
1708
        logger.Error("Copy of file %s to node %s failed" % (filename, node))
1709

    
1710

    
1711
class LUDumpClusterConfig(NoHooksLU):
1712
  """Return a text-representation of the cluster-config.
1713

1714
  """
1715
  _OP_REQP = []
1716

    
1717
  def CheckPrereq(self):
1718
    """No prerequisites.
1719

1720
    """
1721
    pass
1722

    
1723
  def Exec(self, feedback_fn):
1724
    """Dump a representation of the cluster config to the standard output.
1725

1726
    """
1727
    return self.cfg.DumpConfig()
1728

    
1729

    
1730
class LURunClusterCommand(NoHooksLU):
1731
  """Run a command on some nodes.
1732

1733
  """
1734
  _OP_REQP = ["command", "nodes"]
1735

    
1736
  def CheckPrereq(self):
1737
    """Check prerequisites.
1738

1739
    It checks that the given list of nodes is valid.
1740

1741
    """
1742
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1743

    
1744
  def Exec(self, feedback_fn):
1745
    """Run a command on some nodes.
1746

1747
    """
1748
    data = []
1749
    for node in self.nodes:
1750
      result = ssh.SSHCall(node, "root", self.op.command)
1751
      data.append((node, result.output, result.exit_code))
1752

    
1753
    return data
1754

    
1755

    
1756
class LUActivateInstanceDisks(NoHooksLU):
1757
  """Bring up an instance's disks.
1758

1759
  """
1760
  _OP_REQP = ["instance_name"]
1761

    
1762
  def CheckPrereq(self):
1763
    """Check prerequisites.
1764

1765
    This checks that the instance is in the cluster.
1766

1767
    """
1768
    instance = self.cfg.GetInstanceInfo(
1769
      self.cfg.ExpandInstanceName(self.op.instance_name))
1770
    if instance is None:
1771
      raise errors.OpPrereqError("Instance '%s' not known" %
1772
                                 self.op.instance_name)
1773
    self.instance = instance
1774

    
1775

    
1776
  def Exec(self, feedback_fn):
1777
    """Activate the disks.
1778

1779
    """
1780
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1781
    if not disks_ok:
1782
      raise errors.OpExecError("Cannot activate block devices")
1783

    
1784
    return disks_info
1785

    
1786

    
1787
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1788
  """Prepare the block devices for an instance.
1789

1790
  This sets up the block devices on all nodes.
1791

1792
  Args:
1793
    instance: a ganeti.objects.Instance object
1794
    ignore_secondaries: if true, errors on secondary nodes won't result
1795
                        in an error return from the function
1796

1797
  Returns:
    a tuple (disks_ok, device_info): disks_ok is false if the operation
    failed, and device_info is the list of (host, instance_visible_name,
    node_visible_name) tuples mapping node devices to instance devices
  """
1802
  device_info = []
1803
  disks_ok = True
1804
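  # assemble each disk on every node in its node tree; the assemble result
  # from the primary node is kept for the device mapping returned below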
  for inst_disk in instance.disks:
1805
    master_result = None
1806
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1807
      cfg.SetDiskID(node_disk, node)
1808
      is_primary = node == instance.primary_node
1809
      result = rpc.call_blockdev_assemble(node, node_disk, is_primary)
1810
      if not result:
1811
        logger.Error("could not prepare block device %s on node %s (is_pri"
1812
                     "mary=%s)" % (inst_disk.iv_name, node, is_primary))
1813
        if is_primary or not ignore_secondaries:
1814
          disks_ok = False
1815
      if is_primary:
1816
        master_result = result
1817
    device_info.append((instance.primary_node, inst_disk.iv_name,
1818
                        master_result))
1819

    
1820
  return disks_ok, device_info
1821

    
1822

    
1823
def _StartInstanceDisks(cfg, instance, force):
1824
  """Start the disks of an instance.
1825

1826
  """
1827
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1828
                                           ignore_secondaries=force)
1829
  if not disks_ok:
1830
    _ShutdownInstanceDisks(instance, cfg)
1831
    if force is not None and not force:
1832
      logger.Error("If the message above refers to a secondary node,"
1833
                   " you can retry the operation using '--force'.")
1834
    raise errors.OpExecError("Disk consistency error")
1835

    
1836

    
1837
class LUDeactivateInstanceDisks(NoHooksLU):
1838
  """Shutdown an instance's disks.
1839

1840
  """
1841
  _OP_REQP = ["instance_name"]
1842

    
1843
  def CheckPrereq(self):
1844
    """Check prerequisites.
1845

1846
    This checks that the instance is in the cluster.
1847

1848
    """
1849
    instance = self.cfg.GetInstanceInfo(
1850
      self.cfg.ExpandInstanceName(self.op.instance_name))
1851
    if instance is None:
1852
      raise errors.OpPrereqError("Instance '%s' not known" %
1853
                                 self.op.instance_name)
1854
    self.instance = instance
1855

    
1856
  def Exec(self, feedback_fn):
1857
    """Deactivate the disks
1858

1859
    """
1860
    instance = self.instance
1861
    ins_l = rpc.call_instance_list([instance.primary_node])
1862
    ins_l = ins_l[instance.primary_node]
1863
    if not isinstance(ins_l, list):
1864
      raise errors.OpExecError("Can't contact node '%s'" %
1865
                               instance.primary_node)
1866

    
1867
    if self.instance.name in ins_l:
1868
      raise errors.OpExecError("Instance is running, can't shutdown"
1869
                               " block devices.")
1870

    
1871
    _ShutdownInstanceDisks(instance, self.cfg)
1872

    
1873

    
1874
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1875
  """Shutdown block devices of an instance.
1876

1877
  This does the shutdown on all nodes of the instance.
1878

1879
  If ignore_primary is true, errors on the primary node are
  ignored.
1881

1882
  """
1883
  result = True
1884
  for disk in instance.disks:
1885
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1886
      cfg.SetDiskID(top_disk, node)
1887
      if not rpc.call_blockdev_shutdown(node, top_disk):
1888
        logger.Error("could not shutdown block device %s on node %s" %
1889
                     (disk.iv_name, node))
1890
        if not ignore_primary or node != instance.primary_node:
1891
          result = False
1892
  return result
1893

    
1894

    
1895
class LUStartupInstance(LogicalUnit):
1896
  """Starts an instance.
1897

1898
  """
1899
  HPATH = "instance-start"
1900
  HTYPE = constants.HTYPE_INSTANCE
1901
  _OP_REQP = ["instance_name", "force"]
1902

    
1903
  def BuildHooksEnv(self):
1904
    """Build hooks env.
1905

1906
    This runs on master, primary and secondary nodes of the instance.
1907

1908
    """
1909
    env = {
1910
      "FORCE": self.op.force,
1911
      }
1912
    env.update(_BuildInstanceHookEnvByObject(self.instance))
1913
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1914
          list(self.instance.secondary_nodes))
1915
    return env, nl, nl
1916

    
1917
  def CheckPrereq(self):
1918
    """Check prerequisites.
1919

1920
    This checks that the instance is in the cluster.
1921

1922
    """
1923
    instance = self.cfg.GetInstanceInfo(
1924
      self.cfg.ExpandInstanceName(self.op.instance_name))
1925
    if instance is None:
1926
      raise errors.OpPrereqError("Instance '%s' not known" %
1927
                                 self.op.instance_name)
1928

    
1929
    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(instance.primary_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s do not"
                                 " exist on destination node '%s'" %
                                 (brlist, instance.primary_node))
1935

    
1936
    self.instance = instance
1937
    self.op.instance_name = instance.name
1938

    
1939
  def Exec(self, feedback_fn):
1940
    """Start the instance.
1941

1942
    """
1943
    instance = self.instance
1944
    force = self.op.force
1945
    extra_args = getattr(self.op, "extra_args", "")
1946

    
1947
    node_current = instance.primary_node
1948

    
1949
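    # make sure the primary node has enough free memory before touching
    # the disks or starting the instance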
    nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1950
    if not nodeinfo:
1951
      raise errors.OpExecError("Could not contact node %s for infos" %
1952
                               (node_current))
1953

    
1954
    freememory = nodeinfo[node_current]['memory_free']
1955
    memory = instance.memory
1956
    if memory > freememory:
1957
      raise errors.OpExecError("Not enough memory to start instance"
1958
                               " %s on node %s"
1959
                               " needed %s MiB, available %s MiB" %
1960
                               (instance.name, node_current, memory,
1961
                                freememory))
1962

    
1963
    _StartInstanceDisks(self.cfg, instance, force)
1964

    
1965
    if not rpc.call_instance_start(node_current, instance, extra_args):
1966
      _ShutdownInstanceDisks(instance, self.cfg)
1967
      raise errors.OpExecError("Could not start instance")
1968

    
1969
    self.cfg.MarkInstanceUp(instance.name)
1970

    
1971

    
1972
class LUShutdownInstance(LogicalUnit):
1973
  """Shutdown an instance.
1974

1975
  """
1976
  HPATH = "instance-stop"
1977
  HTYPE = constants.HTYPE_INSTANCE
1978
  _OP_REQP = ["instance_name"]
1979

    
1980
  def BuildHooksEnv(self):
1981
    """Build hooks env.
1982

1983
    This runs on master, primary and secondary nodes of the instance.
1984

1985
    """
1986
    env = _BuildInstanceHookEnvByObject(self.instance)
1987
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1988
          list(self.instance.secondary_nodes))
1989
    return env, nl, nl
1990

    
1991
  def CheckPrereq(self):
1992
    """Check prerequisites.
1993

1994
    This checks that the instance is in the cluster.
1995

1996
    """
1997
    instance = self.cfg.GetInstanceInfo(
1998
      self.cfg.ExpandInstanceName(self.op.instance_name))
1999
    if instance is None:
2000
      raise errors.OpPrereqError("Instance '%s' not known" %
2001
                                 self.op.instance_name)
2002
    self.instance = instance
2003

    
2004
  def Exec(self, feedback_fn):
2005
    """Shutdown the instance.
2006

2007
    """
2008
    instance = self.instance
2009
    node_current = instance.primary_node
2010
    if not rpc.call_instance_shutdown(node_current, instance):
2011
      logger.Error("could not shutdown instance")
2012

    
2013
    self.cfg.MarkInstanceDown(instance.name)
2014
    _ShutdownInstanceDisks(instance, self.cfg)
2015

    
2016

    
2017
class LUReinstallInstance(LogicalUnit):
2018
  """Reinstall an instance.
2019

2020
  """
2021
  HPATH = "instance-reinstall"
2022
  HTYPE = constants.HTYPE_INSTANCE
2023
  _OP_REQP = ["instance_name"]
2024

    
2025
  def BuildHooksEnv(self):
2026
    """Build hooks env.
2027

2028
    This runs on master, primary and secondary nodes of the instance.
2029

2030
    """
2031
    env = _BuildInstanceHookEnvByObject(self.instance)
2032
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2033
          list(self.instance.secondary_nodes))
2034
    return env, nl, nl
2035

    
2036
  def CheckPrereq(self):
2037
    """Check prerequisites.
2038

2039
    This checks that the instance is in the cluster and is not running.
2040

2041
    """
2042
    instance = self.cfg.GetInstanceInfo(
2043
      self.cfg.ExpandInstanceName(self.op.instance_name))
2044
    if instance is None:
2045
      raise errors.OpPrereqError("Instance '%s' not known" %
2046
                                 self.op.instance_name)
2047
    if instance.disk_template == constants.DT_DISKLESS:
2048
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2049
                                 self.op.instance_name)
2050
    if instance.status != "down":
2051
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2052
                                 self.op.instance_name)
2053
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2054
    if remote_info:
2055
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2056
                                 (self.op.instance_name,
2057
                                  instance.primary_node))
2058

    
2059
    self.op.os_type = getattr(self.op, "os_type", None)
2060
    if self.op.os_type is not None:
2061
      # OS verification
2062
      pnode = self.cfg.GetNodeInfo(
2063
        self.cfg.ExpandNodeName(instance.primary_node))
2064
      if pnode is None:
2065
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2066
                                   self.op.pnode)
2067
      os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2068
      if not isinstance(os_obj, objects.OS):
2069
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2070
                                   " primary node"  % self.op.os_type)
2071

    
2072
    self.instance = instance
2073

    
2074
  def Exec(self, feedback_fn):
2075
    """Reinstall the instance.
2076

2077
    """
2078
    inst = self.instance
2079

    
2080
    if self.op.os_type is not None:
2081
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2082
      inst.os = self.op.os_type
2083
      self.cfg.AddInstance(inst)
2084

    
2085
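    # the OS create scripts need the instance's disks to be visible on the
    # primary node, so activate them and always shut them down afterwards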
    _StartInstanceDisks(self.cfg, inst, None)
2086
    try:
2087
      feedback_fn("Running the instance OS create scripts...")
2088
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2089
        raise errors.OpExecError("Could not install OS for instance %s "
2090
                                 "on node %s" %
2091
                                 (inst.name, inst.primary_node))
2092
    finally:
2093
      _ShutdownInstanceDisks(inst, self.cfg)
2094

    
2095

    
2096
class LURenameInstance(LogicalUnit):
2097
  """Rename an instance.
2098

2099
  """
2100
  HPATH = "instance-rename"
2101
  HTYPE = constants.HTYPE_INSTANCE
2102
  _OP_REQP = ["instance_name", "new_name"]
2103

    
2104
  def BuildHooksEnv(self):
2105
    """Build hooks env.
2106

2107
    This runs on master, primary and secondary nodes of the instance.
2108

2109
    """
2110
    env = _BuildInstanceHookEnvByObject(self.instance)
2111
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2112
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2113
          list(self.instance.secondary_nodes))
2114
    return env, nl, nl
2115

    
2116
  def CheckPrereq(self):
2117
    """Check prerequisites.
2118

2119
    This checks that the instance is in the cluster and is not running.
2120

2121
    """
2122
    instance = self.cfg.GetInstanceInfo(
2123
      self.cfg.ExpandInstanceName(self.op.instance_name))
2124
    if instance is None:
2125
      raise errors.OpPrereqError("Instance '%s' not known" %
2126
                                 self.op.instance_name)
2127
    if instance.status != "down":
2128
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2129
                                 self.op.instance_name)
2130
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2131
    if remote_info:
2132
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2133
                                 (self.op.instance_name,
2134
                                  instance.primary_node))
2135
    self.instance = instance
2136

    
2137
    # new name verification
2138
    name_info = utils.HostInfo(self.op.new_name)
2139

    
2140
    self.op.new_name = new_name = name_info.name
2141
    if not getattr(self.op, "ignore_ip", False):
2142
      command = ["fping", "-q", name_info.ip]
2143
      result = utils.RunCmd(command)
2144
      if not result.failed:
2145
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2146
                                   (name_info.ip, new_name))
2147

    
2148

    
2149
  def Exec(self, feedback_fn):
2150
    """Reinstall the instance.
2151

2152
    """
2153
    inst = self.instance
2154
    old_name = inst.name
2155

    
2156
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2157

    
2158
    # re-read the instance from the configuration after rename
2159
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2160

    
2161
    _StartInstanceDisks(self.cfg, inst, None)
2162
    try:
2163
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2164
                                          "sda", "sdb"):
2165
        msg = ("Could run OS rename script for instance %s\n"
2166
               "on node %s\n"
2167
               "(but the instance has been renamed in Ganeti)" %
2168
               (inst.name, inst.primary_node))
2169
        logger.Error(msg)
2170
    finally:
2171
      _ShutdownInstanceDisks(inst, self.cfg)
2172

    
2173

    
2174
class LURemoveInstance(LogicalUnit):
2175
  """Remove an instance.
2176

2177
  """
2178
  HPATH = "instance-remove"
2179
  HTYPE = constants.HTYPE_INSTANCE
2180
  _OP_REQP = ["instance_name"]
2181

    
2182
  def BuildHooksEnv(self):
2183
    """Build hooks env.
2184

2185
    This runs on master, primary and secondary nodes of the instance.
2186

2187
    """
2188
    env = _BuildInstanceHookEnvByObject(self.instance)
2189
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2190
          list(self.instance.secondary_nodes))
2191
    return env, nl, nl
2192

    
2193
  def CheckPrereq(self):
2194
    """Check prerequisites.
2195

2196
    This checks that the instance is in the cluster.
2197

2198
    """
2199
    instance = self.cfg.GetInstanceInfo(
2200
      self.cfg.ExpandInstanceName(self.op.instance_name))
2201
    if instance is None:
2202
      raise errors.OpPrereqError("Instance '%s' not known" %
2203
                                 self.op.instance_name)
2204
    self.instance = instance
2205

    
2206
  def Exec(self, feedback_fn):
2207
    """Remove the instance.
2208

2209
    """
2210
    instance = self.instance
2211
    logger.Info("shutting down instance %s on node %s" %
2212
                (instance.name, instance.primary_node))
2213

    
2214
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2215
      raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2216
                               (instance.name, instance.primary_node))
2217

    
2218
    logger.Info("removing block devices for instance %s" % instance.name)
2219

    
2220
    _RemoveDisks(instance, self.cfg)
2221

    
2222
    logger.Info("removing instance %s out of cluster config" % instance.name)
2223

    
2224
    self.cfg.RemoveInstance(instance.name)
2225

    
2226

    
2227
class LUQueryInstances(NoHooksLU):
2228
  """Logical unit for querying instances.
2229

2230
  """
2231
  _OP_REQP = ["output_fields", "names"]
2232

    
2233
  def CheckPrereq(self):
2234
    """Check prerequisites.
2235

2236
    This checks that the fields required are valid output fields.
2237

2238
    """
2239
    self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2240
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2241
                               "admin_state", "admin_ram",
2242
                               "disk_template", "ip", "mac", "bridge",
2243
                               "sda_size", "sdb_size"],
2244
                       dynamic=self.dynamic_fields,
2245
                       selected=self.op.output_fields)
2246

    
2247
    self.wanted = _GetWantedInstances(self, self.op.names)
2248

    
2249
  def Exec(self, feedback_fn):
2250
    """Computes the list of nodes and their attributes.
2251

2252
    """
2253
    instance_names = self.wanted
2254
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2255
                     in instance_names]
2256

    
2257
    # begin data gathering
2258

    
2259
    nodes = frozenset([inst.primary_node for inst in instance_list])
2260

    
2261
    bad_nodes = []
2262
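    # only contact the nodes for runtime data if a dynamic field
    # (oper_state, oper_ram) was actually requested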
    if self.dynamic_fields.intersection(self.op.output_fields):
2263
      live_data = {}
2264
      node_data = rpc.call_all_instances_info(nodes)
2265
      for name in nodes:
2266
        result = node_data[name]
2267
        if result:
2268
          live_data.update(result)
2269
        elif result == False:
2270
          bad_nodes.append(name)
2271
        # else no instance is alive
2272
    else:
2273
      live_data = dict([(name, {}) for name in instance_names])
2274

    
2275
    # end data gathering
2276

    
2277
    output = []
2278
    for instance in instance_list:
2279
      iout = []
2280
      for field in self.op.output_fields:
2281
        if field == "name":
2282
          val = instance.name
2283
        elif field == "os":
2284
          val = instance.os
2285
        elif field == "pnode":
2286
          val = instance.primary_node
2287
        elif field == "snodes":
2288
          val = list(instance.secondary_nodes)
2289
        elif field == "admin_state":
2290
          val = (instance.status != "down")
2291
        elif field == "oper_state":
2292
          if instance.primary_node in bad_nodes:
2293
            val = None
2294
          else:
2295
            val = bool(live_data.get(instance.name))
2296
        elif field == "admin_ram":
2297
          val = instance.memory
2298
        elif field == "oper_ram":
2299
          if instance.primary_node in bad_nodes:
2300
            val = None
2301
          elif instance.name in live_data:
2302
            val = live_data[instance.name].get("memory", "?")
2303
          else:
2304
            val = "-"
2305
        elif field == "disk_template":
2306
          val = instance.disk_template
2307
        elif field == "ip":
2308
          val = instance.nics[0].ip
2309
        elif field == "bridge":
2310
          val = instance.nics[0].bridge
2311
        elif field == "mac":
2312
          val = instance.nics[0].mac
2313
        elif field == "sda_size" or field == "sdb_size":
2314
          disk = instance.FindDisk(field[:3])
2315
          if disk is None:
2316
            val = None
2317
          else:
2318
            val = disk.size
2319
        else:
2320
          raise errors.ParameterError(field)
2321
        iout.append(val)
2322
      output.append(iout)
2323

    
2324
    return output
2325

    
2326

    
2327
class LUFailoverInstance(LogicalUnit):
2328
  """Failover an instance.
2329

2330
  """
2331
  HPATH = "instance-failover"
2332
  HTYPE = constants.HTYPE_INSTANCE
2333
  _OP_REQP = ["instance_name", "ignore_consistency"]
2334

    
2335
  def BuildHooksEnv(self):
2336
    """Build hooks env.
2337

2338
    This runs on master, primary and secondary nodes of the instance.
2339

2340
    """
2341
    env = {
2342
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2343
      }
2344
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2345
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2346
    return env, nl, nl
2347

    
2348
  def CheckPrereq(self):
2349
    """Check prerequisites.
2350

2351
    This checks that the instance is in the cluster.
2352

2353
    """
2354
    instance = self.cfg.GetInstanceInfo(
2355
      self.cfg.ExpandInstanceName(self.op.instance_name))
2356
    if instance is None:
2357
      raise errors.OpPrereqError("Instance '%s' not known" %
2358
                                 self.op.instance_name)
2359

    
2360
    if instance.disk_template != constants.DT_REMOTE_RAID1:
2361
      raise errors.OpPrereqError("Instance's disk layout is not"
2362
                                 " remote_raid1.")
2363

    
2364
    secondary_nodes = instance.secondary_nodes
2365
    if not secondary_nodes:
2366
      raise errors.ProgrammerError("no secondary node but using "
2367
                                   "DT_REMOTE_RAID1 template")
2368

    
2369
    # check memory requirements on the secondary node
2370
    target_node = secondary_nodes[0]
2371
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2372
    info = nodeinfo.get(target_node, None)
2373
    if not info:
2374
      raise errors.OpPrereqError("Cannot get current information"
2375
                                 " from node '%s'" % nodeinfo)
2376
    if instance.memory > info['memory_free']:
2377
      raise errors.OpPrereqError("Not enough memory on target node %s."
2378
                                 " %d MB available, %d MB required" %
2379
                                 (target_node, info['memory_free'],
2380
                                  instance.memory))
2381

    
2382
    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(instance.primary_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s do not"
                                 " exist on destination node '%s'" %
                                 (brlist, instance.primary_node))
2388

    
2389
    self.instance = instance
2390

    
2391
  def Exec(self, feedback_fn):
2392
    """Failover an instance.
2393

2394
    The failover is done by shutting it down on its present node and
2395
    starting it on the secondary.
2396

2397
    """
2398
    instance = self.instance
2399

    
2400
    source_node = instance.primary_node
2401
    target_node = instance.secondary_nodes[0]
2402

    
2403
    feedback_fn("* checking disk consistency between source and target")
2404
    for dev in instance.disks:
2405
      # for remote_raid1, these are md over drbd
2406
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2407
        if not self.op.ignore_consistency:
2408
          raise errors.OpExecError("Disk %s is degraded on target node,"
2409
                                   " aborting failover." % dev.iv_name)
2410

    
2411
    feedback_fn("* checking target node resource availability")
2412
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2413

    
2414
    if not nodeinfo:
2415
      raise errors.OpExecError("Could not contact target node %s." %
2416
                               target_node)
2417

    
2418
    free_memory = int(nodeinfo[target_node]['memory_free'])
2419
    memory = instance.memory
2420
    if memory > free_memory:
2421
      raise errors.OpExecError("Not enough memory to create instance %s on"
2422
                               " node %s. needed %s MiB, available %s MiB" %
2423
                               (instance.name, target_node, memory,
2424
                                free_memory))
2425

    
2426
    feedback_fn("* shutting down instance on source node")
2427
    logger.Info("Shutting down instance %s on node %s" %
2428
                (instance.name, source_node))
2429

    
2430
    if not rpc.call_instance_shutdown(source_node, instance):
2431
      logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2432
                   " anyway. Please make sure node %s is down"  %
2433
                   (instance.name, source_node, source_node))
2434

    
2435
    feedback_fn("* deactivating the instance's disks on source node")
2436
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2437
      raise errors.OpExecError("Can't shut down the instance's disks.")
2438

    
2439
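    # point the instance at its new primary node; the updated configuration
    # is written and distributed by the AddInstance call below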
    instance.primary_node = target_node
2440
    # distribute new instance config to the other nodes
2441
    self.cfg.AddInstance(instance)
2442

    
2443
    feedback_fn("* activating the instance's disks on target node")
2444
    logger.Info("Starting instance %s on node %s" %
2445
                (instance.name, target_node))
2446

    
2447
    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2448
                                             ignore_secondaries=True)
2449
    if not disks_ok:
2450
      _ShutdownInstanceDisks(instance, self.cfg)
2451
      raise errors.OpExecError("Can't activate the instance's disks")
2452

    
2453
    feedback_fn("* starting the instance on the target node")
2454
    if not rpc.call_instance_start(target_node, instance, None):
2455
      _ShutdownInstanceDisks(instance, self.cfg)
2456
      raise errors.OpExecError("Could not start instance %s on node %s." %
2457
                               (instance.name, target_node))
2458

    
2459

    
2460
def _CreateBlockDevOnPrimary(cfg, node, device, info):
2461
  """Create a tree of block devices on the primary node.
2462

2463
  This always creates all devices.
2464

2465
  """
2466
  if device.children:
2467
    for child in device.children:
2468
      if not _CreateBlockDevOnPrimary(cfg, node, child, info):
2469
        return False
2470

    
2471
  cfg.SetDiskID(device, node)
2472
  new_id = rpc.call_blockdev_create(node, device, device.size, True, info)
2473
  if not new_id:
2474
    return False
2475
  if device.physical_id is None:
2476
    device.physical_id = new_id
2477
  return True
2478

    
2479

    
2480
def _CreateBlockDevOnSecondary(cfg, node, device, force, info):
2481
  """Create a tree of block devices on a secondary node.
2482

2483
  If this device type has to be created on secondaries, create it and
2484
  all its children.
2485

2486
  If not, just recurse to children keeping the same 'force' value.
2487

2488
  """
2489
  if device.CreateOnSecondary():
2490
    force = True
2491
  if device.children:
2492
    for child in device.children:
2493
      if not _CreateBlockDevOnSecondary(cfg, node, child, force, info):
2494
        return False
2495

    
2496
  if not force:
2497
    return True
2498
  cfg.SetDiskID(device, node)
2499
  new_id = rpc.call_blockdev_create(node, device, device.size, False, info)
2500
  if not new_id:
2501
    return False
2502
  if device.physical_id is None:
2503
    device.physical_id = new_id
2504
  return True
2505

    
2506

    
2507
def _GenerateUniqueNames(cfg, exts):
2508
  """Generate a suitable LV name.
2509

2510
  This will generate a logical volume name for the given instance.
2511

2512
  """
2513
  results = []
2514
  for val in exts:
2515
    new_id = cfg.GenerateUniqueID()
2516
    results.append("%s%s" % (new_id, val))
2517
  return results
2518

    
2519

    
2520
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2521
  """Generate a drbd device complete with its children.
2522

2523
  """
2524
  port = cfg.AllocatePort()
2525
  vgname = cfg.GetVGName()
2526
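  # a DRBD device is built on top of two LVs: one holding the data and a
  # fixed 128 MB one holding the DRBD metadata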
  dev_data = objects.Disk(dev_type="lvm", size=size,
2527
                          logical_id=(vgname, names[0]))
2528
  dev_meta = objects.Disk(dev_type="lvm", size=128,
2529
                          logical_id=(vgname, names[1]))
2530
  drbd_dev = objects.Disk(dev_type="drbd", size=size,
2531
                          logical_id = (primary, secondary, port),
2532
                          children = [dev_data, dev_meta])
2533
  return drbd_dev
2534

    
2535

    
2536
def _GenerateDiskTemplate(cfg, template_name,
2537
                          instance_name, primary_node,
2538
                          secondary_nodes, disk_sz, swap_sz):
2539
  """Generate the entire disk layout for a given template type.
2540

2541
  """
2542
  #TODO: compute space requirements
2543

    
2544
  vgname = cfg.GetVGName()
2545
  if template_name == "diskless":
2546
    disks = []
2547
  elif template_name == "plain":
2548
    if len(secondary_nodes) != 0:
2549
      raise errors.ProgrammerError("Wrong template configuration")
2550

    
2551
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2552
    sda_dev = objects.Disk(dev_type="lvm", size=disk_sz,
2553
                           logical_id=(vgname, names[0]),
2554
                           iv_name = "sda")
2555
    sdb_dev = objects.Disk(dev_type="lvm", size=swap_sz,
2556
                           logical_id=(vgname, names[1]),
2557
                           iv_name = "sdb")
2558
    disks = [sda_dev, sdb_dev]
2559
  elif template_name == "local_raid1":
2560
    if len(secondary_nodes) != 0:
2561
      raise errors.ProgrammerError("Wrong template configuration")
2562

    
2563

    
2564
    names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2565
                                       ".sdb_m1", ".sdb_m2"])
2566
    sda_dev_m1 = objects.Disk(dev_type="lvm", size=disk_sz,
2567
                              logical_id=(vgname, names[0]))
2568
    sda_dev_m2 = objects.Disk(dev_type="lvm", size=disk_sz,
2569
                              logical_id=(vgname, names[1]))
2570
    md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name = "sda",
2571
                              size=disk_sz,
2572
                              children = [sda_dev_m1, sda_dev_m2])
2573
    sdb_dev_m1 = objects.Disk(dev_type="lvm", size=swap_sz,
2574
                              logical_id=(vgname, names[2]))
2575
    sdb_dev_m2 = objects.Disk(dev_type="lvm", size=swap_sz,
2576
                              logical_id=(vgname, names[3]))
2577
    md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name = "sdb",
2578
                              size=swap_sz,
2579
                              children = [sdb_dev_m1, sdb_dev_m2])
2580
    disks = [md_sda_dev, md_sdb_dev]
2581
  elif template_name == constants.DT_REMOTE_RAID1:
2582
    if len(secondary_nodes) != 1:
2583
      raise errors.ProgrammerError("Wrong template configuration")
2584
    remote_node = secondary_nodes[0]
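    # each visible disk (sda, sdb) is an md_raid1 device whose only child
    # is a DRBD mirror replicating between the primary and the remote node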
2585
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2586
                                       ".sdb_data", ".sdb_meta"])
2587
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2588
                                         disk_sz, names[0:2])
2589
    md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name="sda",
2590
                              children = [drbd_sda_dev], size=disk_sz)
2591
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2592
                                         swap_sz, names[2:4])
2593
    md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name="sdb",
2594
                              children = [drbd_sdb_dev], size=swap_sz)
2595
    disks = [md_sda_dev, md_sdb_dev]
2596
  else:
2597
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2598
  return disks
2599

    
2600

    
2601
def _GetInstanceInfoText(instance):
2602
  """Compute that text that should be added to the disk's metadata.
2603

2604
  """
2605
  return "originstname+%s" % instance.name
2606

    
2607

    
2608
def _CreateDisks(cfg, instance):
2609
  """Create all disks for an instance.
2610

2611
  This abstracts away some work from AddInstance.
2612

2613
  Args:
2614
    instance: the instance object
2615

2616
  Returns:
2617
    True or False showing the success of the creation process
2618

2619
  """
2620
  info = _GetInstanceInfoText(instance)
2621

    
2622
  for device in instance.disks:
2623
    logger.Info("creating volume %s for instance %s" %
2624
              (device.iv_name, instance.name))
2625
    #HARDCODE
2626
    for secondary_node in instance.secondary_nodes:
2627
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, device, False,
2628
                                        info):
2629
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2630
                     (device.iv_name, device, secondary_node))
2631
        return False
2632
    #HARDCODE
2633
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, device, info):
2634
      logger.Error("failed to create volume %s on primary!" %
2635
                   device.iv_name)
2636
      return False
2637
  return True
2638

    
2639

    
2640
def _RemoveDisks(instance, cfg):
2641
  """Remove all disks for an instance.
2642

2643
  This abstracts away some work from `AddInstance()` and
2644
  `RemoveInstance()`. Note that in case some of the devices couldn't
2645
  be removed, the removal will continue with the other ones (compare
2646
  with `_CreateDisks()`).
2647

2648
  Args:
2649
    instance: the instance object
2650

2651
  Returns:
2652
    True or False showing the success of the removal process
2653

2654
  """
2655
  logger.Info("removing block devices for instance %s" % instance.name)
2656

    
2657
  result = True
2658
  for device in instance.disks:
2659
    for node, disk in device.ComputeNodeTree(instance.primary_node):
2660
      cfg.SetDiskID(disk, node)
2661
      if not rpc.call_blockdev_remove(node, disk):
2662
        logger.Error("could not remove block device %s on node %s,"
2663
                     " continuing anyway" %
2664
                     (device.iv_name, node))
2665
        result = False
2666
  return result
2667

    
2668

    
2669
class LUCreateInstance(LogicalUnit):
2670
  """Create an instance.
2671

2672
  """
2673
  HPATH = "instance-add"
2674
  HTYPE = constants.HTYPE_INSTANCE
2675
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2676
              "disk_template", "swap_size", "mode", "start", "vcpus",
2677
              "wait_for_sync", "ip_check"]
2678

    
2679
  def BuildHooksEnv(self):
2680
    """Build hooks env.
2681

2682
    This runs on master, primary and secondary nodes of the instance.
2683

2684
    """
2685
    env = {
2686
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2687
      "INSTANCE_DISK_SIZE": self.op.disk_size,
2688
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
2689
      "INSTANCE_ADD_MODE": self.op.mode,
2690
      }
2691
    if self.op.mode == constants.INSTANCE_IMPORT:
2692
      env["INSTANCE_SRC_NODE"] = self.op.src_node
2693
      env["INSTANCE_SRC_PATH"] = self.op.src_path
2694
      env["INSTANCE_SRC_IMAGE"] = self.src_image
2695

    
2696
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2697
      primary_node=self.op.pnode,
2698
      secondary_nodes=self.secondaries,
2699
      status=self.instance_status,
2700
      os_type=self.op.os_type,
2701
      memory=self.op.mem_size,
2702
      vcpus=self.op.vcpus,
2703
      nics=[(self.inst_ip, self.op.bridge)],
2704
    ))
2705

    
2706
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2707
          self.secondaries)
2708
    return env, nl, nl
2709

    
2710

    
2711
  def CheckPrereq(self):
2712
    """Check prerequisites.
2713

2714
    """
2715
    if self.op.mode not in (constants.INSTANCE_CREATE,
2716
                            constants.INSTANCE_IMPORT):
2717
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2718
                                 self.op.mode)
2719

    
2720
    if self.op.mode == constants.INSTANCE_IMPORT:
2721
      src_node = getattr(self.op, "src_node", None)
2722
      src_path = getattr(self.op, "src_path", None)
2723
      if src_node is None or src_path is None:
2724
        raise errors.OpPrereqError("Importing an instance requires source"
2725
                                   " node and path options")
2726
      src_node_full = self.cfg.ExpandNodeName(src_node)
2727
      if src_node_full is None:
2728
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2729
      self.op.src_node = src_node = src_node_full
2730

    
2731
      if not os.path.isabs(src_path):
2732
        raise errors.OpPrereqError("The source path must be absolute")
2733

    
2734
      export_info = rpc.call_export_info(src_node, src_path)
2735

    
2736
      if not export_info:
2737
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
2738

    
2739
      if not export_info.has_section(constants.INISECT_EXP):
2740
        raise errors.ProgrammerError("Corrupted export config")
2741

    
2742
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
2743
      if (int(ei_version) != constants.EXPORT_VERSION):
2744
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2745
                                   (ei_version, constants.EXPORT_VERSION))
2746

    
2747
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2748
        raise errors.OpPrereqError("Can't import instance with more than"
2749
                                   " one data disk")
2750

    
2751
      # FIXME: are the old os-es, disk sizes, etc. useful?
2752
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2753
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2754
                                                         'disk0_dump'))
2755
      self.src_image = diskimage
2756
    else: # INSTANCE_CREATE
2757
      if getattr(self.op, "os_type", None) is None:
2758
        raise errors.OpPrereqError("No guest OS specified")
2759

    
2760
    # check primary node
2761
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2762
    if pnode is None:
2763
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
2764
                                 self.op.pnode)
2765
    self.op.pnode = pnode.name
2766
    self.pnode = pnode
2767
    self.secondaries = []
2768
    # disk template and mirror node verification
2769
    if self.op.disk_template not in constants.DISK_TEMPLATES:
2770
      raise errors.OpPrereqError("Invalid disk template name")
2771

    
2772
    if self.op.disk_template == constants.DT_REMOTE_RAID1:
2773
      if getattr(self.op, "snode", None) is None:
2774
        raise errors.OpPrereqError("The 'remote_raid1' disk template needs"
2775
                                   " a mirror node")
2776

    
2777
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
2778
      if snode_name is None:
2779
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
2780
                                   self.op.snode)
2781
      elif snode_name == pnode.name:
2782
        raise errors.OpPrereqError("The secondary node cannot be"
2783
                                   " the primary node.")
2784
      self.secondaries.append(snode_name)
2785

    
2786
    # Check lv size requirements
2787
    nodenames = [pnode.name] + self.secondaries
2788
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2789

    
2790
    # Required free disk space as a function of disk and swap space
2791
    req_size_dict = {
2792
      constants.DT_DISKLESS: 0,
2793
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2794
      constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2795
      # 256 MB are added for drbd metadata, 128MB for each drbd device
2796
      constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2797
    }
2798

    
2799
    if self.op.disk_template not in req_size_dict:
2800
      raise errors.ProgrammerError("Disk template '%s' size requirement"
2801
                                   " is unknown" %  self.op.disk_template)
2802

    
2803
    req_size = req_size_dict[self.op.disk_template]
2804

    
2805
    for node in nodenames:
2806
      info = nodeinfo.get(node, None)
2807
      if not info:
2808
        raise errors.OpPrereqError("Cannot get current information"
2809
                                   " from node '%s'" % nodeinfo)
2810
      if req_size > info['vg_free']:
2811
        raise errors.OpPrereqError("Not enough disk space on target node %s."
2812
                                   " %d MB available, %d MB required" %
2813
                                   (node, info['vg_free'], req_size))
2814

    
2815
    # os verification
2816
    os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2817
    if not isinstance(os_obj, objects.OS):
2818
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
2819
                                 " primary node"  % self.op.os_type)
2820

    
2821
    # instance verification
2822
    hostname1 = utils.HostInfo(self.op.instance_name)
2823

    
2824
    self.op.instance_name = instance_name = hostname1.name
2825
    instance_list = self.cfg.GetInstanceList()
2826
    if instance_name in instance_list:
2827
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2828
                                 instance_name)
2829

    
2830
    ip = getattr(self.op, "ip", None)
2831
    if ip is None or ip.lower() == "none":
2832
      inst_ip = None
2833
    elif ip.lower() == "auto":
2834
      inst_ip = hostname1.ip
2835
    else:
2836
      if not utils.IsValidIP(ip):
2837
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
2838
                                   " like a valid IP" % ip)
2839
      inst_ip = ip
2840
    self.inst_ip = inst_ip
2841

    
2842
    if self.op.start and not self.op.ip_check:
2843
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
2844
                                 " adding an instance in start mode")
2845

    
2846
    if self.op.ip_check:
2847
      command = ["fping", "-q", hostname1.ip]
2848
      result = utils.RunCmd(command)
2849
      if not result.failed:
2850
        raise errors.OpPrereqError("IP address %s of instance %s already"
2851
                                   " in use" % (hostname1.ip, instance_name))
2852

    
2853
    # bridge verification
2854
    bridge = getattr(self.op, "bridge", None)
2855
    if bridge is None:
2856
      self.op.bridge = self.cfg.GetDefBridge()
2857
    else:
2858
      self.op.bridge = bridge
2859

    
2860
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
2861
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
2862
                                 " destination node '%s'" %
2863
                                 (self.op.bridge, pnode.name))
2864

    
2865
    if self.op.start:
2866
      self.instance_status = 'up'
2867
    else:
2868
      self.instance_status = 'down'
2869

    
2870
  def Exec(self, feedback_fn):
2871
    """Create and add the instance to the cluster.
2872

2873
    """
2874
    instance = self.op.instance_name
2875
    pnode_name = self.pnode.name
2876

    
2877
    nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
2878
    if self.inst_ip is not None:
2879
      nic.ip = self.inst_ip
2880

    
2881
    disks = _GenerateDiskTemplate(self.cfg,
2882
                                  self.op.disk_template,
2883
                                  instance, pnode_name,
2884
                                  self.secondaries, self.op.disk_size,
2885
                                  self.op.swap_size)
2886

    
2887
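    # build the instance object; it is only committed to the configuration
    # once its disks have been created successfully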
    iobj = objects.Instance(name=instance, os=self.op.os_type,
2888
                            primary_node=pnode_name,
2889
                            memory=self.op.mem_size,
2890
                            vcpus=self.op.vcpus,
2891
                            nics=[nic], disks=disks,
2892
                            disk_template=self.op.disk_template,
2893
                            status=self.instance_status,
2894
                            )
2895

    
2896
    feedback_fn("* creating instance disks...")
2897
    if not _CreateDisks(self.cfg, iobj):
2898
      _RemoveDisks(iobj, self.cfg)
2899
      raise errors.OpExecError("Device creation failed, reverting...")
2900

    
2901
    feedback_fn("adding instance %s to cluster config" % instance)
2902

    
2903
    self.cfg.AddInstance(iobj)
2904

    
2905
    if self.op.wait_for_sync:
2906
      disk_abort = not _WaitForSync(self.cfg, iobj)
2907
    elif iobj.disk_template == constants.DT_REMOTE_RAID1:
2908
      # make sure the disks are not degraded (still sync-ing is ok)
2909
      time.sleep(15)
2910
      feedback_fn("* checking mirrors status")
2911
      disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
2912
    else:
2913
      disk_abort = False
2914

    
2915
    if disk_abort:
2916
      _RemoveDisks(iobj, self.cfg)
2917
      self.cfg.RemoveInstance(iobj.name)
2918
      raise errors.OpExecError("There are some degraded disks for"
2919
                               " this instance")
2920

    
2921
    feedback_fn("creating os for instance %s on node %s" %
2922
                (instance, pnode_name))
2923

    
2924
    if iobj.disk_template != constants.DT_DISKLESS:
2925
      if self.op.mode == constants.INSTANCE_CREATE:
2926
        feedback_fn("* running the instance OS create scripts...")
2927
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
2928
          raise errors.OpExecError("could not add os for instance %s"
2929
                                   " on node %s" %
2930
                                   (instance, pnode_name))
2931

    
2932
      elif self.op.mode == constants.INSTANCE_IMPORT:
2933
        feedback_fn("* running the instance OS import scripts...")
2934
        src_node = self.op.src_node
2935
        src_image = self.src_image
2936
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
2937
                                                src_node, src_image):
2938
          raise errors.OpExecError("Could not import os for instance"
2939
                                   " %s on node %s" %
2940
                                   (instance, pnode_name))
2941
      else:
2942
        # also checked in the prereq part
2943
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
2944
                                     % self.op.mode)
2945

    
2946
    if self.op.start:
2947
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
2948
      feedback_fn("* starting instance...")
2949
      if not rpc.call_instance_start(pnode_name, iobj, None):
2950
        raise errors.OpExecError("Could not start instance")
2951

    
2952

    
2953
class LUConnectConsole(NoHooksLU):
2954
  """Connect to an instance's console.
2955

2956
  This is somewhat special in that it returns the command line that
2957
  you need to run on the master node in order to connect to the
2958
  console.
2959

2960
  """
2961
  _OP_REQP = ["instance_name"]
2962

    
2963
  def CheckPrereq(self):
2964
    """Check prerequisites.
2965

2966
    This checks that the instance is in the cluster.
2967

2968
    """
2969
    instance = self.cfg.GetInstanceInfo(
2970
      self.cfg.ExpandInstanceName(self.op.instance_name))
2971
    if instance is None:
2972
      raise errors.OpPrereqError("Instance '%s' not known" %
2973
                                 self.op.instance_name)
2974
    self.instance = instance
2975

    
2976
  def Exec(self, feedback_fn):
2977
    """Connect to the console of an instance
2978

2979
    """
2980
    instance = self.instance
2981
    node = instance.primary_node
2982

    
2983
    node_insts = rpc.call_instance_list([node])[node]
2984
    if node_insts is False:
2985
      raise errors.OpExecError("Can't connect to node %s." % node)
2986

    
2987
    if instance.name not in node_insts:
2988
      raise errors.OpExecError("Instance %s is not running." % instance.name)
2989

    
2990
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
2991

    
2992
    hyper = hypervisor.GetHypervisor()
2993
    console_cmd = hyper.GetShellCommandForConsole(instance.name)
2994
    # build ssh cmdline
2995
    argv = ["ssh", "-q", "-t"]
2996
    argv.extend(ssh.KNOWN_HOSTS_OPTS)
2997
    argv.extend(ssh.BATCH_MODE_OPTS)
2998
    argv.append(node)
2999
    argv.append(console_cmd)
3000
    return "ssh", argv
3001

    
3002

    
3003
class LUAddMDDRBDComponent(LogicalUnit):
3004
  """Adda new mirror member to an instance's disk.
3005

3006
  """
3007
  HPATH = "mirror-add"
3008
  HTYPE = constants.HTYPE_INSTANCE
3009
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3010

    
3011
  def BuildHooksEnv(self):
3012
    """Build hooks env.
3013

3014
    This runs on the master, the primary and all the secondaries.
3015

3016
    """
3017
    env = {
3018
      "NEW_SECONDARY": self.op.remote_node,
3019
      "DISK_NAME": self.op.disk_name,
3020
      }
3021
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3022
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3023
          self.op.remote_node,] + list(self.instance.secondary_nodes)
3024
    return env, nl, nl
3025

    
3026
  def CheckPrereq(self):
3027
    """Check prerequisites.
3028

3029
    This checks that the instance is in the cluster.
3030

3031
    """
3032
    instance = self.cfg.GetInstanceInfo(
3033
      self.cfg.ExpandInstanceName(self.op.instance_name))
3034
    if instance is None:
3035
      raise errors.OpPrereqError("Instance '%s' not known" %
3036
                                 self.op.instance_name)
3037
    self.instance = instance
3038

    
3039
    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3040
    if remote_node is None:
3041
      raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3042
    self.remote_node = remote_node
3043

    
3044
    if remote_node == instance.primary_node:
3045
      raise errors.OpPrereqError("The specified node is the primary node of"
3046
                                 " the instance.")
3047

    
3048
    if instance.disk_template != constants.DT_REMOTE_RAID1:
3049
      raise errors.OpPrereqError("Instance's disk layout is not"
3050
                                 " remote_raid1.")
3051
    for disk in instance.disks:
3052
      if disk.iv_name == self.op.disk_name:
3053
        break
3054
    else:
3055
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
3056
                                 " instance." % self.op.disk_name)
3057
    if len(disk.children) > 1:
3058
      raise errors.OpPrereqError("The device already has two slave"
3059
                                 " devices.\n"
3060
                                 "This would create a 3-disk raid1"
3061
                                 " which we don't allow.")
3062
    self.disk = disk
3063

    
3064
  def Exec(self, feedback_fn):
3065
    """Add the mirror component
3066

3067
    """
3068
    disk = self.disk
3069
    instance = self.instance
3070

    
3071
    remote_node = self.remote_node
3072
    lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3073
    names = _GenerateUniqueNames(self.cfg, lv_names)
3074
    new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3075
                                     remote_node, disk.size, names)
3076

    
3077
    logger.Info("adding new mirror component on secondary")
3078
    #HARDCODE
3079
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, new_drbd, False,
3080
                                      _GetInstanceInfoText(instance)):
3081
      raise errors.OpExecError("Failed to create new component on secondary"
3082
                               " node %s" % remote_node)
3083

    
3084
    logger.Info("adding new mirror component on primary")
3085
    #HARDCODE
3086
    if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node, new_drbd,
3087
                                    _GetInstanceInfoText(instance)):
3088
      # remove secondary dev
3089
      self.cfg.SetDiskID(new_drbd, remote_node)
3090
      rpc.call_blockdev_remove(remote_node, new_drbd)
3091
      raise errors.OpExecError("Failed to create volume on primary")
3092

    
3093
    # the device exists now
3094
    # call the primary node to add the mirror to md
3095
    logger.Info("adding new mirror component to md")
3096
    if not rpc.call_blockdev_addchild(instance.primary_node,
3097
                                           disk, new_drbd):
3098
      logger.Error("Can't add mirror compoment to md!")
3099
      self.cfg.SetDiskID(new_drbd, remote_node)
3100
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
3101
        logger.Error("Can't rollback on secondary")
3102
      self.cfg.SetDiskID(new_drbd, instance.primary_node)
3103
      if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3104
        logger.Error("Can't rollback on primary")
3105
      raise errors.OpExecError("Can't add mirror component to md array")
3106

    
3107
    disk.children.append(new_drbd)
3108

    
3109
    self.cfg.AddInstance(instance)
3110

    
3111
    _WaitForSync(self.cfg, instance)
3112

    
3113
    return 0
3114

    
3115

    
3116
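# For the removal case, the drbd child to drop is selected via the "disk_id"
# parameter, matched against the third element of the child's logical_id.  As
# used here, a drbd logical_id is (node_a, node_b, port), so the old secondary
# is whichever of the two nodes is not the instance's primary.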
class LURemoveMDDRBDComponent(LogicalUnit):
  """Remove a component from a remote_raid1 disk.

  """
  HPATH = "mirror-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK_NAME": self.op.disk_name,
      "DISK_ID": self.op.disk_id,
      "OLD_SECONDARY": self.old_secondary,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
                                 " instance." % self.op.disk_name)
    for child in disk.children:
      if child.dev_type == "drbd" and child.logical_id[2] == self.op.disk_id:
        break
    else:
      raise errors.OpPrereqError("Can't find the device with this port.")

    if len(disk.children) < 2:
      raise errors.OpPrereqError("Cannot remove the last component from"
                                 " a mirror.")
    self.disk = disk
    self.child = child
    if self.child.logical_id[0] == instance.primary_node:
      oid = 1
    else:
      oid = 0
    self.old_secondary = self.child.logical_id[oid]

  def Exec(self, feedback_fn):
    """Remove the mirror component

    """
    instance = self.instance
    disk = self.disk
    child = self.child
    logger.Info("remove mirror component")
    self.cfg.SetDiskID(disk, instance.primary_node)
    if not rpc.call_blockdev_removechild(instance.primary_node,
                                              disk, child):
      raise errors.OpExecError("Can't remove child from mirror.")

    for node in child.logical_id[:2]:
      self.cfg.SetDiskID(child, node)
      if not rpc.call_blockdev_remove(node, child):
        logger.Error("Warning: failed to remove device from node %s,"
                     " continuing operation." % node)

    disk.children.remove(child)
    self.cfg.AddInstance(instance)


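# LUReplaceDisks rebuilds the secondary half of every disk of a remote_raid1
# instance in four steps: (1) generate a new drbd branch towards the (possibly
# new) secondary and hot-add it to each md array, (2) wait for the arrays to
# resync, (3) verify that neither the md devices nor the new drbd devices are
# degraded, and (4) detach and delete the old children.  Failures while adding
# the new components abort the operation and, as the messages below state,
# require manual cleanup.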
class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    remote_node = getattr(self.op, "remote_node", None)
    if remote_node is None:
      remote_node = instance.secondary_nodes[0]
    else:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    self.op.remote_node = remote_node

  def Exec(self, feedback_fn):
    """Replace the disks of an instance.

    """
    instance = self.instance
    iv_names = {}
    # start of work
    remote_node = self.op.remote_node
    cfg = self.cfg
    for dev in instance.disks:
      size = dev.size
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
                                       remote_node, size, names)
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
      logger.Info("adding new mirror component on secondary for %s" %
                  dev.iv_name)
      #HARDCODE
      if not _CreateBlockDevOnSecondary(cfg, remote_node, new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new component on"
                                 " secondary node %s\n"
                                 "Full abort, cleanup manually!" %
                                 remote_node)

      logger.Info("adding new mirror component on primary")
      #HARDCODE
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, new_drbd,
                                      _GetInstanceInfoText(instance)):
        # remove secondary dev
        cfg.SetDiskID(new_drbd, remote_node)
        rpc.call_blockdev_remove(remote_node, new_drbd)
        raise errors.OpExecError("Failed to create volume on primary!\n"
                                 "Full abort, cleanup manually!!")

      # the device exists now
      # call the primary node to add the mirror to md
      logger.Info("adding new mirror component to md")
      if not rpc.call_blockdev_addchild(instance.primary_node, dev,
                                        new_drbd):
        logger.Error("Can't add mirror component to md!")
        cfg.SetDiskID(new_drbd, remote_node)
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
          logger.Error("Can't rollback on secondary")
        cfg.SetDiskID(new_drbd, instance.primary_node)
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
          logger.Error("Can't rollback on primary")
        raise errors.OpExecError("Full abort, cleanup manually!!")

      dev.children.append(new_drbd)
      cfg.AddInstance(instance)

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    _WaitForSync(cfg, instance, unlock=True)

    # so check manually all the devices
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("MD device %s is degraded!" % name)
      cfg.SetDiskID(new_drbd, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
      if is_degr:
        raise errors.OpExecError("New drbd device %s is degraded!" % name)

    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      logger.Info("remove mirror %s component" % name)
      cfg.SetDiskID(dev, instance.primary_node)
      if not rpc.call_blockdev_removechild(instance.primary_node,
                                                dev, child):
        logger.Error("Can't remove child from mirror, aborting"
                     " *this device cleanup*.\nYou need to cleanup manually!!")
        continue

      for node in child.logical_id[:2]:
        logger.Info("remove child device on %s" % node)
        cfg.SetDiskID(child, node)
        if not rpc.call_blockdev_remove(node, child):
          logger.Error("Warning: failed to remove device from node %s,"
                       " continuing operation." % node)

      dev.children.remove(child)

      cfg.AddInstance(instance)


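# LUQueryInstanceData returns a dict keyed by instance name; the per-device
# entries are built recursively by _ComputeDiskStatus.  An illustrative result
# entry (all values hypothetical):
#
#   {"instance1.example.com": {
#       "name": "instance1.example.com", "config_state": "up",
#       "run_state": "up", "pnode": "node1", "snodes": ["node2"],
#       "os": "debian-etch", "memory": 512,
#       "nics": [("aa:00:00:11:22:33", "198.51.100.10", "xen-br0")],
#       "disks": [{"iv_name": "sda", "dev_type": "md", "pstatus": ...,
#                  "sstatus": ..., "children": [...]}]}}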
class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]
    return


  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type == "drbd":
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                                instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        }

      result[instance.name] = idict

    return result


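# LUSetInstanceParms only updates the configuration; as the Exec docstring
# notes, the new values take effect at the next restart of the instance.  The
# LU returns a list of (parameter, new_value) pairs describing what changed,
# e.g. [("mem", 1024), ("bridge", "xen-br1")] (illustrative values).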
class LUSetInstanceParms(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      args['nics'] = [(ip, bridge)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.bridge = getattr(self.op, "bridge", None)
    if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("No such instance name '%s'" %
                                 self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance
    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))

    self.cfg.AddInstance(instance)

    return result


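# LUQueryExports returns a mapping from node name to the list of instance
# exports stored on that node, e.g. (illustrative):
#   {"node1.example.com": ["instance1.example.com"], "node2.example.com": []}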
class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)


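# The export below works on a snapshot rather than on the live volume: the
# instance is (optionally) shut down, an LVM snapshot of its "sda" disk is
# taken on the primary node, the instance is restarted, and only then is the
# snapshot copied to the target node and removed again.  Older exports of the
# same instance on other nodes are pruned at the end, so at most one export
# per instance is kept in the cluster.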
class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance name is a valid one.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    # shutdown the instance, unless requested not to do so
    if self.op.shutdown:
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
      self.processor.ChainOpCode(op, feedback_fn)

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type="lvm", size=disk.size,
                                      logical_id=(vgname, new_dev_name),
                                      physical_id=(vgname, new_dev_name),
                                      iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown:
        op = opcodes.OpStartupInstance(instance_name=instance.name,
                                       force=False)
        self.processor.ChainOpCode(op, feedback_fn)

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                           instance):
        logger.Error("could not export block device %s from node"
                     " %s to node %s" %
                     (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from"
                     " node %s" % (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.processor.ChainOpCode(op, feedback_fn)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


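# The tags LUs below all take a "kind"/"name" pair; TagsLU.CheckPrereq
# resolves it to the target object: the cluster itself for TAG_CLUSTER, a node
# for TAG_NODE or an instance for TAG_INSTANCE (expanding the short name to
# its full form first).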
class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()


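# LUAddTags validates every requested tag before touching the target, then
# adds them all and writes the configuration; a concurrent configuration
# change surfaces as OpRetryError, leaving the retry decision to the caller.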
class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


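# LUDelTags refuses to run unless every requested tag is currently present on
# the target: the prerequisite check compares the requested set against the
# existing tags and names the missing ones in the error message.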
class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")