root / lib / cmdlib.py @ a7ba5e53

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import socket
30
import time
31
import tempfile
32
import re
33
import platform
34

    
35
from ganeti import rpc
36
from ganeti import ssh
37
from ganeti import logger
38
from ganeti import utils
39
from ganeti import errors
40
from ganeti import hypervisor
41
from ganeti import config
42
from ganeti import constants
43
from ganeti import objects
44
from ganeti import opcodes
45
from ganeti import ssconf
46

    
47
class LogicalUnit(object):
48
  """Logical Unit base class.
49

50
  Subclasses must follow these rules:
51
    - implement CheckPrereq which also fills in the opcode instance
52
      with all the fields (even if as None)
53
    - implement Exec
54
    - implement BuildHooksEnv
55
    - redefine HPATH and HTYPE
56
    - optionally redefine their run requirements (REQ_CLUSTER,
57
      REQ_MASTER); note that all commands require root permissions
58

59
  """
60
  HPATH = None
61
  HTYPE = None
62
  _OP_REQP = []
63
  REQ_CLUSTER = True
64
  REQ_MASTER = True
65

    
66
  def __init__(self, processor, op, cfg, sstore):
67
    """Constructor for LogicalUnit.
68

69
    This needs to be overridden in derived classes in order to check op
70
    validity.
71

72
    """
73
    self.processor = processor
74
    self.op = op
75
    self.cfg = cfg
76
    self.sstore = sstore
77
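    # every opcode attribute listed in _OP_REQP must be present (not None)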
    for attr_name in self._OP_REQP:
78
      attr_val = getattr(op, attr_name, None)
79
      if attr_val is None:
80
        raise errors.OpPrereqError("Required parameter '%s' missing" %
81
                                   attr_name)
82
    if self.REQ_CLUSTER:
83
      if not cfg.IsCluster():
84
        raise errors.OpPrereqError("Cluster not initialized yet,"
85
                                   " use 'gnt-cluster init' first.")
86
      if self.REQ_MASTER:
87
        master = sstore.GetMasterNode()
88
        if master != socket.gethostname():
89
          raise errors.OpPrereqError("Commands must be run on the master"
90
                                     " node %s" % master)
91

    
92
  def CheckPrereq(self):
93
    """Check prerequisites for this LU.
94

95
    This method should check that the prerequisites for the execution
96
    of this LU are fulfilled. It can do internode communication, but
97
    it should be idempotent - no cluster or system changes are
98
    allowed.
99

100
    The method should raise errors.OpPrereqError in case something is
101
    not fulfilled. Its return value is ignored.
102

103
    This method should also update all the parameters of the opcode to
104
    their canonical form; e.g. a short node name must be fully
105
    expanded after this method has successfully completed (so that
106
    hooks, logging, etc. work correctly).
107

108
    """
109
    raise NotImplementedError
110

    
111
  def Exec(self, feedback_fn):
112
    """Execute the LU.
113

114
    This method should implement the actual work. It should raise
115
    errors.OpExecError for failures that are somewhat dealt with in
116
    code, or expected.
117

118
    """
119
    raise NotImplementedError
120

    
121
  def BuildHooksEnv(self):
122
    """Build hooks environment for this LU.
123

124
    This method should return a three-element tuple consisting of: a dict
125
    containing the environment that will be used for running the
126
    specific hook for this LU, a list of node names on which the hook
127
    should run before the execution, and a list of node names on which
128
    the hook should run after the execution.
129

130
    The keys of the dict must not have 'GANETI_' prefixed as this will
131
    be handled in the hooks runner. Also note additional keys will be
132
    added by the hooks runner. If the LU doesn't define any
133
    environment, an empty dict (and not None) should be returned.
134

135
    As for the node lists, the master should not be included in
136
    them, as it will be added by the hooks runner in case this LU
137
    requires a cluster to run on (otherwise we don't have a node
138
    list). If there are no nodes, an empty list (and not None)
139
    should be returned.
140

141
    Note that if the HPATH for a LU class is None, this function will
142
    not be called.
143

144
    """
145
    raise NotImplementedError
146

    
147

    
148
class NoHooksLU(LogicalUnit):
149
  """Simple LU which runs no hooks.
150

151
  This LU is intended as a parent for other LogicalUnits which will
152
  run no hooks, in order to reduce duplicate code.
153

154
  """
155
  HPATH = None
156
  HTYPE = None
157

    
158
  def BuildHooksEnv(self):
159
    """Build hooks env.
160

161
    This is a no-op, since we don't run hooks.
162

163
    """
164
    return
165

    
166

    
167
def _GetWantedNodes(lu, nodes):
168
  """Returns list of checked and expanded node names.
169

170
  Args:
171
    nodes: List of nodes (strings) or None for all
172

173
  """
174
  if not isinstance(nodes, list):
175
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
176

    
177
  if nodes:
178
    wanted = []
179

    
180
    for name in nodes:
181
      node = lu.cfg.ExpandNodeName(name)
182
      if node is None:
183
        raise errors.OpPrereqError("No such node name '%s'" % name)
184
      wanted.append(node)
185

    
186
  else:
187
    wanted = lu.cfg.GetNodeList()
188
  return utils.NiceSort(wanted)
189

    
190

    
191
def _GetWantedInstances(lu, instances):
192
  """Returns list of checked and expanded instance names.
193

194
  Args:
195
    instances: List of instances (strings) or None for all
196

197
  """
198
  if not isinstance(instances, list):
199
    raise errors.OpPrereqError("Invalid argument type 'instances'")
200

    
201
  if instances:
202
    wanted = []
203

    
204
    for name in instances:
205
      instance = lu.cfg.ExpandInstanceName(name)
206
      if instance is None:
207
        raise errors.OpPrereqError("No such instance name '%s'" % name)
208
      wanted.append(instance)
209

    
210
  else:
211
    wanted = lu.cfg.GetInstanceList()
212
  return utils.NiceSort(wanted)
213

    
214

    
215
def _CheckOutputFields(static, dynamic, selected):
216
  """Checks whether all selected fields are valid.
217

218
  Args:
219
    static: Static fields
220
    dynamic: Dynamic fields
221

222
  """
223
  static_fields = frozenset(static)
224
  dynamic_fields = frozenset(dynamic)
225

    
226
  all_fields = static_fields | dynamic_fields
227

    
228
  if not all_fields.issuperset(selected):
229
    raise errors.OpPrereqError("Unknown output fields selected: %s"
230
                               % ",".join(frozenset(selected).
231
                                          difference(all_fields)))
232

    
233

    
234
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
235
                          memory, vcpus, nics):
236
  """Builds instance related env variables for hooks from single variables.
237

238
  Args:
239
    secondary_nodes: List of secondary nodes as strings
240
  """
241
  env = {
242
    "INSTANCE_NAME": name,
243
    "INSTANCE_PRIMARY": primary_node,
244
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
245
    "INSTANCE_OS_TYPE": os_type,
246
    "INSTANCE_STATUS": status,
247
    "INSTANCE_MEMORY": memory,
248
    "INSTANCE_VCPUS": vcpus,
249
  }
250

    
251
  if nics:
252
    nic_count = len(nics)
253
    for idx, (ip, bridge) in enumerate(nics):
254
      if ip is None:
255
        ip = ""
256
      env["INSTANCE_NIC%d_IP" % idx] = ip
257
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
258
  else:
259
    nic_count = 0
260

    
261
  env["INSTANCE_NIC_COUNT"] = nic_count
262

    
263
  return env
264

    
265

    
266
def _BuildInstanceHookEnvByObject(instance, override=None):
267
  """Builds instance related env variables for hooks from an object.
268

269
  Args:
270
    instance: objects.Instance object of instance
271
    override: dict of values to override
272
  """
273
  args = {
274
    'name': instance.name,
275
    'primary_node': instance.primary_node,
276
    'secondary_nodes': instance.secondary_nodes,
277
    'os_type': instance.os,
278
    'status': instance.status,
279
    'memory': instance.memory,
280
    'vcpus': instance.vcpus,
281
    'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
282
  }
283
  if override:
284
    args.update(override)
285
  return _BuildInstanceHookEnv(**args)
286

    
287

    
288
def _UpdateEtcHosts(fullnode, ip):
289
  """Ensure a node has a correct entry in /etc/hosts.
290

291
  Args:
292
    fullnode - Fully qualified domain name of host. (str)
293
    ip       - IPv4 address of host (str)
294

295
  """
296
  node = fullnode.split(".", 1)[0]
297

    
298
  f = open('/etc/hosts', 'r+')
299

    
300
  inthere = False
301

    
302
  save_lines = []
303
  add_lines = []
304
  removed = False
305

    
306
  while True:
307
    rawline = f.readline()
308

    
309
    if not rawline:
310
      # End of file
311
      break
312

    
313
    line = rawline.split('\n')[0]
314

    
315
    # Strip off comments
316
    line = line.split('#')[0]
317

    
318
    if not line:
319
      # Entire line was comment, skip
320
      save_lines.append(rawline)
321
      continue
322

    
323
    fields = line.split()
324

    
325
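    # haveall: the line already lists the IP, the FQDN and the short name;
    # havesome: it mentions at least one of them (a stale or partial entry)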
    haveall = True
326
    havesome = False
327
    for spec in [ ip, fullnode, node ]:
328
      if spec not in fields:
329
        haveall = False
330
      if spec in fields:
331
        havesome = True
332

    
333
    if haveall:
334
      inthere = True
335
      save_lines.append(rawline)
336
      continue
337

    
338
    if havesome and not haveall:
339
      # Line (old, or manual?) which is missing some.  Remove.
340
      removed = True
341
      continue
342

    
343
    save_lines.append(rawline)
344

    
345
  if not inthere:
346
    add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))
347

    
348
  if removed:
349
    if add_lines:
350
      save_lines = save_lines + add_lines
351

    
352
    # We removed a line, write a new file and replace old.
353
    fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
354
    newfile = os.fdopen(fd, 'w')
355
    newfile.write(''.join(save_lines))
356
    newfile.close()
357
    os.rename(tmpname, '/etc/hosts')
358

    
359
  elif add_lines:
360
    # Simply appending a new line will do the trick.
361
    f.seek(0, 2)
362
    for add in add_lines:
363
      f.write(add)
364

    
365
  f.close()
366

    
367

    
368
def _UpdateKnownHosts(fullnode, ip, pubkey):
369
  """Ensure a node has a correct known_hosts entry.
370

371
  Args:
372
    fullnode - Fully qualified domain name of host. (str)
373
    ip       - IPv4 address of host (str)
374
    pubkey   - the public key of the cluster
375

376
  """
377
  if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
378
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
379
  else:
380
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
381

    
382
  inthere = False
383

    
384
  save_lines = []
385
  add_lines = []
386
  removed = False
387

    
388
  while True:
389
    rawline = f.readline()
390
    logger.Debug('read %s' % (repr(rawline),))
391

    
392
    if not rawline:
393
      # End of file
394
      break
395

    
396
    line = rawline.split('\n')[0]
397

    
398
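    # each known_hosts line is expected to look like
    # "<hostname>,<ip> ssh-rsa <key>" (the same format used when adding below)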
    parts = line.split(' ')
399
    fields = parts[0].split(',')
400
    key = parts[2]
401

    
402
    haveall = True
403
    havesome = False
404
    for spec in [ ip, fullnode ]:
405
      if spec not in fields:
406
        haveall = False
407
      if spec in fields:
408
        havesome = True
409

    
410
    logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
411
    if haveall and key == pubkey:
412
      inthere = True
413
      save_lines.append(rawline)
414
      logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
415
      continue
416

    
417
    if havesome and (not haveall or key != pubkey):
418
      removed = True
419
      logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
420
      continue
421

    
422
    save_lines.append(rawline)
423

    
424
  if not inthere:
425
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
426
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
427

    
428
  if removed:
429
    save_lines = save_lines + add_lines
430

    
431
    # Write a new file and replace old.
432
    fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
433
                                   constants.DATA_DIR)
434
    newfile = os.fdopen(fd, 'w')
435
    try:
436
      newfile.write(''.join(save_lines))
437
    finally:
438
      newfile.close()
439
    logger.Debug("Wrote new known_hosts.")
440
    os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
441

    
442
  elif add_lines:
443
    # Simply appending a new line will do the trick.
444
    f.seek(0, 2)
445
    for add in add_lines:
446
      f.write(add)
447

    
448
  f.close()
449

    
450

    
451
def _HasValidVG(vglist, vgname):
452
  """Checks if the volume group list is valid.
453

454
  A non-None return value means there's an error, and the return value
455
  is the error message.
456

457
  """
458
  vgsize = vglist.get(vgname, None)
459
  if vgsize is None:
460
    return "volume group '%s' missing" % vgname
461
  elif vgsize < 20480:
462
    return ("volume group '%s' too small (20480MiB required, %dMib found)" %
463
            (vgname, vgsize))
464
  return None
465

    
466

    
467
def _InitSSHSetup(node):
468
  """Setup the SSH configuration for the cluster.
469

470

471
  This generates a dsa keypair for root, backing up any existing one,
472
  and adds the public key to root's authorized keys.
473

474
  Args:
475
    node: the name of this host as a fqdn
476

477
  """
478
  if os.path.exists('/root/.ssh/id_dsa'):
479
    utils.CreateBackup('/root/.ssh/id_dsa')
480
  if os.path.exists('/root/.ssh/id_dsa.pub'):
481
    utils.CreateBackup('/root/.ssh/id_dsa.pub')
482

    
483
  utils.RemoveFile('/root/.ssh/id_dsa')
484
  utils.RemoveFile('/root/.ssh/id_dsa.pub')
485

    
486
  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
487
                         "-f", "/root/.ssh/id_dsa",
488
                         "-q", "-N", ""])
489
  if result.failed:
490
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
491
                             result.output)
492

    
493
  f = open('/root/.ssh/id_dsa.pub', 'r')
494
  try:
495
    utils.AddAuthorizedKey('/root/.ssh/authorized_keys', f.read(8192))
496
  finally:
497
    f.close()
498

    
499

    
500
def _InitGanetiServerSetup(ss):
501
  """Setup the necessary configuration for the initial node daemon.
502

503
  This creates the nodepass file containing the shared password for
504
  the cluster and also generates the SSL certificate.
505

506
  """
507
  # Create pseudo random password
508
  randpass = sha.new(os.urandom(64)).hexdigest()
509
  # and write it into sstore
510
  ss.SetKey(ss.SS_NODED_PASS, randpass)
511

    
512
  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
513
                         "-days", str(365*5), "-nodes", "-x509",
514
                         "-keyout", constants.SSL_CERT_FILE,
515
                         "-out", constants.SSL_CERT_FILE, "-batch"])
516
  if result.failed:
517
    raise errors.OpExecError("could not generate server ssl cert, command"
518
                             " %s had exitcode %s and error message %s" %
519
                             (result.cmd, result.exit_code, result.output))
520

    
521
  os.chmod(constants.SSL_CERT_FILE, 0400)
522

    
523
  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
524

    
525
  if result.failed:
526
    raise errors.OpExecError("Could not start the node daemon, command %s"
527
                             " had exitcode %s and error %s" %
528
                             (result.cmd, result.exit_code, result.output))
529

    
530

    
531
class LUInitCluster(LogicalUnit):
532
  """Initialise the cluster.
533

534
  """
535
  HPATH = "cluster-init"
536
  HTYPE = constants.HTYPE_CLUSTER
537
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
538
              "def_bridge", "master_netdev"]
539
  REQ_CLUSTER = False
540

    
541
  def BuildHooksEnv(self):
542
    """Build hooks env.
543

544
    Notes: Since we don't require a cluster, we must manually add
545
    ourselves to the post-run node list.
546

547
    """
548
    env = {
549
      "CLUSTER": self.op.cluster_name,
550
      "MASTER": self.hostname['hostname_full'],
551
      }
552
    return env, [], [self.hostname['hostname_full']]
553

    
554
  def CheckPrereq(self):
555
    """Verify that the passed name is a valid one.
556

557
    """
558
    if config.ConfigWriter.IsCluster():
559
      raise errors.OpPrereqError("Cluster is already initialised")
560

    
561
    hostname_local = socket.gethostname()
562
    self.hostname = hostname = utils.LookupHostname(hostname_local)
563
    if not hostname:
564
      raise errors.OpPrereqError("Cannot resolve my own hostname ('%s')" %
565
                                 hostname_local)
566

    
567
    if hostname["hostname_full"] != hostname_local:
568
      raise errors.OpPrereqError("My own hostname (%s) does not match the"
569
                                 " resolver (%s): probably not using FQDN"
570
                                 " for hostname." %
571
                                 (hostname_local, hostname["hostname_full"]))
572

    
573
    if hostname["ip"].startswith("127."):
574
      raise errors.OpPrereqError("This host's IP resolves to the private"
575
                                 " range (%s). Please fix DNS or /etc/hosts." %
576
                                 (hostname["ip"],))
577

    
578
    self.clustername = clustername = utils.LookupHostname(self.op.cluster_name)
579
    if not clustername:
580
      raise errors.OpPrereqError("Cannot resolve given cluster name ('%s')"
581
                                 % self.op.cluster_name)
582

    
583
    result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", hostname['ip']])
584
    if result.failed:
585
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
586
                                 " to %s,\nbut this ip address does not"
587
                                 " belong to this host."
588
                                 " Aborting." % hostname['ip'])
589

    
590
    secondary_ip = getattr(self.op, "secondary_ip", None)
591
    if secondary_ip and not utils.IsValidIP(secondary_ip):
592
      raise errors.OpPrereqError("Invalid secondary ip given")
593
    if secondary_ip and secondary_ip != hostname['ip']:
594
      result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", secondary_ip])
595
      if result.failed:
596
        raise errors.OpPrereqError("You gave %s as secondary IP,\n"
597
                                   "but it does not belong to this host." %
598
                                   secondary_ip)
599
    self.secondary_ip = secondary_ip
600

    
601
    # checks presence of the volume group given
602
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
603

    
604
    if vgstatus:
605
      raise errors.OpPrereqError("Error: %s" % vgstatus)
606

    
607
    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
608
                    self.op.mac_prefix):
609
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
610
                                 self.op.mac_prefix)
611

    
612
    if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
613
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
614
                                 self.op.hypervisor_type)
615

    
616
    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
617
    if result.failed:
618
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
619
                                 (self.op.master_netdev,
620
                                  result.output.strip()))
621

    
622
  def Exec(self, feedback_fn):
623
    """Initialize the cluster.
624

625
    """
626
    clustername = self.clustername
627
    hostname = self.hostname
628

    
629
    # set up the simple store
630
    ss = ssconf.SimpleStore()
631
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
632
    ss.SetKey(ss.SS_MASTER_NODE, hostname['hostname_full'])
633
    ss.SetKey(ss.SS_MASTER_IP, clustername['ip'])
634
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
635
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername['hostname'])
636

    
637
    # set up the inter-node password and certificate
638
    _InitGanetiServerSetup(ss)
639

    
640
    # start the master ip
641
    rpc.call_node_start_master(hostname['hostname_full'])
642

    
643
    # set up ssh config and /etc/hosts
644
    f = open('/etc/ssh/ssh_host_rsa_key.pub', 'r')
645
    try:
646
      sshline = f.read()
647
    finally:
648
      f.close()
649
    sshkey = sshline.split(" ")[1]
650

    
651
    _UpdateEtcHosts(hostname['hostname_full'],
652
                    hostname['ip'],
653
                    )
654

    
655
    _UpdateKnownHosts(hostname['hostname_full'],
656
                      hostname['ip'],
657
                      sshkey,
658
                      )
659

    
660
    _InitSSHSetup(hostname['hostname'])
661

    
662
    # init of cluster config file
663
    cfgw = config.ConfigWriter()
664
    cfgw.InitConfig(hostname['hostname'], hostname['ip'], self.secondary_ip,
665
                    sshkey, self.op.mac_prefix,
666
                    self.op.vg_name, self.op.def_bridge)
667

    
668

    
669
class LUDestroyCluster(NoHooksLU):
670
  """Logical unit for destroying the cluster.
671

672
  """
673
  _OP_REQP = []
674

    
675
  def CheckPrereq(self):
676
    """Check prerequisites.
677

678
    This checks whether the cluster is empty.
679

680
    Any errors are signalled by raising errors.OpPrereqError.
681

682
    """
683
    master = self.sstore.GetMasterNode()
684

    
685
    nodelist = self.cfg.GetNodeList()
686
    if len(nodelist) != 1 or nodelist[0] != master:
687
      raise errors.OpPrereqError("There are still %d node(s) in"
688
                                 " this cluster." % (len(nodelist) - 1))
689
    instancelist = self.cfg.GetInstanceList()
690
    if instancelist:
691
      raise errors.OpPrereqError("There are still %d instance(s) in"
692
                                 " this cluster." % len(instancelist))
693

    
694
  def Exec(self, feedback_fn):
695
    """Destroys the cluster.
696

697
    """
698
    utils.CreateBackup('/root/.ssh/id_dsa')
699
    utils.CreateBackup('/root/.ssh/id_dsa.pub')
700
    rpc.call_node_leave_cluster(self.sstore.GetMasterNode())
701

    
702

    
703
class LUVerifyCluster(NoHooksLU):
704
  """Verifies the cluster status.
705

706
  """
707
  _OP_REQP = []
708

    
709
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
710
                  remote_version, feedback_fn):
711
    """Run multiple tests against a node.
712

713
    Test list:
714
      - compares ganeti version
715
      - checks vg existence and size > 20G
716
      - checks config file checksum
717
      - checks ssh to other nodes
718

719
    Args:
720
      node: name of the node to check
721
      file_list: required list of files
722
      local_cksum: dictionary of local files and their checksums
723

724
    """
725
    # compares ganeti version
726
    local_version = constants.PROTOCOL_VERSION
727
    if not remote_version:
728
      feedback_fn(" - ERROR: connection to %s failed" % (node))
729
      return True
730

    
731
    if local_version != remote_version:
732
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
733
                      (local_version, node, remote_version))
734
      return True
735

    
736
    # checks vg existence and size > 20G
737

    
738
    bad = False
739
    if not vglist:
740
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
741
                      (node,))
742
      bad = True
743
    else:
744
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
745
      if vgstatus:
746
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
747
        bad = True
748

    
749
    # checks config file checksum
750
    # checks ssh to any
751

    
752
    if 'filelist' not in node_result:
753
      bad = True
754
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
755
    else:
756
      remote_cksum = node_result['filelist']
757
      for file_name in file_list:
758
        if file_name not in remote_cksum:
759
          bad = True
760
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
761
        elif remote_cksum[file_name] != local_cksum[file_name]:
762
          bad = True
763
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
764

    
765
    if 'nodelist' not in node_result:
766
      bad = True
767
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
768
    else:
769
      if node_result['nodelist']:
770
        bad = True
771
        for node in node_result['nodelist']:
772
          feedback_fn("  - ERROR: communication with node '%s': %s" %
773
                          (node, node_result['nodelist'][node]))
774
    hyp_result = node_result.get('hypervisor', None)
775
    if hyp_result is not None:
776
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
777
    return bad
778

    
779
  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
780
    """Verify an instance.
781

782
    This function checks to see if the required block devices are
783
    available on the instance's node.
784

785
    """
786
    bad = False
787

    
788
    instancelist = self.cfg.GetInstanceList()
789
    if instance not in instancelist:
790
      feedback_fn("  - ERROR: instance %s not in instance list %s" %
791
                      (instance, instancelist))
792
      bad = True
793

    
794
    instanceconfig = self.cfg.GetInstanceInfo(instance)
795
    node_current = instanceconfig.primary_node
796

    
797
    node_vol_should = {}
798
    instanceconfig.MapLVsByNode(node_vol_should)
799

    
800
    for node in node_vol_should:
801
      for volume in node_vol_should[node]:
802
        if node not in node_vol_is or volume not in node_vol_is[node]:
803
          feedback_fn("  - ERROR: volume %s missing on node %s" %
804
                          (volume, node))
805
          bad = True
806

    
807
    if instanceconfig.status != 'down':
808
      if instance not in node_instance[node_current]:
809
        feedback_fn("  - ERROR: instance %s not running on node %s" %
810
                        (instance, node_current))
811
        bad = True
812

    
813
    for node in node_instance:
814
      if node != node_current:
815
        if instance in node_instance[node]:
816
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
817
                          (instance, node))
818
          bad = True
819

    
820
    return not bad
821

    
822
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
823
    """Verify if there are any unknown volumes in the cluster.
824

825
    The .os, .swap and backup volumes are ignored. All other volumes are
826
    reported as unknown.
827

828
    """
829
    bad = False
830

    
831
    for node in node_vol_is:
832
      for volume in node_vol_is[node]:
833
        if node not in node_vol_should or volume not in node_vol_should[node]:
834
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
835
                      (volume, node))
836
          bad = True
837
    return bad
838

    
839
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
840
    """Verify the list of running instances.
841

842
    This checks what instances are running but unknown to the cluster.
843

844
    """
845
    bad = False
846
    for node in node_instance:
847
      for runninginstance in node_instance[node]:
848
        if runninginstance not in instancelist:
849
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
850
                          (runninginstance, node))
851
          bad = True
852
    return bad
853

    
854
  def CheckPrereq(self):
855
    """Check prerequisites.
856

857
    This has no prerequisites.
858

859
    """
860
    pass
861

    
862
  def Exec(self, feedback_fn):
863
    """Verify integrity of cluster, performing various test on nodes.
864

865
    """
866
    bad = False
867
    feedback_fn("* Verifying global settings")
868
    self.cfg.VerifyConfig()
869

    
870
    master = self.sstore.GetMasterNode()
871
    vg_name = self.cfg.GetVGName()
872
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
873
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
874
    node_volume = {}
875
    node_instance = {}
876

    
877
    # FIXME: verify OS list
878
    # do local checksums
879
    file_names = list(self.sstore.GetFileList())
880
    file_names.append(constants.SSL_CERT_FILE)
881
    file_names.append(constants.CLUSTER_CONF_FILE)
882
    local_checksums = utils.FingerprintFiles(file_names)
883

    
884
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
885
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
886
    all_instanceinfo = rpc.call_instance_list(nodelist)
887
    all_vglist = rpc.call_vg_list(nodelist)
888
    node_verify_param = {
889
      'filelist': file_names,
890
      'nodelist': nodelist,
891
      'hypervisor': None,
892
      }
893
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
894
    all_rversion = rpc.call_version(nodelist)
895

    
896
    for node in nodelist:
897
      feedback_fn("* Verifying node %s" % node)
898
      result = self._VerifyNode(node, file_names, local_checksums,
899
                                all_vglist[node], all_nvinfo[node],
900
                                all_rversion[node], feedback_fn)
901
      bad = bad or result
902

    
903
      # node_volume
904
      volumeinfo = all_volumeinfo[node]
905

    
906
      if type(volumeinfo) != dict:
907
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
908
        bad = True
909
        continue
910

    
911
      node_volume[node] = volumeinfo
912

    
913
      # node_instance
914
      nodeinstance = all_instanceinfo[node]
915
      if type(nodeinstance) != list:
916
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
917
        bad = True
918
        continue
919

    
920
      node_instance[node] = nodeinstance
921

    
922
    node_vol_should = {}
923

    
924
    for instance in instancelist:
925
      feedback_fn("* Verifying instance %s" % instance)
926
      result = self._VerifyInstance(instance, node_volume, node_instance,
927
                                     feedback_fn)
928
      bad = bad or result
929

    
930
      inst_config = self.cfg.GetInstanceInfo(instance)
931

    
932
      inst_config.MapLVsByNode(node_vol_should)
933

    
934
    feedback_fn("* Verifying orphan volumes")
935
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
936
                                       feedback_fn)
937
    bad = bad or result
938

    
939
    feedback_fn("* Verifying remaining instances")
940
    result = self._VerifyOrphanInstances(instancelist, node_instance,
941
                                         feedback_fn)
942
    bad = bad or result
943

    
944
    return int(bad)
945

    
946

    
947
def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
948
  """Sleep and poll for an instance's disk to sync.
949

950
  """
951
  if not instance.disks:
952
    return True
953

    
954
  if not oneshot:
955
    logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)
956

    
957
  node = instance.primary_node
958

    
959
  for dev in instance.disks:
960
    cfgw.SetDiskID(dev, node)
961

    
962
  retries = 0
963
  while True:
964
    max_time = 0
965
    done = True
966
    cumul_degraded = False
967
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
968
    if not rstats:
969
      logger.ToStderr("Can't get any data from node %s" % node)
970
      retries += 1
971
      if retries >= 10:
972
        raise errors.RemoteError("Can't contact node %s for mirror data,"
973
                                 " aborting." % node)
974
      time.sleep(6)
975
      continue
976
    retries = 0
977
    for i in range(len(rstats)):
978
      mstat = rstats[i]
979
      if mstat is None:
980
        logger.ToStderr("Can't compute data for node %s/%s" %
981
                        (node, instance.disks[i].iv_name))
982
        continue
983
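      # mstat is (sync percentage, estimated seconds left, degraded flag);
      # the percentage is None once the device is considered in sync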
      perc_done, est_time, is_degraded = mstat
984
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
985
      if perc_done is not None:
986
        done = False
987
        if est_time is not None:
988
          rem_time = "%d estimated seconds remaining" % est_time
989
          max_time = est_time
990
        else:
991
          rem_time = "no time estimate"
992
        logger.ToStdout("- device %s: %5.2f%% done, %s" %
993
                        (instance.disks[i].iv_name, perc_done, rem_time))
994
    if done or oneshot:
995
      break
996

    
997
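    # release the 'cmd' lock around the sleep and re-acquire it afterwards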
    if unlock:
998
      utils.Unlock('cmd')
999
    try:
1000
      time.sleep(min(60, max_time))
1001
    finally:
1002
      if unlock:
1003
        utils.Lock('cmd')
1004

    
1005
  if done:
1006
    logger.ToStdout("Instance %s's disks are in sync." % instance.name)
1007
  return not cumul_degraded
1008

    
1009

    
1010
def _CheckDiskConsistency(cfgw, dev, node, on_primary):
1011
  """Check that mirrors are not degraded.
1012

1013
  """
1014
  cfgw.SetDiskID(dev, node)
1015

    
1016
  result = True
1017
  if on_primary or dev.AssembleOnSecondary():
1018
    rstats = rpc.call_blockdev_find(node, dev)
1019
    if not rstats:
1020
      logger.ToStderr("Can't get any data from node %s" % node)
1021
      result = False
1022
    else:
1023
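      # the sixth status field returned by blockdev_find flags a degraded device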
      result = result and (not rstats[5])
1024
  if dev.children:
1025
    for child in dev.children:
1026
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1027

    
1028
  return result
1029

    
1030

    
1031
class LUDiagnoseOS(NoHooksLU):
1032
  """Logical unit for OS diagnose/query.
1033

1034
  """
1035
  _OP_REQP = []
1036

    
1037
  def CheckPrereq(self):
1038
    """Check prerequisites.
1039

1040
    This always succeeds, since this is a pure query LU.
1041

1042
    """
1043
    return
1044

    
1045
  def Exec(self, feedback_fn):
1046
    """Compute the list of OSes.
1047

1048
    """
1049
    node_list = self.cfg.GetNodeList()
1050
    node_data = rpc.call_os_diagnose(node_list)
1051
    if node_data == False:
1052
      raise errors.OpExecError("Can't gather the list of OSes")
1053
    return node_data
1054

    
1055

    
1056
class LURemoveNode(LogicalUnit):
1057
  """Logical unit for removing a node.
1058

1059
  """
1060
  HPATH = "node-remove"
1061
  HTYPE = constants.HTYPE_NODE
1062
  _OP_REQP = ["node_name"]
1063

    
1064
  def BuildHooksEnv(self):
1065
    """Build hooks env.
1066

1067
    This doesn't run on the target node in the pre phase as a failed
1068
    node would not allow itself to run.
1069

1070
    """
1071
    env = {
1072
      "NODE_NAME": self.op.node_name,
1073
      }
1074
    all_nodes = self.cfg.GetNodeList()
1075
    all_nodes.remove(self.op.node_name)
1076
    return env, all_nodes, all_nodes
1077

    
1078
  def CheckPrereq(self):
1079
    """Check prerequisites.
1080

1081
    This checks:
1082
     - the node exists in the configuration
1083
     - it does not have primary or secondary instances
1084
     - it's not the master
1085

1086
    Any errors are signalled by raising errors.OpPrereqError.
1087

1088
    """
1089
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1090
    if node is None:
1091
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1092

    
1093
    instance_list = self.cfg.GetInstanceList()
1094

    
1095
    masternode = self.sstore.GetMasterNode()
1096
    if node.name == masternode:
1097
      raise errors.OpPrereqError("Node is the master node,"
1098
                                 " you need to failover first.")
1099

    
1100
    for instance_name in instance_list:
1101
      instance = self.cfg.GetInstanceInfo(instance_name)
1102
      if node.name == instance.primary_node:
1103
        raise errors.OpPrereqError("Instance %s still running on the node,"
1104
                                   " please remove first." % instance_name)
1105
      if node.name in instance.secondary_nodes:
1106
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1107
                                   " please remove first." % instance_name)
1108
    self.op.node_name = node.name
1109
    self.node = node
1110

    
1111
  def Exec(self, feedback_fn):
1112
    """Removes the node from the cluster.
1113

1114
    """
1115
    node = self.node
1116
    logger.Info("stopping the node daemon and removing configs from node %s" %
1117
                node.name)
1118

    
1119
    rpc.call_node_leave_cluster(node.name)
1120

    
1121
    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1122

    
1123
    logger.Info("Removing node %s from config" % node.name)
1124

    
1125
    self.cfg.RemoveNode(node.name)
1126

    
1127

    
1128
class LUQueryNodes(NoHooksLU):
1129
  """Logical unit for querying nodes.
1130

1131
  """
1132
  _OP_REQP = ["output_fields"]
1133

    
1134
  def CheckPrereq(self):
1135
    """Check prerequisites.
1136

1137
    This checks that the fields required are valid output fields.
1138

1139
    """
1140
    self.dynamic_fields = frozenset(["dtotal", "dfree",
1141
                                     "mtotal", "mnode", "mfree"])
1142

    
1143
    _CheckOutputFields(static=["name", "pinst", "sinst", "pip", "sip"],
1144
                       dynamic=self.dynamic_fields,
1145
                       selected=self.op.output_fields)
1146

    
1147

    
1148
  def Exec(self, feedback_fn):
1149
    """Computes the list of nodes and their attributes.
1150

1151
    """
1152
    nodenames = utils.NiceSort(self.cfg.GetNodeList())
1153
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1154

    
1155

    
1156
    # begin data gathering
1157

    
1158
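    # query the nodes via RPC only if at least one dynamic field was requested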
    if self.dynamic_fields.intersection(self.op.output_fields):
1159
      live_data = {}
1160
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1161
      for name in nodenames:
1162
        nodeinfo = node_data.get(name, None)
1163
        if nodeinfo:
1164
          live_data[name] = {
1165
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1166
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1167
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1168
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1169
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1170
            }
1171
        else:
1172
          live_data[name] = {}
1173
    else:
1174
      live_data = dict.fromkeys(nodenames, {})
1175

    
1176
    node_to_primary = dict.fromkeys(nodenames, 0)
1177
    node_to_secondary = dict.fromkeys(nodenames, 0)
1178

    
1179
    if "pinst" in self.op.output_fields or "sinst" in self.op.output_fields:
1180
      instancelist = self.cfg.GetInstanceList()
1181

    
1182
      for instance in instancelist:
1183
        instanceinfo = self.cfg.GetInstanceInfo(instance)
1184
        node_to_primary[instanceinfo.primary_node] += 1
1185
        for secnode in instanceinfo.secondary_nodes:
1186
          node_to_secondary[secnode] += 1
1187

    
1188
    # end data gathering
1189

    
1190
    output = []
1191
    for node in nodelist:
1192
      node_output = []
1193
      for field in self.op.output_fields:
1194
        if field == "name":
1195
          val = node.name
1196
        elif field == "pinst":
1197
          val = node_to_primary[node.name]
1198
        elif field == "sinst":
1199
          val = node_to_secondary[node.name]
1200
        elif field == "pip":
1201
          val = node.primary_ip
1202
        elif field == "sip":
1203
          val = node.secondary_ip
1204
        elif field in self.dynamic_fields:
1205
          val = live_data[node.name].get(field, "?")
1206
        else:
1207
          raise errors.ParameterError(field)
1208
        val = str(val)
1209
        node_output.append(val)
1210
      output.append(node_output)
1211

    
1212
    return output
1213

    
1214

    
1215
class LUQueryNodeVolumes(NoHooksLU):
1216
  """Logical unit for getting volumes on node(s).
1217

1218
  """
1219
  _OP_REQP = ["nodes", "output_fields"]
1220

    
1221
  def CheckPrereq(self):
1222
    """Check prerequisites.
1223

1224
    This checks that the fields required are valid output fields.
1225

1226
    """
1227
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1228

    
1229
    _CheckOutputFields(static=["node"],
1230
                       dynamic=["phys", "vg", "name", "size", "instance"],
1231
                       selected=self.op.output_fields)
1232

    
1233

    
1234
  def Exec(self, feedback_fn):
1235
    """Computes the list of nodes and their attributes.
1236

1237
    """
1238
    nodenames = self.nodes
1239
    volumes = rpc.call_node_volumes(nodenames)
1240

    
1241
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1242
             in self.cfg.GetInstanceList()]
1243

    
1244
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1245

    
1246
    output = []
1247
    for node in nodenames:
1248
      if node not in volumes or not volumes[node]:
1249
        continue
1250

    
1251
      node_vols = volumes[node][:]
1252
      node_vols.sort(key=lambda vol: vol['dev'])
1253

    
1254
      for vol in node_vols:
1255
        node_output = []
1256
        for field in self.op.output_fields:
1257
          if field == "node":
1258
            val = node
1259
          elif field == "phys":
1260
            val = vol['dev']
1261
          elif field == "vg":
1262
            val = vol['vg']
1263
          elif field == "name":
1264
            val = vol['name']
1265
          elif field == "size":
1266
            val = int(float(vol['size']))
1267
          elif field == "instance":
1268
            for inst in ilist:
1269
              if node not in lv_by_node[inst]:
1270
                continue
1271
              if vol['name'] in lv_by_node[inst][node]:
1272
                val = inst.name
1273
                break
1274
            else:
1275
              val = '-'
1276
          else:
1277
            raise errors.ParameterError(field)
1278
          node_output.append(str(val))
1279

    
1280
        output.append(node_output)
1281

    
1282
    return output
1283

    
1284

    
1285
class LUAddNode(LogicalUnit):
1286
  """Logical unit for adding node to the cluster.
1287

1288
  """
1289
  HPATH = "node-add"
1290
  HTYPE = constants.HTYPE_NODE
1291
  _OP_REQP = ["node_name"]
1292

    
1293
  def BuildHooksEnv(self):
1294
    """Build hooks env.
1295

1296
    This will run on all nodes before, and on all nodes + the new node after.
1297

1298
    """
1299
    env = {
1300
      "NODE_NAME": self.op.node_name,
1301
      "NODE_PIP": self.op.primary_ip,
1302
      "NODE_SIP": self.op.secondary_ip,
1303
      }
1304
    nodes_0 = self.cfg.GetNodeList()
1305
    nodes_1 = nodes_0 + [self.op.node_name, ]
1306
    return env, nodes_0, nodes_1
1307

    
1308
  def CheckPrereq(self):
1309
    """Check prerequisites.
1310

1311
    This checks:
1312
     - the new node is not already in the config
1313
     - it is resolvable
1314
     - its parameters (single/dual homed) matches the cluster
1315

1316
    Any errors are signalled by raising errors.OpPrereqError.
1317

1318
    """
1319
    node_name = self.op.node_name
1320
    cfg = self.cfg
1321

    
1322
    dns_data = utils.LookupHostname(node_name)
1323
    if not dns_data:
1324
      raise errors.OpPrereqError("Node %s is not resolvable" % node_name)
1325

    
1326
    node = dns_data['hostname']
1327
    primary_ip = self.op.primary_ip = dns_data['ip']
1328
    secondary_ip = getattr(self.op, "secondary_ip", None)
1329
    if secondary_ip is None:
1330
      secondary_ip = primary_ip
1331
    if not utils.IsValidIP(secondary_ip):
1332
      raise errors.OpPrereqError("Invalid secondary IP given")
1333
    self.op.secondary_ip = secondary_ip
1334
    node_list = cfg.GetNodeList()
1335
    if node in node_list:
1336
      raise errors.OpPrereqError("Node %s is already in the configuration"
1337
                                 % node)
1338

    
1339
    for existing_node_name in node_list:
1340
      existing_node = cfg.GetNodeInfo(existing_node_name)
1341
      if (existing_node.primary_ip == primary_ip or
1342
          existing_node.secondary_ip == primary_ip or
1343
          existing_node.primary_ip == secondary_ip or
1344
          existing_node.secondary_ip == secondary_ip):
1345
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1346
                                   " existing node %s" % existing_node.name)
1347

    
1348
    # check that the type of the node (single versus dual homed) is the
1349
    # same as for the master
1350
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1351
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1352
    newbie_singlehomed = secondary_ip == primary_ip
1353
    if master_singlehomed != newbie_singlehomed:
1354
      if master_singlehomed:
1355
        raise errors.OpPrereqError("The master has no private ip but the"
1356
                                   " new node has one")
1357
      else:
1358
        raise errors.OpPrereqError("The master has a private ip but the"
1359
                                   " new node doesn't have one")
1360

    
1361
    # checks reachability
1362
    command = ["fping", "-q", primary_ip]
1363
    result = utils.RunCmd(command)
1364
    if result.failed:
1365
      raise errors.OpPrereqError("Node not reachable by ping")
1366

    
1367
    if not newbie_singlehomed:
1368
      # check reachability from my secondary ip to newbie's secondary ip
1369
      command = ["fping", "-S%s" % myself.secondary_ip, "-q", secondary_ip]
1370
      result = utils.RunCmd(command)
1371
      if result.failed:
1372
        raise errors.OpPrereqError("Node secondary ip not reachable by ping")
1373

    
1374
    self.new_node = objects.Node(name=node,
1375
                                 primary_ip=primary_ip,
1376
                                 secondary_ip=secondary_ip)
1377

    
1378
  def Exec(self, feedback_fn):
1379
    """Adds the new node to the cluster.
1380

1381
    """
1382
    new_node = self.new_node
1383
    node = new_node.name
1384

    
1385
    # set up inter-node password and certificate and restarts the node daemon
1386
    gntpass = self.sstore.GetNodeDaemonPassword()
1387
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1388
      raise errors.OpExecError("ganeti password corruption detected")
1389
    f = open(constants.SSL_CERT_FILE)
1390
    try:
1391
      gntpem = f.read(8192)
1392
    finally:
1393
      f.close()
1394
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1395
    # so we use this to detect an invalid certificate; as long as the
1396
    # cert doesn't contain this, the here-document will be correctly
1397
    # parsed by the shell sequence below
1398
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
1399
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1400
    if not gntpem.endswith("\n"):
1401
      raise errors.OpExecError("PEM must end with newline")
1402
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1403

    
1404
    # and then connect with ssh to set password and start ganeti-noded
1405
    # note that all the below variables are sanitized at this point,
1406
    # either by being constants or by the checks above
1407
    ss = self.sstore
1408
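    # remote shell sequence: store the node password, install the SSL
    # certificate through the '!EOF.' here-document, then restart the daemon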
    mycommand = ("umask 077 && "
1409
                 "echo '%s' > '%s' && "
1410
                 "cat > '%s' << '!EOF.' && \n"
1411
                 "%s!EOF.\n%s restart" %
1412
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1413
                  constants.SSL_CERT_FILE, gntpem,
1414
                  constants.NODE_INITD_SCRIPT))
1415

    
1416
    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1417
    if result.failed:
1418
      raise errors.OpExecError("Remote command on node %s, error: %s,"
1419
                               " output: %s" %
1420
                               (node, result.fail_reason, result.output))
1421

    
1422
    # check connectivity
1423
    time.sleep(4)
1424

    
1425
    result = rpc.call_version([node])[node]
1426
    if result:
1427
      if constants.PROTOCOL_VERSION == result:
1428
        logger.Info("communication to node %s fine, sw version %s match" %
1429
                    (node, result))
1430
      else:
1431
        raise errors.OpExecError("Version mismatch master version %s,"
1432
                                 " node version %s" %
1433
                                 (constants.PROTOCOL_VERSION, result))
1434
    else:
1435
      raise errors.OpExecError("Cannot get version from the new node")
1436

    
1437
    # setup ssh on node
1438
    logger.Info("copy ssh key to node %s" % node)
1439
    keyarray = []
1440
    keyfiles = ["/etc/ssh/ssh_host_dsa_key", "/etc/ssh/ssh_host_dsa_key.pub",
1441
                "/etc/ssh/ssh_host_rsa_key", "/etc/ssh/ssh_host_rsa_key.pub",
1442
                "/root/.ssh/id_dsa", "/root/.ssh/id_dsa.pub"]
1443

    
1444
    for i in keyfiles:
1445
      f = open(i, 'r')
1446
      try:
1447
        keyarray.append(f.read())
1448
      finally:
1449
        f.close()
1450

    
1451
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1452
                               keyarray[3], keyarray[4], keyarray[5])
1453

    
1454
    if not result:
1455
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1456

    
1457
    # Add node to our /etc/hosts, and add key to known_hosts
1458
    _UpdateEtcHosts(new_node.name, new_node.primary_ip)
1459
    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1460
                      self.cfg.GetHostKey())
1461

    
1462
    if new_node.secondary_ip != new_node.primary_ip:
1463
      result = ssh.SSHCall(node, "root",
1464
                           "fping -S 127.0.0.1 -q %s" % new_node.secondary_ip)
1465
      if result.failed:
1466
        raise errors.OpExecError("Node claims it doesn't have the"
1467
                                 " secondary ip you gave (%s).\n"
1468
                                 "Please fix and re-run this command." %
1469
                                 new_node.secondary_ip)
1470

    
1471
    success, msg = ssh.VerifyNodeHostname(node)
1472
    if not success:
1473
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
1474
                               " than the one the resolver gives: %s.\n"
1475
                               "Please fix and re-run this command." %
1476
                               (node, msg))
1477

    
1478
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1479
    # including the node just added
1480
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1481
    dist_nodes = self.cfg.GetNodeList() + [node]
1482
    if myself.name in dist_nodes:
1483
      dist_nodes.remove(myself.name)
1484

    
1485
    logger.Debug("Copying hosts and known_hosts to all nodes")
1486
    for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
1487
      result = rpc.call_upload_file(dist_nodes, fname)
1488
      for to_node in dist_nodes:
1489
        if not result[to_node]:
1490
          logger.Error("copy of file %s to node %s failed" %
1491
                       (fname, to_node))
1492

    
1493
    to_copy = ss.GetFileList()
1494
    for fname in to_copy:
1495
      if not ssh.CopyFileToNode(node, fname):
1496
        logger.Error("could not copy file %s to node %s" % (fname, node))
1497

    
1498
    logger.Info("adding node %s to cluster.conf" % node)
1499
    self.cfg.AddNode(new_node)
1500

    
1501

    
1502
class LUMasterFailover(LogicalUnit):
1503
  """Failover the master node to the current node.
1504

1505
  This is a special LU in that it must run on a non-master node.
1506

1507
  """
1508
  HPATH = "master-failover"
1509
  HTYPE = constants.HTYPE_CLUSTER
1510
  REQ_MASTER = False
1511
  _OP_REQP = []
1512

    
1513
  def BuildHooksEnv(self):
1514
    """Build hooks env.
1515

1516
    This will run on the new master only in the pre phase, and on all
1517
    the nodes in the post phase.
1518

1519
    """
1520
    env = {
1521
      "NEW_MASTER": self.new_master,
1522
      "OLD_MASTER": self.old_master,
1523
      }
1524
    return env, [self.new_master], self.cfg.GetNodeList()
1525

    
1526
  def CheckPrereq(self):
1527
    """Check prerequisites.
1528

1529
    This checks that we are not already the master.
1530

1531
    """
1532
    self.new_master = socket.gethostname()
1533

    
1534
    self.old_master = self.sstore.GetMasterNode()
1535

    
1536
    if self.old_master == self.new_master:
1537
      raise errors.OpPrereqError("This commands must be run on the node"
1538
                                 " where you want the new master to be.\n"
1539
                                 "%s is already the master" %
1540
                                 self.old_master)
1541

    
1542
  def Exec(self, feedback_fn):
1543
    """Failover the master node.
1544

1545
    This command, when run on a non-master node, will cause the current
1546
    master to cease being master, and the non-master to become the new
1547
    master.
1548

1549
    """
1550
    #TODO: do not rely on gethostname returning the FQDN
1551
    logger.Info("setting master to %s, old master: %s" %
1552
                (self.new_master, self.old_master))
1553

    
1554
    if not rpc.call_node_stop_master(self.old_master):
1555
      logger.Error("could disable the master role on the old master"
1556
                   " %s, please disable manually" % self.old_master)
1557

    
1558
    ss = self.sstore
1559
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1560
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
1561
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
1562
      logger.Error("could not distribute the new simple store master file"
1563
                   " to the other nodes, please check.")
1564

    
1565
    if not rpc.call_node_start_master(self.new_master):
1566
      logger.Error("could not start the master role on the new master"
1567
                   " %s, please check" % self.new_master)
1568
      feedback_fn("Error in activating the master IP on the new master,\n"
1569
                  "please fix manually.")
1570

    
1571

    
1572

    
1573
class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      }

    return result


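# Illustrative sketch of the dict returned by LUQueryClusterInfo.Exec above;
# the values shown are hypothetical, not taken from a real cluster:
#   {"name": "cluster.example.com", "software_version": "1.2.0",
#    "protocol_version": 12, "config_version": 3, "os_api_version": 5,
#    "export_version": 0, "master": "node1.example.com",
#    "architecture": ("64bit", "x86_64")}

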
class LUClusterCopyFile(NoHooksLU):
  """Copy file to cluster.

  """
  _OP_REQP = ["nodes", "filename"]

  def CheckPrereq(self):
    """Check prerequisites.

    It should check that the named file exists and that the given list
    of nodes is valid.

    """
    if not os.path.exists(self.op.filename):
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)

    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Copy a file from master to some nodes.

    Args:
      opts - class with options as members
      args - list containing a single element, the file name
    Opts used:
      nodes - list containing the name of target nodes; if empty, all nodes

    """
    filename = self.op.filename

    myname = socket.gethostname()

    for node in self.nodes:
      if node == myname:
        continue
      if not ssh.CopyFileToNode(node, filename):
        logger.Error("Copy of file %s to node %s failed" % (filename, node))


class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()


class LURunClusterCommand(NoHooksLU):
  """Run a command on some nodes.

  """
  _OP_REQP = ["command", "nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    It checks that the given list of nodes is valid.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Run a command on some nodes.

    """
    data = []
    for node in self.nodes:
      result = ssh.SSHCall(node, "root", self.op.command)
      data.append((node, result.output, result.exit_code))

    return data


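# Illustrative sketch of the list returned by LURunClusterCommand.Exec above;
# node names, outputs and exit codes here are hypothetical:
#   [("node1.example.com", "Linux node1 2.6.18 ...\n", 0),
#    ("node2.example.com", "", 255)]

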
class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
         succeeded with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  for inst_disk in instance.disks:
    master_result = None
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      is_primary = node == instance.primary_node
      result = rpc.call_blockdev_assemble(node, node_disk, is_primary)
      if not result:
        logger.Error("could not prepare block device %s on node %s (is_pri"
                     "mary=%s)" % (inst_disk.iv_name, node, is_primary))
        if is_primary or not ignore_secondaries:
          disks_ok = False
      if is_primary:
        master_result = result
    device_info.append((instance.primary_node, inst_disk.iv_name,
                        master_result))

  return disks_ok, device_info


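# Illustrative sketch of the value returned by _AssembleInstanceDisks above;
# the node name and device paths are hypothetical. On success it looks like
#   (True, [("node1.example.com", "sda", "/dev/md0"),
#           ("node1.example.com", "sdb", "/dev/md1")])
# where the third element is whatever call_blockdev_assemble returned on the
# primary node; a failure on a required node turns the first element False.

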
def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


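# Hedged usage note for _StartInstanceDisks above: callers pass force=None
# when no --force option applies (e.g. LUReinstallInstance), and the user's
# force flag otherwise (e.g. LUStartupInstance); only an explicit force=False
# prints the hint about retrying with '--force'.

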
class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    if not type(ins_l) is list:
      raise errors.OpExecError("Can't contact node '%s'" %
                               instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError("Instance is running, can't shutdown"
                               " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)


def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is false, errors on the primary node are not
  ignored (they make the function return failure).

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result


class LUStartupInstance(LogicalUnit):
1828
  """Starts an instance.
1829

1830
  """
1831
  HPATH = "instance-start"
1832
  HTYPE = constants.HTYPE_INSTANCE
1833
  _OP_REQP = ["instance_name", "force"]
1834

    
1835
  def BuildHooksEnv(self):
1836
    """Build hooks env.
1837

1838
    This runs on master, primary and secondary nodes of the instance.
1839

1840
    """
1841
    env = {
1842
      "FORCE": self.op.force,
1843
      }
1844
    env.update(_BuildInstanceHookEnvByObject(self.instance))
1845
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1846
          list(self.instance.secondary_nodes))
1847
    return env, nl, nl
1848

    
1849
  def CheckPrereq(self):
1850
    """Check prerequisites.
1851

1852
    This checks that the instance is in the cluster.
1853

1854
    """
1855
    instance = self.cfg.GetInstanceInfo(
1856
      self.cfg.ExpandInstanceName(self.op.instance_name))
1857
    if instance is None:
1858
      raise errors.OpPrereqError("Instance '%s' not known" %
1859
                                 self.op.instance_name)
1860

    
1861
    # check bridges existence
1862
    brlist = [nic.bridge for nic in instance.nics]
1863
    if not rpc.call_bridges_exist(instance.primary_node, brlist):
1864
      raise errors.OpPrereqError("one or more target bridges %s does not"
1865
                                 " exist on destination node '%s'" %
1866
                                 (brlist, instance.primary_node))
1867

    
1868
    self.instance = instance
1869
    self.op.instance_name = instance.name
1870

    
1871
  def Exec(self, feedback_fn):
1872
    """Start the instance.
1873

1874
    """
1875
    instance = self.instance
1876
    force = self.op.force
1877
    extra_args = getattr(self.op, "extra_args", "")
1878

    
1879
    node_current = instance.primary_node
1880

    
1881
    nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1882
    if not nodeinfo:
1883
      raise errors.OpExecError("Could not contact node %s for infos" %
1884
                               (node_current))
1885

    
1886
    freememory = nodeinfo[node_current]['memory_free']
1887
    memory = instance.memory
1888
    if memory > freememory:
1889
      raise errors.OpExecError("Not enough memory to start instance"
1890
                               " %s on node %s"
1891
                               " needed %s MiB, available %s MiB" %
1892
                               (instance.name, node_current, memory,
1893
                                freememory))
1894

    
1895
    _StartInstanceDisks(self.cfg, instance, force)
1896

    
1897
    if not rpc.call_instance_start(node_current, instance, extra_args):
1898
      _ShutdownInstanceDisks(instance, self.cfg)
1899
      raise errors.OpExecError("Could not start instance")
1900

    
1901
    self.cfg.MarkInstanceUp(instance.name)
1902

    
1903

    
1904
class LUShutdownInstance(LogicalUnit):
1905
  """Shutdown an instance.
1906

1907
  """
1908
  HPATH = "instance-stop"
1909
  HTYPE = constants.HTYPE_INSTANCE
1910
  _OP_REQP = ["instance_name"]
1911

    
1912
  def BuildHooksEnv(self):
1913
    """Build hooks env.
1914

1915
    This runs on master, primary and secondary nodes of the instance.
1916

1917
    """
1918
    env = _BuildInstanceHookEnvByObject(self.instance)
1919
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1920
          list(self.instance.secondary_nodes))
1921
    return env, nl, nl
1922

    
1923
  def CheckPrereq(self):
1924
    """Check prerequisites.
1925

1926
    This checks that the instance is in the cluster.
1927

1928
    """
1929
    instance = self.cfg.GetInstanceInfo(
1930
      self.cfg.ExpandInstanceName(self.op.instance_name))
1931
    if instance is None:
1932
      raise errors.OpPrereqError("Instance '%s' not known" %
1933
                                 self.op.instance_name)
1934
    self.instance = instance
1935

    
1936
  def Exec(self, feedback_fn):
1937
    """Shutdown the instance.
1938

1939
    """
1940
    instance = self.instance
1941
    node_current = instance.primary_node
1942
    if not rpc.call_instance_shutdown(node_current, instance):
1943
      logger.Error("could not shutdown instance")
1944

    
1945
    self.cfg.MarkInstanceDown(instance.name)
1946
    _ShutdownInstanceDisks(instance, self.cfg)
1947

    
1948

    
1949
class LUReinstallInstance(LogicalUnit):
1950
  """Reinstall an instance.
1951

1952
  """
1953
  HPATH = "instance-reinstall"
1954
  HTYPE = constants.HTYPE_INSTANCE
1955
  _OP_REQP = ["instance_name"]
1956

    
1957
  def BuildHooksEnv(self):
1958
    """Build hooks env.
1959

1960
    This runs on master, primary and secondary nodes of the instance.
1961

1962
    """
1963
    env = _BuildInstanceHookEnvByObject(self.instance)
1964
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1965
          list(self.instance.secondary_nodes))
1966
    return env, nl, nl
1967

    
1968
  def CheckPrereq(self):
1969
    """Check prerequisites.
1970

1971
    This checks that the instance is in the cluster and is not running.
1972

1973
    """
1974
    instance = self.cfg.GetInstanceInfo(
1975
      self.cfg.ExpandInstanceName(self.op.instance_name))
1976
    if instance is None:
1977
      raise errors.OpPrereqError("Instance '%s' not known" %
1978
                                 self.op.instance_name)
1979
    if instance.disk_template == constants.DT_DISKLESS:
1980
      raise errors.OpPrereqError("Instance '%s' has no disks" %
1981
                                 self.op.instance_name)
1982
    if instance.status != "down":
1983
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
1984
                                 self.op.instance_name)
1985
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
1986
    if remote_info:
1987
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
1988
                                 (self.op.instance_name,
1989
                                  instance.primary_node))
1990

    
1991
    self.op.os_type = getattr(self.op, "os_type", None)
1992
    if self.op.os_type is not None:
1993
      # OS verification
1994
      pnode = self.cfg.GetNodeInfo(
1995
        self.cfg.ExpandNodeName(instance.primary_node))
1996
      if pnode is None:
1997
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
1998
                                   self.op.pnode)
1999
      os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2000
      if not isinstance(os_obj, objects.OS):
2001
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2002
                                   " primary node"  % self.op.os_type)
2003

    
2004
    self.instance = instance
2005

    
2006
  def Exec(self, feedback_fn):
2007
    """Reinstall the instance.
2008

2009
    """
2010
    inst = self.instance
2011

    
2012
    if self.op.os_type is not None:
2013
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2014
      inst.os = self.op.os_type
2015
      self.cfg.AddInstance(inst)
2016

    
2017
    _StartInstanceDisks(self.cfg, inst, None)
2018
    try:
2019
      feedback_fn("Running the instance OS create scripts...")
2020
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2021
        raise errors.OpExecError("Could not install OS for instance %s "
2022
                                 "on node %s" %
2023
                                 (inst.name, inst.primary_node))
2024
    finally:
2025
      _ShutdownInstanceDisks(inst, self.cfg)
2026

    
2027

    
2028
class LURemoveInstance(LogicalUnit):
2029
  """Remove an instance.
2030

2031
  """
2032
  HPATH = "instance-remove"
2033
  HTYPE = constants.HTYPE_INSTANCE
2034
  _OP_REQP = ["instance_name"]
2035

    
2036
  def BuildHooksEnv(self):
2037
    """Build hooks env.
2038

2039
    This runs on master, primary and secondary nodes of the instance.
2040

2041
    """
2042
    env = _BuildInstanceHookEnvByObject(self.instance)
2043
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2044
          list(self.instance.secondary_nodes))
2045
    return env, nl, nl
2046

    
2047
  def CheckPrereq(self):
2048
    """Check prerequisites.
2049

2050
    This checks that the instance is in the cluster.
2051

2052
    """
2053
    instance = self.cfg.GetInstanceInfo(
2054
      self.cfg.ExpandInstanceName(self.op.instance_name))
2055
    if instance is None:
2056
      raise errors.OpPrereqError("Instance '%s' not known" %
2057
                                 self.op.instance_name)
2058
    self.instance = instance
2059

    
2060
  def Exec(self, feedback_fn):
2061
    """Remove the instance.
2062

2063
    """
2064
    instance = self.instance
2065
    logger.Info("shutting down instance %s on node %s" %
2066
                (instance.name, instance.primary_node))
2067

    
2068
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2069
      raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2070
                               (instance.name, instance.primary_node))
2071

    
2072
    logger.Info("removing block devices for instance %s" % instance.name)
2073

    
2074
    _RemoveDisks(instance, self.cfg)
2075

    
2076
    logger.Info("removing instance %s out of cluster config" % instance.name)
2077

    
2078
    self.cfg.RemoveInstance(instance.name)
2079

    
2080

    
2081
class LUQueryInstances(NoHooksLU):
2082
  """Logical unit for querying instances.
2083

2084
  """
2085
  _OP_REQP = ["output_fields"]
2086

    
2087
  def CheckPrereq(self):
2088
    """Check prerequisites.
2089

2090
    This checks that the fields required are valid output fields.
2091

2092
    """
2093
    self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2094
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2095
                               "admin_state", "admin_ram",
2096
                               "disk_template", "ip", "mac", "bridge",
2097
                               "sda_size", "sdb_size"],
2098
                       dynamic=self.dynamic_fields,
2099
                       selected=self.op.output_fields)
2100

    
2101
  def Exec(self, feedback_fn):
2102
    """Computes the list of nodes and their attributes.
2103

2104
    """
2105
    instance_names = utils.NiceSort(self.cfg.GetInstanceList())
2106
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2107
                     in instance_names]
2108

    
2109
    # begin data gathering
2110

    
2111
    nodes = frozenset([inst.primary_node for inst in instance_list])
2112

    
2113
    bad_nodes = []
2114
    if self.dynamic_fields.intersection(self.op.output_fields):
2115
      live_data = {}
2116
      node_data = rpc.call_all_instances_info(nodes)
2117
      for name in nodes:
2118
        result = node_data[name]
2119
        if result:
2120
          live_data.update(result)
2121
        elif result == False:
2122
          bad_nodes.append(name)
2123
        # else no instance is alive
2124
    else:
2125
      live_data = dict([(name, {}) for name in instance_names])
2126

    
2127
    # end data gathering
2128

    
2129
    output = []
2130
    for instance in instance_list:
2131
      iout = []
2132
      for field in self.op.output_fields:
2133
        if field == "name":
2134
          val = instance.name
2135
        elif field == "os":
2136
          val = instance.os
2137
        elif field == "pnode":
2138
          val = instance.primary_node
2139
        elif field == "snodes":
2140
          val = list(instance.secondary_nodes)
2141
        elif field == "admin_state":
2142
          val = (instance.status != "down")
2143
        elif field == "oper_state":
2144
          if instance.primary_node in bad_nodes:
2145
            val = None
2146
          else:
2147
            val = bool(live_data.get(instance.name))
2148
        elif field == "admin_ram":
2149
          val = instance.memory
2150
        elif field == "oper_ram":
2151
          if instance.primary_node in bad_nodes:
2152
            val = None
2153
          elif instance.name in live_data:
2154
            val = live_data[instance.name].get("memory", "?")
2155
          else:
2156
            val = "-"
2157
        elif field == "disk_template":
2158
          val = instance.disk_template
2159
        elif field == "ip":
2160
          val = instance.nics[0].ip
2161
        elif field == "bridge":
2162
          val = instance.nics[0].bridge
2163
        elif field == "mac":
2164
          val = instance.nics[0].mac
2165
        elif field == "sda_size" or field == "sdb_size":
2166
          disk = instance.FindDisk(field[:3])
2167
          if disk is None:
2168
            val = None
2169
          else:
2170
            val = disk.size
2171
        else:
2172
          raise errors.ParameterError(field)
2173
        iout.append(val)
2174
      output.append(iout)
2175

    
2176
    return output
2177

    
2178

    
2179
class LUFailoverInstance(LogicalUnit):
2180
  """Failover an instance.
2181

2182
  """
2183
  HPATH = "instance-failover"
2184
  HTYPE = constants.HTYPE_INSTANCE
2185
  _OP_REQP = ["instance_name", "ignore_consistency"]
2186

    
2187
  def BuildHooksEnv(self):
2188
    """Build hooks env.
2189

2190
    This runs on master, primary and secondary nodes of the instance.
2191

2192
    """
2193
    env = {
2194
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2195
      }
2196
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2197
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2198
    return env, nl, nl
2199

    
2200
  def CheckPrereq(self):
2201
    """Check prerequisites.
2202

2203
    This checks that the instance is in the cluster.
2204

2205
    """
2206
    instance = self.cfg.GetInstanceInfo(
2207
      self.cfg.ExpandInstanceName(self.op.instance_name))
2208
    if instance is None:
2209
      raise errors.OpPrereqError("Instance '%s' not known" %
2210
                                 self.op.instance_name)
2211

    
2212
    if instance.disk_template != constants.DT_REMOTE_RAID1:
2213
      raise errors.OpPrereqError("Instance's disk layout is not"
2214
                                 " remote_raid1.")
2215

    
2216
    secondary_nodes = instance.secondary_nodes
2217
    if not secondary_nodes:
2218
      raise errors.ProgrammerError("no secondary node but using "
2219
                                   "DT_REMOTE_RAID1 template")
2220

    
2221
    # check memory requirements on the secondary node
2222
    target_node = secondary_nodes[0]
2223
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2224
    info = nodeinfo.get(target_node, None)
2225
    if not info:
2226
      raise errors.OpPrereqError("Cannot get current information"
2227
                                 " from node '%s'" % nodeinfo)
2228
    if instance.memory > info['memory_free']:
2229
      raise errors.OpPrereqError("Not enough memory on target node %s."
2230
                                 " %d MB available, %d MB required" %
2231
                                 (target_node, info['memory_free'],
2232
                                  instance.memory))
2233

    
2234
    # check bridge existence
2235
    brlist = [nic.bridge for nic in instance.nics]
2236
    if not rpc.call_bridges_exist(instance.primary_node, brlist):
2237
      raise errors.OpPrereqError("One or more target bridges %s does not"
2238
                                 " exist on destination node '%s'" %
2239
                                 (brlist, instance.primary_node))
2240

    
2241
    self.instance = instance
2242

    
2243
  def Exec(self, feedback_fn):
2244
    """Failover an instance.
2245

2246
    The failover is done by shutting it down on its present node and
2247
    starting it on the secondary.
2248

2249
    """
2250
    instance = self.instance
2251

    
2252
    source_node = instance.primary_node
2253
    target_node = instance.secondary_nodes[0]
2254

    
2255
    feedback_fn("* checking disk consistency between source and target")
2256
    for dev in instance.disks:
2257
      # for remote_raid1, these are md over drbd
2258
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2259
        if not self.op.ignore_consistency:
2260
          raise errors.OpExecError("Disk %s is degraded on target node,"
2261
                                   " aborting failover." % dev.iv_name)
2262

    
2263
    feedback_fn("* checking target node resource availability")
2264
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2265

    
2266
    if not nodeinfo:
2267
      raise errors.OpExecError("Could not contact target node %s." %
2268
                               target_node)
2269

    
2270
    free_memory = int(nodeinfo[target_node]['memory_free'])
2271
    memory = instance.memory
2272
    if memory > free_memory:
2273
      raise errors.OpExecError("Not enough memory to create instance %s on"
2274
                               " node %s. needed %s MiB, available %s MiB" %
2275
                               (instance.name, target_node, memory,
2276
                                free_memory))
2277

    
2278
    feedback_fn("* shutting down instance on source node")
2279
    logger.Info("Shutting down instance %s on node %s" %
2280
                (instance.name, source_node))
2281

    
2282
    if not rpc.call_instance_shutdown(source_node, instance):
2283
      logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2284
                   " anyway. Please make sure node %s is down"  %
2285
                   (instance.name, source_node, source_node))
2286

    
2287
    feedback_fn("* deactivating the instance's disks on source node")
2288
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2289
      raise errors.OpExecError("Can't shut down the instance's disks.")
2290

    
2291
    instance.primary_node = target_node
2292
    # distribute new instance config to the other nodes
2293
    self.cfg.AddInstance(instance)
2294

    
2295
    feedback_fn("* activating the instance's disks on target node")
2296
    logger.Info("Starting instance %s on node %s" %
2297
                (instance.name, target_node))
2298

    
2299
    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2300
                                             ignore_secondaries=True)
2301
    if not disks_ok:
2302
      _ShutdownInstanceDisks(instance, self.cfg)
2303
      raise errors.OpExecError("Can't activate the instance's disks")
2304

    
2305
    feedback_fn("* starting the instance on the target node")
2306
    if not rpc.call_instance_start(target_node, instance, None):
2307
      _ShutdownInstanceDisks(instance, self.cfg)
2308
      raise errors.OpExecError("Could not start instance %s on node %s." %
2309
                               (instance.name, target_node))
2310

    
2311

    
2312
def _CreateBlockDevOnPrimary(cfg, node, device, info):
2313
  """Create a tree of block devices on the primary node.
2314

2315
  This always creates all devices.
2316

2317
  """
2318
  if device.children:
2319
    for child in device.children:
2320
      if not _CreateBlockDevOnPrimary(cfg, node, child, info):
2321
        return False
2322

    
2323
  cfg.SetDiskID(device, node)
2324
  new_id = rpc.call_blockdev_create(node, device, device.size, True, info)
2325
  if not new_id:
2326
    return False
2327
  if device.physical_id is None:
2328
    device.physical_id = new_id
2329
  return True
2330

    
2331

    
2332
def _CreateBlockDevOnSecondary(cfg, node, device, force, info):
2333
  """Create a tree of block devices on a secondary node.
2334

2335
  If this device type has to be created on secondaries, create it and
2336
  all its children.
2337

2338
  If not, just recurse to children keeping the same 'force' value.
2339

2340
  """
2341
  if device.CreateOnSecondary():
2342
    force = True
2343
  if device.children:
2344
    for child in device.children:
2345
      if not _CreateBlockDevOnSecondary(cfg, node, child, force, info):
2346
        return False
2347

    
2348
  if not force:
2349
    return True
2350
  cfg.SetDiskID(device, node)
2351
  new_id = rpc.call_blockdev_create(node, device, device.size, False, info)
2352
  if not new_id:
2353
    return False
2354
  if device.physical_id is None:
2355
    device.physical_id = new_id
2356
  return True
2357

    
2358

    
2359
def _GenerateUniqueNames(cfg, exts):
  """Generate suitable LV names.

  This will generate a unique logical volume name for each of the
  given extensions.

  """
  results = []
  for val in exts:
    new_id = cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results


def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
  """Generate a drbd device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type="lvm", size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type="lvm", size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type="drbd", size=size,
                          logical_id=(primary, secondary, port),
                          children=[dev_data, dev_meta])
  return drbd_dev


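# Illustrative sketch of the Disk tree built by _GenerateMDDRBDBranch above;
# node names, LV names and the port number are hypothetical:
#   drbd(size=10240, logical_id=("node1", "node2", 11000),
#        children=[lvm(("xenvg", "<uuid>.sda_data"), size=10240),
#                  lvm(("xenvg", "<uuid>.sda_meta"), size=128)])
# i.e. one data LV plus a fixed 128 MB metadata LV per drbd device.

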
def _GenerateDiskTemplate(cfg, template_name,
2389
                          instance_name, primary_node,
2390
                          secondary_nodes, disk_sz, swap_sz):
2391
  """Generate the entire disk layout for a given template type.
2392

2393
  """
2394
  #TODO: compute space requirements
2395

    
2396
  vgname = cfg.GetVGName()
2397
  if template_name == "diskless":
2398
    disks = []
2399
  elif template_name == "plain":
2400
    if len(secondary_nodes) != 0:
2401
      raise errors.ProgrammerError("Wrong template configuration")
2402

    
2403
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2404
    sda_dev = objects.Disk(dev_type="lvm", size=disk_sz,
2405
                           logical_id=(vgname, names[0]),
2406
                           iv_name = "sda")
2407
    sdb_dev = objects.Disk(dev_type="lvm", size=swap_sz,
2408
                           logical_id=(vgname, names[1]),
2409
                           iv_name = "sdb")
2410
    disks = [sda_dev, sdb_dev]
2411
  elif template_name == "local_raid1":
2412
    if len(secondary_nodes) != 0:
2413
      raise errors.ProgrammerError("Wrong template configuration")
2414

    
2415

    
2416
    names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2417
                                       ".sdb_m1", ".sdb_m2"])
2418
    sda_dev_m1 = objects.Disk(dev_type="lvm", size=disk_sz,
2419
                              logical_id=(vgname, names[0]))
2420
    sda_dev_m2 = objects.Disk(dev_type="lvm", size=disk_sz,
2421
                              logical_id=(vgname, names[1]))
2422
    md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name = "sda",
2423
                              size=disk_sz,
2424
                              children = [sda_dev_m1, sda_dev_m2])
2425
    sdb_dev_m1 = objects.Disk(dev_type="lvm", size=swap_sz,
2426
                              logical_id=(vgname, names[2]))
2427
    sdb_dev_m2 = objects.Disk(dev_type="lvm", size=swap_sz,
2428
                              logical_id=(vgname, names[3]))
2429
    md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name = "sdb",
2430
                              size=swap_sz,
2431
                              children = [sdb_dev_m1, sdb_dev_m2])
2432
    disks = [md_sda_dev, md_sdb_dev]
2433
  elif template_name == constants.DT_REMOTE_RAID1:
2434
    if len(secondary_nodes) != 1:
2435
      raise errors.ProgrammerError("Wrong template configuration")
2436
    remote_node = secondary_nodes[0]
2437
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2438
                                       ".sdb_data", ".sdb_meta"])
2439
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2440
                                         disk_sz, names[0:2])
2441
    md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name="sda",
2442
                              children = [drbd_sda_dev], size=disk_sz)
2443
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2444
                                         swap_sz, names[2:4])
2445
    md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name="sdb",
2446
                              children = [drbd_sdb_dev], size=swap_sz)
2447
    disks = [md_sda_dev, md_sdb_dev]
2448
  else:
2449
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2450
  return disks
2451

    
2452

    
2453
def _GetInstanceInfoText(instance):
2454
  """Compute that text that should be added to the disk's metadata.
2455

2456
  """
2457
  return "originstname+%s" % instance.name
2458

    
2459

    
2460
def _CreateDisks(cfg, instance):
2461
  """Create all disks for an instance.
2462

2463
  This abstracts away some work from AddInstance.
2464

2465
  Args:
2466
    instance: the instance object
2467

2468
  Returns:
2469
    True or False showing the success of the creation process
2470

2471
  """
2472
  info = _GetInstanceInfoText(instance)
2473

    
2474
  for device in instance.disks:
2475
    logger.Info("creating volume %s for instance %s" %
2476
              (device.iv_name, instance.name))
2477
    #HARDCODE
2478
    for secondary_node in instance.secondary_nodes:
2479
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, device, False,
2480
                                        info):
2481
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2482
                     (device.iv_name, device, secondary_node))
2483
        return False
2484
    #HARDCODE
2485
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, device, info):
2486
      logger.Error("failed to create volume %s on primary!" %
2487
                   device.iv_name)
2488
      return False
2489
  return True
2490

    
2491

    
2492
def _RemoveDisks(instance, cfg):
2493
  """Remove all disks for an instance.
2494

2495
  This abstracts away some work from `AddInstance()` and
2496
  `RemoveInstance()`. Note that in case some of the devices couldn't
2497
  be removed, the removal will continue with the other ones (compare
2498
  with `_CreateDisks()`).
2499

2500
  Args:
2501
    instance: the instance object
2502

2503
  Returns:
2504
    True or False showing the success of the removal process
2505

2506
  """
2507
  logger.Info("removing block devices for instance %s" % instance.name)
2508

    
2509
  result = True
2510
  for device in instance.disks:
2511
    for node, disk in device.ComputeNodeTree(instance.primary_node):
2512
      cfg.SetDiskID(disk, node)
2513
      if not rpc.call_blockdev_remove(node, disk):
2514
        logger.Error("could not remove block device %s on node %s,"
2515
                     " continuing anyway" %
2516
                     (device.iv_name, node))
2517
        result = False
2518
  return result
2519

    
2520

    
2521
class LUCreateInstance(LogicalUnit):
2522
  """Create an instance.
2523

2524
  """
2525
  HPATH = "instance-add"
2526
  HTYPE = constants.HTYPE_INSTANCE
2527
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2528
              "disk_template", "swap_size", "mode", "start", "vcpus",
2529
              "wait_for_sync"]
2530

    
2531
  def BuildHooksEnv(self):
2532
    """Build hooks env.
2533

2534
    This runs on master, primary and secondary nodes of the instance.
2535

2536
    """
2537
    env = {
2538
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2539
      "INSTANCE_DISK_SIZE": self.op.disk_size,
2540
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
2541
      "INSTANCE_ADD_MODE": self.op.mode,
2542
      }
2543
    if self.op.mode == constants.INSTANCE_IMPORT:
2544
      env["INSTANCE_SRC_NODE"] = self.op.src_node
2545
      env["INSTANCE_SRC_PATH"] = self.op.src_path
2546
      env["INSTANCE_SRC_IMAGE"] = self.src_image
2547

    
2548
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2549
      primary_node=self.op.pnode,
2550
      secondary_nodes=self.secondaries,
2551
      status=self.instance_status,
2552
      os_type=self.op.os_type,
2553
      memory=self.op.mem_size,
2554
      vcpus=self.op.vcpus,
2555
      nics=[(self.inst_ip, self.op.bridge)],
2556
    ))
2557

    
2558
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2559
          self.secondaries)
2560
    return env, nl, nl
2561

    
2562

    
2563
  def CheckPrereq(self):
2564
    """Check prerequisites.
2565

2566
    """
2567
    if self.op.mode not in (constants.INSTANCE_CREATE,
2568
                            constants.INSTANCE_IMPORT):
2569
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2570
                                 self.op.mode)
2571

    
2572
    if self.op.mode == constants.INSTANCE_IMPORT:
2573
      src_node = getattr(self.op, "src_node", None)
2574
      src_path = getattr(self.op, "src_path", None)
2575
      if src_node is None or src_path is None:
2576
        raise errors.OpPrereqError("Importing an instance requires source"
2577
                                   " node and path options")
2578
      src_node_full = self.cfg.ExpandNodeName(src_node)
2579
      if src_node_full is None:
2580
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2581
      self.op.src_node = src_node = src_node_full
2582

    
2583
      if not os.path.isabs(src_path):
2584
        raise errors.OpPrereqError("The source path must be absolute")
2585

    
2586
      export_info = rpc.call_export_info(src_node, src_path)
2587

    
2588
      if not export_info:
2589
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
2590

    
2591
      if not export_info.has_section(constants.INISECT_EXP):
2592
        raise errors.ProgrammerError("Corrupted export config")
2593

    
2594
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
2595
      if (int(ei_version) != constants.EXPORT_VERSION):
2596
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2597
                                   (ei_version, constants.EXPORT_VERSION))
2598

    
2599
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2600
        raise errors.OpPrereqError("Can't import instance with more than"
2601
                                   " one data disk")
2602

    
2603
      # FIXME: are the old os-es, disk sizes, etc. useful?
2604
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2605
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2606
                                                         'disk0_dump'))
2607
      self.src_image = diskimage
2608
    else: # INSTANCE_CREATE
2609
      if getattr(self.op, "os_type", None) is None:
2610
        raise errors.OpPrereqError("No guest OS specified")
2611

    
2612
    # check primary node
2613
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2614
    if pnode is None:
2615
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
2616
                                 self.op.pnode)
2617
    self.op.pnode = pnode.name
2618
    self.pnode = pnode
2619
    self.secondaries = []
2620
    # disk template and mirror node verification
2621
    if self.op.disk_template not in constants.DISK_TEMPLATES:
2622
      raise errors.OpPrereqError("Invalid disk template name")
2623

    
2624
    if self.op.disk_template == constants.DT_REMOTE_RAID1:
2625
      if getattr(self.op, "snode", None) is None:
2626
        raise errors.OpPrereqError("The 'remote_raid1' disk template needs"
2627
                                   " a mirror node")
2628

    
2629
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
2630
      if snode_name is None:
2631
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
2632
                                   self.op.snode)
2633
      elif snode_name == pnode.name:
2634
        raise errors.OpPrereqError("The secondary node cannot be"
2635
                                   " the primary node.")
2636
      self.secondaries.append(snode_name)
2637

    
2638
    # Check lv size requirements
2639
    nodenames = [pnode.name] + self.secondaries
2640
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2641

    
2642
    # Required free disk space as a function of disk and swap space
2643
    req_size_dict = {
2644
      constants.DT_DISKLESS: 0,
2645
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2646
      constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2647
      # 256 MB are added for drbd metadata, 128MB for each drbd device
2648
      constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2649
    }
2650

    
2651
    if self.op.disk_template not in req_size_dict:
2652
      raise errors.ProgrammerError("Disk template '%s' size requirement"
2653
                                   " is unknown" %  self.op.disk_template)
2654

    
2655
    req_size = req_size_dict[self.op.disk_template]
2656

    
2657
    for node in nodenames:
2658
      info = nodeinfo.get(node, None)
2659
      if not info:
2660
        raise errors.OpPrereqError("Cannot get current information"
2661
                                   " from node '%s'" % nodeinfo)
2662
      if req_size > info['vg_free']:
2663
        raise errors.OpPrereqError("Not enough disk space on target node %s."
2664
                                   " %d MB available, %d MB required" %
2665
                                   (node, info['vg_free'], req_size))
2666

    
2667
    # os verification
2668
    os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2669
    if not isinstance(os_obj, objects.OS):
2670
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
2671
                                 " primary node"  % self.op.os_type)
2672

    
2673
    # instance verification
2674
    hostname1 = utils.LookupHostname(self.op.instance_name)
2675
    if not hostname1:
2676
      raise errors.OpPrereqError("Instance name '%s' not found in dns" %
2677
                                 self.op.instance_name)
2678

    
2679
    self.op.instance_name = instance_name = hostname1['hostname']
2680
    instance_list = self.cfg.GetInstanceList()
2681
    if instance_name in instance_list:
2682
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2683
                                 instance_name)
2684

    
2685
    ip = getattr(self.op, "ip", None)
2686
    if ip is None or ip.lower() == "none":
2687
      inst_ip = None
2688
    elif ip.lower() == "auto":
2689
      inst_ip = hostname1['ip']
2690
    else:
2691
      if not utils.IsValidIP(ip):
2692
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
2693
                                   " like a valid IP" % ip)
2694
      inst_ip = ip
2695
    self.inst_ip = inst_ip
2696

    
2697
    command = ["fping", "-q", hostname1['ip']]
2698
    result = utils.RunCmd(command)
2699
    if not result.failed:
2700
      raise errors.OpPrereqError("IP %s of instance %s already in use" %
2701
                                 (hostname1['ip'], instance_name))
2702

    
2703
    # bridge verification
2704
    bridge = getattr(self.op, "bridge", None)
2705
    if bridge is None:
2706
      self.op.bridge = self.cfg.GetDefBridge()
2707
    else:
2708
      self.op.bridge = bridge
2709

    
2710
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
2711
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
2712
                                 " destination node '%s'" %
2713
                                 (self.op.bridge, pnode.name))
2714

    
2715
    if self.op.start:
2716
      self.instance_status = 'up'
2717
    else:
2718
      self.instance_status = 'down'
2719

    
2720
  def Exec(self, feedback_fn):
2721
    """Create and add the instance to the cluster.
2722

2723
    """
2724
    instance = self.op.instance_name
2725
    pnode_name = self.pnode.name
2726

    
2727
    nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
2728
    if self.inst_ip is not None:
2729
      nic.ip = self.inst_ip
2730

    
2731
    disks = _GenerateDiskTemplate(self.cfg,
2732
                                  self.op.disk_template,
2733
                                  instance, pnode_name,
2734
                                  self.secondaries, self.op.disk_size,
2735
                                  self.op.swap_size)
2736

    
2737
    iobj = objects.Instance(name=instance, os=self.op.os_type,
2738
                            primary_node=pnode_name,
2739
                            memory=self.op.mem_size,
2740
                            vcpus=self.op.vcpus,
2741
                            nics=[nic], disks=disks,
2742
                            disk_template=self.op.disk_template,
2743
                            status=self.instance_status,
2744
                            )
2745

    
2746
    feedback_fn("* creating instance disks...")
2747
    if not _CreateDisks(self.cfg, iobj):
2748
      _RemoveDisks(iobj, self.cfg)
2749
      raise errors.OpExecError("Device creation failed, reverting...")
2750

    
2751
    feedback_fn("adding instance %s to cluster config" % instance)
2752

    
2753
    self.cfg.AddInstance(iobj)
2754

    
2755
    if self.op.wait_for_sync:
2756
      disk_abort = not _WaitForSync(self.cfg, iobj)
2757
    elif iobj.disk_template == constants.DT_REMOTE_RAID1:
2758
      # make sure the disks are not degraded (still sync-ing is ok)
2759
      time.sleep(15)
2760
      feedback_fn("* checking mirrors status")
2761
      disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
2762
    else:
2763
      disk_abort = False
2764

    
2765
    if disk_abort:
2766
      _RemoveDisks(iobj, self.cfg)
2767
      self.cfg.RemoveInstance(iobj.name)
2768
      raise errors.OpExecError("There are some degraded disks for"
2769
                               " this instance")
2770

    
2771
    feedback_fn("creating os for instance %s on node %s" %
2772
                (instance, pnode_name))
2773

    
2774
    if iobj.disk_template != constants.DT_DISKLESS:
2775
      if self.op.mode == constants.INSTANCE_CREATE:
2776
        feedback_fn("* running the instance OS create scripts...")
2777
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
2778
          raise errors.OpExecError("could not add os for instance %s"
2779
                                   " on node %s" %
2780
                                   (instance, pnode_name))
2781

    
2782
      elif self.op.mode == constants.INSTANCE_IMPORT:
2783
        feedback_fn("* running the instance OS import scripts...")
2784
        src_node = self.op.src_node
2785
        src_image = self.src_image
2786
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
2787
                                                src_node, src_image):
2788
          raise errors.OpExecError("Could not import os for instance"
2789
                                   " %s on node %s" %
2790
                                   (instance, pnode_name))
2791
      else:
2792
        # also checked in the prereq part
2793
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
2794
                                     % self.op.mode)
2795

    
2796
    if self.op.start:
2797
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
2798
      feedback_fn("* starting instance...")
2799
      if not rpc.call_instance_start(pnode_name, iobj, None):
2800
        raise errors.OpExecError("Could not start instance")
2801

    
2802

    
2803
class LUConnectConsole(NoHooksLU):
2804
  """Connect to an instance's console.
2805

2806
  This is somewhat special in that it returns the command line that
2807
  you need to run on the master node in order to connect to the
2808
  console.
2809

2810
  """
2811
  _OP_REQP = ["instance_name"]
2812

    
2813
  def CheckPrereq(self):
2814
    """Check prerequisites.
2815

2816
    This checks that the instance is in the cluster.
2817

2818
    """
2819
    instance = self.cfg.GetInstanceInfo(
2820
      self.cfg.ExpandInstanceName(self.op.instance_name))
2821
    if instance is None:
2822
      raise errors.OpPrereqError("Instance '%s' not known" %
2823
                                 self.op.instance_name)
2824
    self.instance = instance
2825

    
2826
  def Exec(self, feedback_fn):
2827
    """Connect to the console of an instance
2828

2829
    """
2830
    instance = self.instance
2831
    node = instance.primary_node
2832

    
2833
    node_insts = rpc.call_instance_list([node])[node]
2834
    if node_insts is False:
2835
      raise errors.OpExecError("Can't connect to node %s." % node)
2836

    
2837
    if instance.name not in node_insts:
2838
      raise errors.OpExecError("Instance %s is not running." % instance.name)
2839

    
2840
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
2841

    
2842
    hyper = hypervisor.GetHypervisor()
2843
    console_cmd = hyper.GetShellCommandForConsole(instance.name)
2844
    # build ssh cmdline
2845
    argv = ["ssh", "-q", "-t"]
2846
    argv.extend(ssh.KNOWN_HOSTS_OPTS)
2847
    argv.extend(ssh.BATCH_MODE_OPTS)
2848
    argv.append(node)
2849
    argv.append(console_cmd)
2850
    return "ssh", argv
2851

    
2852

    
2853
class LUAddMDDRBDComponent(LogicalUnit):
2854
  """Adda new mirror member to an instance's disk.
2855

2856
  """
2857
  HPATH = "mirror-add"
2858
  HTYPE = constants.HTYPE_INSTANCE
2859
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]
2860

    
2861
  def BuildHooksEnv(self):
2862
    """Build hooks env.
2863

2864
    This runs on the master, the primary and all the secondaries.
2865

2866
    """
2867
    env = {
2868
      "NEW_SECONDARY": self.op.remote_node,
2869
      "DISK_NAME": self.op.disk_name,
2870
      }
2871
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2872
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
2873
          self.op.remote_node,] + list(self.instance.secondary_nodes)
2874
    return env, nl, nl
2875

    
2876
  def CheckPrereq(self):
2877
    """Check prerequisites.
2878

2879
    This checks that the instance is in the cluster.
2880

2881
    """
2882
    instance = self.cfg.GetInstanceInfo(
2883
      self.cfg.ExpandInstanceName(self.op.instance_name))
2884
    if instance is None:
2885
      raise errors.OpPrereqError("Instance '%s' not known" %
2886
                                 self.op.instance_name)
2887
    self.instance = instance
2888

    
2889
    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
2890
    if remote_node is None:
2891
      raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
2892
    self.remote_node = remote_node
2893

    
2894
    if remote_node == instance.primary_node:
2895
      raise errors.OpPrereqError("The specified node is the primary node of"
2896
                                 " the instance.")
2897

    
2898
    if instance.disk_template != constants.DT_REMOTE_RAID1:
2899
      raise errors.OpPrereqError("Instance's disk layout is not"
2900
                                 " remote_raid1.")
2901
    for disk in instance.disks:
2902
      if disk.iv_name == self.op.disk_name:
2903
        break
2904
    else:
2905
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
2906
                                 " instance." % self.op.disk_name)
2907
    if len(disk.children) > 1:
2908
      raise errors.OpPrereqError("The device already has two slave"
2909
                                 " devices.\n"
2910
                                 "This would create a 3-disk raid1"
2911
                                 " which we don't allow.")
2912
    self.disk = disk
2913

    
2914
  def Exec(self, feedback_fn):
2915
    """Add the mirror component
2916

2917
    """
2918
    disk = self.disk
2919
    instance = self.instance
2920

    
2921
    remote_node = self.remote_node
2922
    lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
2923
    names = _GenerateUniqueNames(self.cfg, lv_names)
2924
    new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
2925
                                     remote_node, disk.size, names)
2926

    
2927
    logger.Info("adding new mirror component on secondary")
2928
    #HARDCODE
2929
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, new_drbd, False,
2930
                                      _GetInstanceInfoText(instance)):
2931
      raise errors.OpExecError("Failed to create new component on secondary"
2932
                               " node %s" % remote_node)
2933

    
2934
    logger.Info("adding new mirror component on primary")
2935
    #HARDCODE
2936
    if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node, new_drbd,
2937
                                    _GetInstanceInfoText(instance)):
2938
      # remove secondary dev
2939
      self.cfg.SetDiskID(new_drbd, remote_node)
2940
      rpc.call_blockdev_remove(remote_node, new_drbd)
2941
      raise errors.OpExecError("Failed to create volume on primary")
2942

    
2943
    # the device exists now
2944
    # call the primary node to add the mirror to md
2945
    logger.Info("adding new mirror component to md")
2946
    if not rpc.call_blockdev_addchild(instance.primary_node,
2947
                                           disk, new_drbd):
2948
      logger.Error("Can't add mirror compoment to md!")
2949
      self.cfg.SetDiskID(new_drbd, remote_node)
2950
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
2951
        logger.Error("Can't rollback on secondary")
2952
      self.cfg.SetDiskID(new_drbd, instance.primary_node)
2953
      if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
2954
        logger.Error("Can't rollback on primary")
2955
      raise errors.OpExecError("Can't add mirror component to md array")
2956

    
2957
    disk.children.append(new_drbd)
2958

    
2959
    self.cfg.AddInstance(instance)
2960

    
2961
    _WaitForSync(self.cfg, instance)

    return 0


class LURemoveMDDRBDComponent(LogicalUnit):
  """Remove a component from a remote_raid1 disk.

  """
  HPATH = "mirror-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK_NAME": self.op.disk_name,
      "DISK_ID": self.op.disk_id,
      "OLD_SECONDARY": self.old_secondary,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
                                 " instance." % self.op.disk_name)
    for child in disk.children:
      if child.dev_type == "drbd" and child.logical_id[2] == self.op.disk_id:
        break
    else:
      raise errors.OpPrereqError("Can't find the device with this port.")

    if len(disk.children) < 2:
      raise errors.OpPrereqError("Cannot remove the last component from"
                                 " a mirror.")
    self.disk = disk
    self.child = child
    if self.child.logical_id[0] == instance.primary_node:
      oid = 1
    else:
      oid = 0
    self.old_secondary = self.child.logical_id[oid]
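    # note: for a "drbd" child the logical_id appears to be of the form
    # (node_a, node_b, port) - see the port check above and the
    # child.logical_id[:2] loop in Exec below - so whichever entry is
    # not the primary node is recorded as the old secondary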

  def Exec(self, feedback_fn):
    """Remove the mirror component.

    """
    instance = self.instance
    disk = self.disk
    child = self.child
    logger.Info("remove mirror component")
    self.cfg.SetDiskID(disk, instance.primary_node)
    if not rpc.call_blockdev_removechild(instance.primary_node,
                                         disk, child):
      raise errors.OpExecError("Can't remove child from mirror.")

    for node in child.logical_id[:2]:
      self.cfg.SetDiskID(child, node)
      if not rpc.call_blockdev_remove(node, child):
        logger.Error("Warning: failed to remove device from node %s,"
                     " continuing operation." % node)

    disk.children.remove(child)
    self.cfg.AddInstance(instance)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    remote_node = getattr(self.op, "remote_node", None)
    if remote_node is None:
      remote_node = instance.secondary_nodes[0]
    else:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    self.op.remote_node = remote_node

  def Exec(self, feedback_fn):
    """Replace the disks of an instance.

    """
    instance = self.instance
    iv_names = {}
    # start of work
    remote_node = self.op.remote_node
    cfg = self.cfg
    for dev in instance.disks:
      size = dev.size
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
                                       remote_node, size, names)
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
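      # iv_names maps each disk's iv_name (e.g. "sda") to a tuple of
      # (md device, old drbd child, new drbd child); the old child is
      # detached and removed further down, once the new one is in sync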
      logger.Info("adding new mirror component on secondary for %s" %
                  dev.iv_name)
      #HARDCODE
      if not _CreateBlockDevOnSecondary(cfg, remote_node, new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new component on"
                                 " secondary node %s\n"
                                 "Full abort, cleanup manually!" %
                                 remote_node)

      logger.Info("adding new mirror component on primary")
      #HARDCODE
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, new_drbd,
                                      _GetInstanceInfoText(instance)):
        # remove secondary dev
        cfg.SetDiskID(new_drbd, remote_node)
        rpc.call_blockdev_remove(remote_node, new_drbd)
        raise errors.OpExecError("Failed to create volume on primary!\n"
                                 "Full abort, cleanup manually!!")

      # the device exists now
      # call the primary node to add the mirror to md
      logger.Info("adding new mirror component to md")
      if not rpc.call_blockdev_addchild(instance.primary_node, dev,
                                        new_drbd):
        logger.Error("Can't add mirror component to md!")
        cfg.SetDiskID(new_drbd, remote_node)
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
          logger.Error("Can't rollback on secondary")
        cfg.SetDiskID(new_drbd, instance.primary_node)
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
          logger.Error("Can't rollback on primary")
        raise errors.OpExecError("Full abort, cleanup manually!!")

      dev.children.append(new_drbd)
      cfg.AddInstance(instance)

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    _WaitForSync(cfg, instance, unlock=True)

    # so check manually all the devices
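    # (call_blockdev_find returns device status information; field 5 of
    # its result is used below as the "is degraded" flag)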
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("MD device %s is degraded!" % name)
      cfg.SetDiskID(new_drbd, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
      if is_degr:
        raise errors.OpExecError("New drbd device %s is degraded!" % name)

    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      logger.Info("remove mirror %s component" % name)
      cfg.SetDiskID(dev, instance.primary_node)
      if not rpc.call_blockdev_removechild(instance.primary_node,
                                           dev, child):
        logger.Error("Can't remove child from mirror, aborting"
                     " *this device cleanup*.\nYou need to cleanup manually!!")
        continue

      for node in child.logical_id[:2]:
        logger.Info("remove child device on %s" % node)
        cfg.SetDiskID(child, node)
        if not rpc.call_blockdev_remove(node, child):
          logger.Error("Warning: failed to remove device from node %s,"
                       " continuing operation." % node)

      dev.children.remove(child)

      cfg.AddInstance(instance)


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]
    return


  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type == "drbd":
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }
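    # each entry in "children" has this same layout, so the returned
    # dict describes the whole device tree recursively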

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                           instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"
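      # config_state reflects the instance status stored in the
      # configuration, remote_state whether the primary node actually
      # reports the instance as running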
3281

    
3282
      disks = [self._ComputeDiskStatus(instance, None, device)
3283
               for device in instance.disks]
3284

    
3285
      idict = {
3286
        "name": instance.name,
3287
        "config_state": config_state,
3288
        "run_state": remote_state,
3289
        "pnode": instance.primary_node,
3290
        "snodes": instance.secondary_nodes,
3291
        "os": instance.os,
3292
        "memory": instance.memory,
3293
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
3294
        "disks": disks,
3295
        }
3296

    
3297
      result[instance.name] = idict
3298

    
3299
    return result
3300

    
3301

    
3302
class LUQueryNodeData(NoHooksLU):
3303
  """Logical unit for querying node data.
3304

3305
  """
3306
  _OP_REQP = ["nodes"]
3307

    
3308
  def CheckPrereq(self):
3309
    """Check prerequisites.
3310

3311
    This only checks the optional node list against the existing names.
3312

3313
    """
3314
    self.wanted_nodes = _GetWantedNodes(self, self.op.nodes)
3315

    
3316
  def Exec(self, feedback_fn):
3317
    """Compute and return the list of nodes.
3318

3319
    """
3320
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
3321
             in self.cfg.GetInstanceList()]
3322
    result = []
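    # each result entry is a tuple of the form (illustrative layout):
    #   (node name, primary IP, secondary IP,
    #    [names of instances with this node as primary],
    #    [names of instances with this node as secondary])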
    for node in [self.cfg.GetNodeInfo(name) for name in self.wanted_nodes]:
      result.append((node.name, node.primary_ip, node.secondary_ip,
                     [inst.name for inst in ilist
                      if inst.primary_node == node.name],
                     [inst.name for inst in ilist
                      if node.name in inst.secondary_nodes],
                     ))
    return result


class LUSetInstanceParms(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      args['nics'] = [(ip, bridge)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.bridge = getattr(self.op, "bridge", None)
    if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
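    # note: the string "none" (any case) is accepted as the IP value and,
    # as handled above, clears the instance's address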

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("No such instance name '%s'" %
                                 self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance
    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    result = []
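    # the return value is a list of (parameter, new value) pairs
    # describing the changes that were actually applied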
3415
    instance = self.instance
3416
    if self.mem:
3417
      instance.memory = self.mem
3418
      result.append(("mem", self.mem))
3419
    if self.vcpus:
3420
      instance.vcpus = self.vcpus
3421
      result.append(("vcpus",  self.vcpus))
3422
    if self.do_ip:
3423
      instance.nics[0].ip = self.ip
3424
      result.append(("ip", self.ip))
3425
    if self.bridge:
3426
      instance.nics[0].bridge = self.bridge
3427
      result.append(("bridge", self.bridge))
3428

    
3429
    self.cfg.AddInstance(instance)
3430

    
3431
    return result
3432

    
3433

    
3434
class LUQueryExports(NoHooksLU):
3435
  """Query the exports list
3436

3437
  """
3438
  _OP_REQP = []
3439

    
3440
  def CheckPrereq(self):
3441
    """Check that the nodelist contains only existing nodes.
3442

3443
    """
3444
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
3445

    
3446
  def Exec(self, feedback_fn):
3447
    """Compute the list of all the exported system images.
3448

3449
    Returns:
3450
      a dictionary with the structure node->(export-list)
3451
      where export-list is a list of the instances exported on
3452
      that node.
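
    Example (illustrative values only):
      {"node1.example.com": ["instance1.example.com"],
       "node2.example.com": []}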

    """
    return rpc.call_export_list(self.nodes)


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance name is a valid one.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    src_node = instance.primary_node
3509
    # shutdown the instance, unless requested not to do so
3510
    if self.op.shutdown:
3511
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
3512
      self.processor.ChainOpCode(op, feedback_fn)
3513

    
3514
    vgname = self.cfg.GetVGName()
3515

    
3516
    snap_disks = []
3517

    
3518
    try:
3519
      for disk in instance.disks:
3520
        if disk.iv_name == "sda":
3521
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
3522
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
3523

    
3524
          if not new_dev_name:
3525
            logger.Error("could not snapshot block device %s on node %s" %
3526
                         (disk.logical_id[1], src_node))
3527
          else:
3528
            new_dev = objects.Disk(dev_type="lvm", size=disk.size,
3529
                                      logical_id=(vgname, new_dev_name),
3530
                                      physical_id=(vgname, new_dev_name),
3531
                                      iv_name=disk.iv_name)
3532
            snap_disks.append(new_dev)
3533

    
3534
    finally:
3535
      if self.op.shutdown:
3536
        op = opcodes.OpStartupInstance(instance_name=instance.name,
3537
                                       force=False)
3538
        self.processor.ChainOpCode(op, feedback_fn)
3539

    
3540
    # TODO: check for size
3541

    
3542
    for dev in snap_disks:
3543
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
3544
                                           instance):
3545
        logger.Error("could not export block device %s from node"
3546
                     " %s to node %s" %
3547
                     (dev.logical_id[1], src_node, dst_node.name))
3548
      if not rpc.call_blockdev_remove(src_node, dev):
3549
        logger.Error("could not remove snapshot block device %s from"
3550
                     " node %s" % (dev.logical_id[1], src_node))
3551

    
3552
    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
3553
      logger.Error("could not finalize export for instance %s on node %s" %
3554
                   (instance.name, dst_node.name))
3555

    
3556
    nodelist = self.cfg.GetNodeList()
3557
    nodelist.remove(dst_node.name)
3558

    
3559
    # on one-node clusters nodelist will be empty after the removal
3560
    # if we proceed the backup would be removed because OpQueryExports
3561
    # substitutes an empty list with the full cluster node list.
3562
    if nodelist:
3563
      op = opcodes.OpQueryExports(nodes=nodelist)
3564
      exportlist = self.processor.ChainOpCode(op, feedback_fn)
3565
      for node in exportlist:
3566
        if instance.name in exportlist[node]:
3567
          if not rpc.call_export_remove(node, instance.name):
3568
            logger.Error("could not remove older export for instance %s"
3569
                         " on node %s" % (instance.name, node))


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()


class LUAddTag(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tag"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    objects.TaggableObject.ValidateTag(self.op.tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      self.target.AddTag(self.op.tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
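    # writing the updated configuration may fail if somebody else
    # modified it in the meantime; in that case the operation is
    # reported as retryable rather than failed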
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTag(TagsLU):
  """Delete a tag from a given object.

  """
  _OP_REQP = ["kind", "name", "tag"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    objects.TaggableObject.ValidateTag(self.op.tag)
    if self.op.tag not in self.target.GetTags():
      raise errors.OpPrereqError("Tag not found")

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    self.target.RemoveTag(self.op.tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")