#!/usr/bin/python
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import socket
import time
import tempfile
import re
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import ssconf

class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements (REQ_CLUSTER,
      REQ_MASTER); note that all commands require root permissions

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_CLUSTER = True
  REQ_MASTER = True

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.processor = processor
    self.op = op
    self.cfg = cfg
    self.sstore = sstore
    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError, ("Required parameter '%s' missing" %
                                     attr_name)
    if self.REQ_CLUSTER:
      if not cfg.IsCluster():
        raise errors.OpPrereqError, ("Cluster not initialized yet,"
                                     " use 'gnt-cluster init' first.")
      if self.REQ_MASTER:
        master = sstore.GetMasterNode()
        if master != socket.gethostname():
          raise errors.OpPrereqError, ("Commands must be run on the master"
                                       " node %s" % master)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    As for the node lists, the master should not be included in
    them, as it will be added by the hooks runner in case this LU
    requires a cluster to run on (otherwise we don't have a node
    list). No nodes should be returned as an empty list (and not
    None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError


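# Example (illustrative sketch, not part of the original module; the opcode
# fields and hook path are hypothetical): a minimal LogicalUnit subclass
# following the rules described above.
#
#   class LUExampleQuery(LogicalUnit):
#     HPATH = "example-query"
#     HTYPE = constants.HTYPE_CLUSTER
#     _OP_REQP = ["nodes"]
#
#     def BuildHooksEnv(self):
#       return {"NODE_LIST": " ".join(self.op.nodes)}, [], []
#
#     def CheckPrereq(self):
#       self.nodes = _GetWantedNodes(self, self.op.nodes)
#
#     def Exec(self, feedback_fn):
#       return [node.name for node in self.nodes]
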
class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Build hooks env.

    This is a no-op, since we don't run hooks.

    """
    return


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded nodes.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if nodes is not None and not isinstance(nodes, list):
    raise errors.OpPrereqError, "Invalid argument type 'nodes'"

  if nodes:
    wanted_nodes = []

    for name in nodes:
      node = lu.cfg.GetNodeInfo(lu.cfg.ExpandNodeName(name))
      if node is None:
        raise errors.OpPrereqError, ("No such node name '%s'" % name)
      wanted_nodes.append(node)

    return wanted_nodes
  else:
    return [lu.cfg.GetNodeInfo(name) for name in lu.cfg.GetNodeList()]


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError, ("Unknown output fields selected: %s"
                                 % ",".join(frozenset(selected).
                                            difference(all_fields)))


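# Example (illustrative field names): a query LU calls the helper above from
# its CheckPrereq, e.g.
#
#   _CheckOutputFields(static=["name", "pip"], dynamic=["mfree"],
#                      selected=self.op.output_fields)
#
# Selecting e.g. ["name", "bogus"] raises OpPrereqError naming "bogus".
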
def _UpdateEtcHosts(fullnode, ip):
  """Ensure a node has a correct entry in /etc/hosts.

  Args:
    fullnode - Fully qualified domain name of host. (str)
    ip       - IPv4 address of host (str)

  """
  node = fullnode.split(".", 1)[0]

  f = open('/etc/hosts', 'r+')

  inthere = False

  save_lines = []
  add_lines = []
  removed = False

  while True:
    rawline = f.readline()

    if not rawline:
      # End of file
      break

    line = rawline.split('\n')[0]

    # Strip off comments
    line = line.split('#')[0]

    if not line:
      # Entire line was comment, skip
      save_lines.append(rawline)
      continue

    fields = line.split()

    haveall = True
    havesome = False
    for spec in [ ip, fullnode, node ]:
      if spec not in fields:
        haveall = False
      if spec in fields:
        havesome = True

    if haveall:
      inthere = True
      save_lines.append(rawline)
      continue

    if havesome and not haveall:
      # Line (old, or manual?) which is missing some.  Remove.
      removed = True
      continue

    save_lines.append(rawline)

  if not inthere:
    add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))

  if removed:
    if add_lines:
      save_lines = save_lines + add_lines

    # We removed a line, write a new file and replace old.
    fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
    newfile = os.fdopen(fd, 'w')
    newfile.write(''.join(save_lines))
    newfile.close()
    os.rename(tmpname, '/etc/hosts')

  elif add_lines:
    # Simply appending a new line will do the trick.
    f.seek(0, 2)
    for add in add_lines:
      f.write(add)

  f.close()


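# Example (illustrative host data): for fullnode "node1.example.com" with ip
# "192.0.2.10", the function above keeps or adds a line of the form
#
#   192.0.2.10\tnode1.example.com node1
#
# and drops lines that mention only some of the three tokens.
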
def _UpdateKnownHosts(fullnode, ip, pubkey):
  """Ensure a node has a correct known_hosts entry.

  Args:
    fullnode - Fully qualified domain name of host. (str)
    ip       - IPv4 address of host (str)
    pubkey   - the public key of the cluster

  """
  if os.path.exists('/etc/ssh/ssh_known_hosts'):
    f = open('/etc/ssh/ssh_known_hosts', 'r+')
  else:
    f = open('/etc/ssh/ssh_known_hosts', 'w+')

  inthere = False

  save_lines = []
  add_lines = []
  removed = False

  while True:
    rawline = f.readline()
    logger.Debug('read %s' % (repr(rawline),))

    if not rawline:
      # End of file
      break

    line = rawline.split('\n')[0]

    parts = line.split(' ')
    fields = parts[0].split(',')
    key = parts[2]

    haveall = True
    havesome = False
    for spec in [ ip, fullnode ]:
      if spec not in fields:
        haveall = False
      if spec in fields:
        havesome = True

    logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
    if haveall and key == pubkey:
      inthere = True
      save_lines.append(rawline)
      logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
      continue

    if havesome and (not haveall or key != pubkey):
      removed = True
      logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
      continue

    save_lines.append(rawline)

  if not inthere:
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))

  if removed:
    save_lines = save_lines + add_lines

    # Write a new file and replace old.
    fd, tmpname = tempfile.mkstemp('tmp', 'ssh_known_hosts_', '/etc/ssh')
    newfile = os.fdopen(fd, 'w')
    newfile.write(''.join(save_lines))
    newfile.close()
    logger.Debug("Wrote new known_hosts.")
    os.rename(tmpname, '/etc/ssh/ssh_known_hosts')

  elif add_lines:
    # Simply appending a new line will do the trick.
    f.seek(0, 2)
    for add in add_lines:
      f.write(add)

  f.close()


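# Example (illustrative values): the known_hosts line maintained above has
# the form
#
#   node1.example.com,192.0.2.10 ssh-rsa AAAAB3NzaC1yc2E...
#
# i.e. hostname and IP as a comma-separated first field, then the key type
# and the cluster public key.
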
def _HasValidVG(vglist, vgname):
  """Checks if the volume group list is valid.

  A non-None return value means there's an error, and the return value
  is the error message.

  """
  vgsize = vglist.get(vgname, None)
  if vgsize is None:
    return "volume group '%s' missing" % vgname
  elif vgsize < 20480:
    return ("volume group '%s' too small (20480MiB required, %dMiB found)" %
            (vgname, vgsize))
  return None


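# Example (illustrative sizes, in MiB as the message above assumes):
# _HasValidVG({"xenvg": 102400}, "xenvg") returns None (OK), a 10240 MiB
# group yields the "too small" message, and a missing name yields
# "volume group 'xenvg' missing".
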
def _InitSSHSetup(node):
  """Setup the SSH configuration for the cluster.


  This generates a dsa keypair for root, adds the pub key to the
  permitted hosts and adds the hostkey to its own known hosts.

  Args:
    node: the name of this host as a fqdn

  """
  utils.RemoveFile('/root/.ssh/known_hosts')

  if os.path.exists('/root/.ssh/id_dsa'):
    utils.CreateBackup('/root/.ssh/id_dsa')
  if os.path.exists('/root/.ssh/id_dsa.pub'):
    utils.CreateBackup('/root/.ssh/id_dsa.pub')

  utils.RemoveFile('/root/.ssh/id_dsa')
  utils.RemoveFile('/root/.ssh/id_dsa.pub')

  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
                         "-f", "/root/.ssh/id_dsa",
                         "-q", "-N", ""])
  if result.failed:
    raise errors.OpExecError, ("could not generate ssh keypair, error %s" %
                               result.output)

  f = open('/root/.ssh/id_dsa.pub', 'r')
  try:
    utils.AddAuthorizedKey('/root/.ssh/authorized_keys', f.read(8192))
  finally:
    f.close()


def _InitGanetiServerSetup(ss):
  """Setup the necessary configuration for the initial node daemon.

  This creates the nodepass file containing the shared password for
  the cluster and also generates the SSL certificate.

  """
  # Create pseudo random password
  randpass = sha.new(os.urandom(64)).hexdigest()
  # and write it into sstore
  ss.SetKey(ss.SS_NODED_PASS, randpass)

  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
                         "-days", str(365*5), "-nodes", "-x509",
                         "-keyout", constants.SSL_CERT_FILE,
                         "-out", constants.SSL_CERT_FILE, "-batch"])
  if result.failed:
    raise errors.OpExecError, ("could not generate server ssl cert, command"
                               " %s had exitcode %s and error message %s" %
                               (result.cmd, result.exit_code, result.output))

  os.chmod(constants.SSL_CERT_FILE, 0400)

  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])

  if result.failed:
    raise errors.OpExecError, ("could not start the node daemon, command %s"
                               " had exitcode %s and error %s" %
                               (result.cmd, result.exit_code, result.output))


class LUInitCluster(LogicalUnit):
  """Initialise the cluster.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
              "def_bridge", "master_netdev"]
  REQ_CLUSTER = False

  def BuildHooksEnv(self):
    """Build hooks env.

    Notes: Since we don't require a cluster, we must manually add
    ourselves in the post-run node list.

    """
    env = {"CLUSTER": self.op.cluster_name,
           "MASTER": self.hostname['hostname_full']}
    return env, [], [self.hostname['hostname_full']]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    if config.ConfigWriter.IsCluster():
      raise errors.OpPrereqError, ("Cluster is already initialised")

    hostname_local = socket.gethostname()
    self.hostname = hostname = utils.LookupHostname(hostname_local)
    if not hostname:
      raise errors.OpPrereqError, ("Cannot resolve my own hostname ('%s')" %
                                   hostname_local)

    self.clustername = clustername = utils.LookupHostname(self.op.cluster_name)
    if not clustername:
      raise errors.OpPrereqError, ("Cannot resolve given cluster name ('%s')"
                                   % self.op.cluster_name)

    result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", hostname['ip']])
    if result.failed:
      raise errors.OpPrereqError, ("Inconsistency: this host's name resolves"
                                   " to %s,\nbut this ip address does not"
                                   " belong to this host."
                                   " Aborting." % hostname['ip'])

    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip and not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError, ("Invalid secondary ip given")
    if secondary_ip and secondary_ip != hostname['ip']:
      result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", secondary_ip])
      if result.failed:
        raise errors.OpPrereqError, ("You gave %s as secondary IP,\n"
                                     "but it does not belong to this host." %
                                     secondary_ip)
    self.secondary_ip = secondary_ip

    # checks presence of the volume group given
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)

    if vgstatus:
      raise errors.OpPrereqError, ("Error: %s" % vgstatus)

    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
                    self.op.mac_prefix):
      raise errors.OpPrereqError, ("Invalid mac prefix given '%s'" %
                                   self.op.mac_prefix)

    if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
      raise errors.OpPrereqError, ("Invalid hypervisor type given '%s'" %
                                   self.op.hypervisor_type)

    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
    if result.failed:
      raise errors.OpPrereqError, ("Invalid master netdev given (%s): '%s'" %
                                   (self.op.master_netdev, result.output))

  def Exec(self, feedback_fn):
    """Initialize the cluster.

    """
    clustername = self.clustername
    hostname = self.hostname

    # set up the simple store
    ss = ssconf.SimpleStore()
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
    ss.SetKey(ss.SS_MASTER_NODE, hostname['hostname_full'])
    ss.SetKey(ss.SS_MASTER_IP, clustername['ip'])
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername['hostname'])

    # set up the inter-node password and certificate
    _InitGanetiServerSetup(ss)

    # start the master ip
    rpc.call_node_start_master(hostname['hostname_full'])

    # set up ssh config and /etc/hosts
    f = open('/etc/ssh/ssh_host_rsa_key.pub', 'r')
    try:
      sshline = f.read()
    finally:
      f.close()
    sshkey = sshline.split(" ")[1]

    _UpdateEtcHosts(hostname['hostname_full'],
                    hostname['ip'],
                    )

    _UpdateKnownHosts(hostname['hostname_full'],
                      hostname['ip'],
                      sshkey,
                      )

    _InitSSHSetup(hostname['hostname'])

    # init of cluster config file
    cfgw = config.ConfigWriter()
    cfgw.InitConfig(hostname['hostname'], hostname['ip'], self.secondary_ip,
                    sshkey, self.op.mac_prefix,
                    self.op.vg_name, self.op.def_bridge)


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError, ("There are still %d node(s) in "
                                   "this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError, ("There are still %d instance(s) in "
                                   "this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    utils.CreateBackup('/root/.ssh/id_dsa')
    utils.CreateBackup('/root/.ssh/id_dsa.pub')
    rpc.call_node_leave_cluster(self.sstore.GetMasterNode())


class LUVerifyCluster(NoHooksLU):
  """Verifies the cluster status.

  """
  _OP_REQP = []

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn(" - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    instancelist = self.cfg.GetInstanceList()
    if not instance in instancelist:
      feedback_fn("  - ERROR: instance %s not in instance list %s" %
                      (instance, instancelist))
      bad = True

    instanceconfig = self.cfg.GetInstanceInfo(instance)
    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if not instanceconfig.status == 'down':
      if not instance in node_instance[node_current]:
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return not bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    self.cfg.VerifyConfig()

    master = self.sstore.GetMasterNode()
    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    node_volume = {}
    node_instance = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if type(volumeinfo) != dict:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      result =  self._VerifyInstance(instance, node_volume, node_instance,
                                     feedback_fn)
      bad = bad or result

      inst_config = self.cfg.GetInstanceInfo(instance)

      inst_config.MapLVsByNode(node_vol_should)

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    return int(bad)


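# Example: a LUVerifyCluster.Exec run in which every check above passed
# returns int(bad) == 0; a run that reported at least one "- ERROR:" line
# through feedback_fn returns 1.
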
def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      logger.ToStderr("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError, ("Can't contact node %s for mirror data,"
                                   " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        logger.ToStderr("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      perc_done, est_time, is_degraded = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        logger.ToStdout("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    if unlock:
      utils.Unlock('cmd')
    try:
      time.sleep(min(60, max_time))
    finally:
      if unlock:
        utils.Lock('cmd')

  if done:
    logger.ToStdout("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


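# Example (illustrative mirror status values; "sda" is a hypothetical iv_name):
# a per-disk status tuple of (perc_done, est_time, is_degraded) == (87.5, 130,
# True) is printed above as
# "- device sda: 87.50% done, 130 estimated seconds remaining" and keeps the
# loop polling, while (None, None, False) counts that device as synced.
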
def _CheckDiskConsistency(cfgw, dev, node, on_primary):
  """Check that mirrors are not degraded.

  """
  cfgw.SetDiskID(dev, node)

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Can't get any data from node %s" % node)
      result = False
    else:
      result = result and (not rstats[5])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    return

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError, "Can't gather the list of OSes"
    return node_data


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would not allow itself to run.

    """
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return {"NODE_NAME": self.op.node_name}, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      logger.Error("Error: Node '%s' is unknown." % self.op.node_name)
      return 1

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError, ("Node is the master node,"
                                   " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError, ("Instance %s still running on the node,"
                                     " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError, ("Instance %s has node as a secondary,"
                                     " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    rpc.call_node_leave_cluster(node.name)

    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["dtotal", "dfree",
                                     "mtotal", "mnode", "mfree"])

    _CheckOutputFields(static=["name", "pinst", "sinst", "pip", "sip"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)


  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = utils.NiceSort(self.cfg.GetNodeList())
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]


    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict.fromkeys(nodenames, 0)
    node_to_secondary = dict.fromkeys(nodenames, 0)

    if "pinst" in self.op.output_fields or "sinst" in self.op.output_fields:
      instancelist = self.cfg.GetInstanceList()

      for instance in instancelist:
        instanceinfo = self.cfg.GetInstanceInfo(instance)
        node_to_primary[instanceinfo.primary_node] += 1
        for secnode in instanceinfo.secondary_nodes:
          node_to_secondary[secnode] += 1

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst":
          val = node_to_primary[node.name]
        elif field == "sinst":
          val = node_to_secondary[node.name]
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, "?")
        else:
          raise errors.ParameterError, field
        val = str(val)
        node_output.append(val)
      output.append(node_output)

    return output


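# Example (illustrative node data): a LUQueryNodes query with
# output_fields = ["name", "pinst", "mfree"] returns one row of strings per
# node, e.g. [["node1.example.com", "2", "1024"], ...]; dynamic fields fall
# back to "?" when the node could not be contacted.
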
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)


  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = utils.NiceSort([node.name for node in self.nodes])
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError, field
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.LookupHostname(node_name)
    if not dns_data:
      raise errors.OpPrereqError, ("Node %s is not resolvable" % node_name)

    node = dns_data['hostname']
    primary_ip = self.op.primary_ip = dns_data['ip']
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError, ("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip
    node_list = cfg.GetNodeList()
    if node in node_list:
      raise errors.OpPrereqError, ("Node %s is already in the configuration"
                                   % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)
      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError, ("New node ip address(es) conflict with"
                                     " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError, ("The master has no private ip but the"
                                     " new node has one")
      else:
        raise errors.OpPrereqError, ("The master has a private ip but the"
                                     " new node doesn't have one")

    # checks reachability
    command = ["fping", "-q", primary_ip]
    result = utils.RunCmd(command)
    if result.failed:
      raise errors.OpPrereqError, ("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      command = ["fping", "-S%s" % myself.secondary_ip, "-q", secondary_ip]
      result = utils.RunCmd(command)
      if result.failed:
        raise errors.OpPrereqError, ("Node secondary ip not reachable by ping")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # set up inter-node password and certificate and restart the node daemon
    gntpass = self.sstore.GetNodeDaemonPassword()
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
      raise errors.OpExecError, ("ganeti password corruption detected")
    f = open(constants.SSL_CERT_FILE)
    try:
      gntpem = f.read(8192)
    finally:
      f.close()
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
    # so we use this to detect an invalid certificate; as long as the
    # cert doesn't contain this, the here-document will be correctly
    # parsed by the shell sequence below
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
      raise errors.OpExecError, ("invalid PEM encoding in the SSL certificate")
    if not gntpem.endswith("\n"):
      raise errors.OpExecError, ("PEM must end with newline")
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)

    # remove first the root's known_hosts file
    utils.RemoveFile("/root/.ssh/known_hosts")
    # and then connect with ssh to set password and start ganeti-noded
    # note that all the below variables are sanitized at this point,
    # either by being constants or by the checks above
    ss = self.sstore
    mycommand = ("umask 077 && "
                 "echo '%s' > '%s' && "
                 "cat > '%s' << '!EOF.' && \n"
                 "%s!EOF.\n%s restart" %
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
                  constants.SSL_CERT_FILE, gntpem,
                  constants.NODE_INITD_SCRIPT))

    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
    if result.failed:
      raise errors.OpExecError, ("Remote command on node %s, error: %s,"
                                 " output: %s" %
                                 (node, result.fail_reason, result.output))

    # check connectivity
    time.sleep(4)

    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError, ("Version mismatch master version %s,"
                                   " node version %s" %
                                   (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError, ("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    keyarray = []
    keyfiles = ["/etc/ssh/ssh_host_dsa_key", "/etc/ssh/ssh_host_dsa_key.pub",
                "/etc/ssh/ssh_host_rsa_key", "/etc/ssh/ssh_host_rsa_key.pub",
                "/root/.ssh/id_dsa", "/root/.ssh/id_dsa.pub"]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError, ("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    _UpdateEtcHosts(new_node.name, new_node.primary_ip)
    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
                      self.cfg.GetHostKey())

    if new_node.secondary_ip != new_node.primary_ip:
      result = ssh.SSHCall(node, "root",
                           "fping -S 127.0.0.1 -q %s" % new_node.secondary_ip)
      if result.failed:
        raise errors.OpExecError, ("Node claims it doesn't have the"
                                   " secondary ip you gave (%s).\n"
                                   "Please fix and re-run this command." %
                                   new_node.secondary_ip)

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList() + [node]
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in ("/etc/hosts", "/etc/ssh/ssh_known_hosts"):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = ss.GetFileList()
    for fname in to_copy:
      if not ssh.CopyFileToNode(node, fname):
        logger.Error("could not copy file %s to node %s" % (fname, node))

    logger.Info("adding node %s to cluster.conf" % node)
    self.cfg.AddNode(new_node)


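# Example (file names shown as the constants used in LUAddNode.Exec above;
# actual paths depend on the installation): the remote bootstrap command
# expands to a shell sequence of the form
#
#   umask 077 && echo '<node password>' > '<noded password file>' && cat > '<SSL cert file>' << '!EOF.' && 
#   <PEM data, ending with a newline>!EOF.
#   <node init script> restart
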
class LUMasterFailover(LogicalUnit):
  """Failover the master node to the current node.

  This is a special LU in that it must run on a non-master node.

  """
  HPATH = "master-failover"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_MASTER = False
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the new master only in the pre phase, and on all
    the nodes in the post phase.

    """
    env = {
      "NEW_MASTER": self.new_master,
      "OLD_MASTER": self.old_master,
      }
    return env, [self.new_master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we are not already the master.

    """
    self.new_master = socket.gethostname()

    self.old_master = self.sstore.GetMasterNode()

    if self.old_master == self.new_master:
      raise errors.OpPrereqError, ("This command must be run on the node"
                                   " where you want the new master to be.\n"
                                   "%s is already the master" %
                                   self.old_master)

  def Exec(self, feedback_fn):
    """Failover the master node.

    This command, when run on a non-master node, will cause the current
    master to cease being master, and the non-master to become new
    master.

    """
    #TODO: do not rely on gethostname returning the FQDN
    logger.Info("setting master to %s, old master: %s" %
                (self.new_master, self.old_master))

    if not rpc.call_node_stop_master(self.old_master):
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)

    ss = self.sstore
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
      logger.Error("could not distribute the new simple store master file"
                   " to the other nodes, please check.")

    if not rpc.call_node_start_master(self.new_master):
      logger.Error("could not start the master role on the new master"
                   " %s, please check" % self.new_master)
      feedback_fn("Error in activating the master IP on the new master,\n"
                  "please fix manually.")



class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      }

    return result


class LUClusterCopyFile(NoHooksLU):
  """Copy file to cluster.

  """
  _OP_REQP = ["nodes", "filename"]

  def CheckPrereq(self):
    """Check prerequisites.

    It should check that the named file exists and that the given list
    of nodes is valid.

    """
    if not os.path.exists(self.op.filename):
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)

    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Copy a file from master to some nodes.

    Args:
      opts - class with options as members
      args - list containing a single element, the file name
    Opts used:
      nodes - list containing the name of target nodes; if empty, all nodes

    """
    filename = self.op.filename

    myname = socket.gethostname()

    for node in self.nodes:
      if node == myname:
        continue
      if not ssh.CopyFileToNode(node, filename):
        logger.Error("Copy of file %s to node %s failed" % (filename, node))


class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()


class LURunClusterCommand(NoHooksLU):
  """Run a command on some nodes.

  """
  _OP_REQP = ["command", "nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    It checks that the given list of nodes is valid.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Run a command on some nodes.

    """
    data = []
    for node in self.nodes:
      result = utils.RunCmd(["ssh", node.name, self.op.command])
      data.append((node.name, result.cmd, result.output, result.exit_code))

    return data


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)
    self.instance = instance


  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError, ("Cannot activate block devices")

    return disks_info


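# Helper used by LUActivateInstanceDisks and the startup/failover paths:
# for every instance disk it calls blockdev_assemble on each node in the
# disk's node tree and records one (primary node, iv_name, result of the
# primary assembly) tuple per disk.  Illustrative return value for a
# one-disk instance (names and device path are made up):
#   (True, [("node1.example.com", "sda", "/dev/md0")])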
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
         succeeded with the mapping from node devices to instance devices
  """
  device_info = []
  disks_ok = True
  for inst_disk in instance.disks:
    master_result = None
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      is_primary = node == instance.primary_node
      result = rpc.call_blockdev_assemble(node, node_disk, is_primary)
      if not result:
        logger.Error("could not prepare block device %s on node %s (is_pri"
                     "mary=%s)" % (inst_disk.iv_name, node, is_primary))
        if is_primary or not ignore_secondaries:
          disks_ok = False
      if is_primary:
        master_result = result
    device_info.append((instance.primary_node, inst_disk.iv_name,
                        master_result))

  return disks_ok, device_info


def _StartInstanceDisks(cfg, instance, force):
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError, ("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    if not type(ins_l) is list:
      raise errors.OpExecError, ("Can't contact node '%s'" %
                                 instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError, ("Instance is running, can't shutdown"
                                 " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)


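# Counterpart of _AssembleInstanceDisks: shuts the block devices down on
# all nodes of the instance.  A failure on the primary node is only
# tolerated when ignore_primary is set; failures on any other node always
# make the function return False.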
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are ignored;
  otherwise they make the function return failure.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result


class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      "INSTANCE_PRIMARY": self.instance.primary_node,
      "INSTANCE_SECONDARIES": " ".join(self.instance.secondary_nodes),
      "FORCE": self.op.force,
      }
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)

    # check bridges existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(instance.primary_node, brlist):
      raise errors.OpPrereqError, ("one or more target bridges %s does not"
                                   " exist on destination node '%s'" %
                                   (brlist, instance.primary_node))

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
    if not nodeinfo:
      raise errors.OpExecError, ("Could not contact node %s for infos" %
                                 (node_current))

    freememory = nodeinfo[node_current]['memory_free']
    memory = instance.memory
    if memory > freememory:
      raise errors.OpExecError, ("Not enough memory to start instance"
                                 " %s on node %s"
                                 " needed %s MiB, available %s MiB" %
                                 (instance.name, node_current, memory,
                                  freememory))

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError, ("Could not start instance")

    self.cfg.MarkInstanceUp(instance.name)


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      "INSTANCE_PRIMARY": self.instance.primary_node,
      "INSTANCE_SECONDARIES": " ".join(self.instance.secondary_nodes),
      }
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    if not rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    self.cfg.MarkInstanceDown(instance.name)
    _ShutdownInstanceDisks(instance, self.cfg)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      "INSTANCE_PRIMARY": self.instance.primary_node,
      "INSTANCE_SECONDARIES": " ".join(self.instance.secondary_nodes),
      }
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)
    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError, ("Instance '%s' has no disks" %
                                   self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError, ("Instance '%s' is marked to be up" %
                                   self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError, ("Instance '%s' is running on the node %s" %
                                   (self.op.instance_name,
                                    instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError, ("Primary node '%s' is unknown" %
                                     instance.primary_node)
      os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
      if not isinstance(os_obj, objects.OS):
        raise errors.OpPrereqError, ("OS '%s' not in supported OS list for"
                                     " primary node"  % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.AddInstance(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError, ("Could not install OS for instance %s "
                                   "on node %s" %
                                   (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      "INSTANCE_PRIMARY": self.instance.primary_node,
      "INSTANCE_SECONDARIES": " ".join(self.instance.secondary_nodes),
      }
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      raise errors.OpExecError, ("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logger.Info("removing block devices for instance %s" % instance.name)

    _RemoveDisks(instance, self.cfg)

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)


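# Output fields are split into static ones (read from the configuration)
# and dynamic ones ("oper_state", "oper_ram") which need a live
# call_all_instances_info RPC to the primary nodes; nodes that fail that
# RPC are shown as "(node down)" in the corresponding columns.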
class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
                               "admin_state", "admin_ram",
                               "disk_template", "ip", "mac", "bridge"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    instance_names = utils.NiceSort(self.cfg.GetInstanceList())
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
                     in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_all_instances_info(nodes)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = []
    for instance in instance_list:
      iout = []
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = ",".join(instance.secondary_nodes) or "-"
        elif field == "admin_state":
          if instance.status == "down":
            val = "no"
          else:
            val = "yes"
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = "(node down)"
          else:
            if live_data.get(instance.name):
              val = "running"
            else:
              val = "stopped"
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = "(node down)"
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        else:
          raise errors.ParameterError, field
        val = str(val)
        iout.append(val)
      output.append(iout)

    return output


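# Failover sequence implemented in the Exec method below:
#   1. check disk consistency on the target (secondary) node, unless
#      ignore_consistency was requested
#   2. re-check free memory on the target node
#   3. shut the instance down on the source node (errors are only logged)
#   4. shut down its disks, ignoring errors on the old primary
#   5. flip instance.primary_node in the configuration
#   6. reassemble the disks on the new primary and start the instance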
class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      "INSTANCE_PRIMARY": self.instance.primary_node,
      "INSTANCE_SECONDARIES": " ".join(self.instance.secondary_nodes),
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)

    # check memory requirements on the secondary node
    target_node = instance.secondary_nodes[0]
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
    info = nodeinfo.get(target_node, None)
    if not info:
      raise errors.OpPrereqError, ("Cannot get current information"
                                   " from node '%s'" % nodeinfo)
    if instance.memory > info['memory_free']:
      raise errors.OpPrereqError, ("Not enough memory on target node %s."
                                   " %d MB available, %d MB required" %
                                   (target_node, info['memory_free'],
                                    instance.memory))

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(instance.primary_node, brlist):
      raise errors.OpPrereqError, ("one or more target bridges %s does not"
                                   " exist on destination node '%s'" %
                                   (brlist, instance.primary_node))

    self.instance = instance

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for remote_raid1, these are md over drbd
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        if not self.op.ignore_consistency:
          raise errors.OpExecError, ("Disk %s is degraded on target node,"
                                     " aborting failover." % dev.iv_name)

    feedback_fn("* checking target node resource availability")
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())

    if not nodeinfo:
      raise errors.OpExecError, ("Could not contact target node %s." %
                                 target_node)

    free_memory = int(nodeinfo[target_node]['memory_free'])
    memory = instance.memory
    if memory > free_memory:
      raise errors.OpExecError, ("Not enough memory to create instance %s on"
                                 " node %s. needed %s MiB, available %s MiB" %
                                 (instance.name, target_node, memory,
                                  free_memory))

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not rpc.call_instance_shutdown(source_node, instance):
      logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                   " anyway. Please make sure node %s is down"  %
                   (instance.name, source_node, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError, ("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.AddInstance(instance)

    feedback_fn("* activating the instance's disks on target node")
    logger.Info("Starting instance %s on node %s" %
                (instance.name, target_node))

    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                             ignore_secondaries=True)
    if not disks_ok:
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError, ("Can't activate the instance's disks")

    feedback_fn("* starting the instance on the target node")
    if not rpc.call_instance_start(target_node, instance, None):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance %s on node %s." %
                               (instance.name, target_node))


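# Block-device creation helpers.  On the primary node every device in the
# tree is created unconditionally; on a secondary node a device (and its
# children) is only physically created once CreateOnSecondary() returns
# True somewhere up the tree, otherwise the recursion just descends with
# the same 'force' flag.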
def _CreateBlockDevOnPrimary(cfg, node, device):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(cfg, node, child):
        return False

  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size, True)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _CreateBlockDevOnSecondary(cfg, node, device, force):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(cfg, node, child, force):
        return False

  if not force:
    return True
  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size, False)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


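# Device-tree generators.  _GenerateMDDRBDBranch builds one drbd device
# backed by a data LV and a 128 MB metadata LV on a freshly allocated port;
# _GenerateDiskTemplate wraps these (or plain LVs) into the per-template
# layout, always exposing the result as 'sda' (OS) and 'sdb' (swap).
# For example, with hypothetical arguments:
#   disks = _GenerateDiskTemplate(cfg, "xenvg", "remote_raid1",
#                                 "inst1.example.com", "node1", ["node2"],
#                                 10240, 4096)
# this would return two md_raid1 disks, each with a single drbd child.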
def _GenerateMDDRBDBranch(cfg, vgname, primary, secondary, size, base):
  """Generate a drbd device complete with its children.

  """
  port = cfg.AllocatePort()
  base = "%s_%s" % (base, port)
  dev_data = objects.Disk(dev_type="lvm", size=size,
                          logical_id=(vgname, "%s.data" % base))
  dev_meta = objects.Disk(dev_type="lvm", size=128,
                          logical_id=(vgname, "%s.meta" % base))
  drbd_dev = objects.Disk(dev_type="drbd", size=size,
                          logical_id = (primary, secondary, port),
                          children = [dev_data, dev_meta])
  return drbd_dev


def _GenerateDiskTemplate(cfg, vgname, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  if template_name == "diskless":
    disks = []
  elif template_name == "plain":
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")
    sda_dev = objects.Disk(dev_type="lvm", size=disk_sz,
                           logical_id=(vgname, "%s.os" % instance_name),
                           iv_name = "sda")
    sdb_dev = objects.Disk(dev_type="lvm", size=swap_sz,
                           logical_id=(vgname, "%s.swap" % instance_name),
                           iv_name = "sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == "local_raid1":
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")
    sda_dev_m1 = objects.Disk(dev_type="lvm", size=disk_sz,
                              logical_id=(vgname, "%s.os_m1" % instance_name))
    sda_dev_m2 = objects.Disk(dev_type="lvm", size=disk_sz,
                              logical_id=(vgname, "%s.os_m2" % instance_name))
    md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name = "sda",
                              size=disk_sz,
                              children = [sda_dev_m1, sda_dev_m2])
    sdb_dev_m1 = objects.Disk(dev_type="lvm", size=swap_sz,
                              logical_id=(vgname, "%s.swap_m1" %
                                          instance_name))
    sdb_dev_m2 = objects.Disk(dev_type="lvm", size=swap_sz,
                              logical_id=(vgname, "%s.swap_m2" %
                                          instance_name))
    md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name = "sdb",
                              size=swap_sz,
                              children = [sdb_dev_m1, sdb_dev_m2])
    disks = [md_sda_dev, md_sdb_dev]
  elif template_name == "remote_raid1":
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, vgname,
                                         primary_node, remote_node, disk_sz,
                                         "%s-sda" % instance_name)
    md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name="sda",
                              children = [drbd_sda_dev], size=disk_sz)
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, vgname,
                                         primary_node, remote_node, swap_sz,
                                         "%s-sdb" % instance_name)
    md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name="sdb",
                              children = [drbd_sdb_dev], size=swap_sz)
    disks = [md_sda_dev, md_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks


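# Disk creation/removal helpers.  _CreateDisks creates each volume on all
# secondary nodes first and then on the primary, aborting on the first
# failure; _RemoveDisks is its best-effort counterpart and keeps going
# even if individual removals fail.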
def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
              (device.iv_name, instance.name))
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, device, False):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, device):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False
  return True


def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False
  return result


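# The free-space check in CheckPrereq below uses a per-template formula;
# for example (hypothetical sizes) a remote_raid1 instance with
# disk_size=10240 and swap_size=4096 needs 10240 + 4096 + 256 = 14592 MB
# of free space in the volume group on both the primary and the mirror
# node.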
class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
              "disk_template", "swap_size", "mode", "start", "vcpus",
              "wait_for_sync"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      "INSTANCE_PRIMARY": self.op.pnode,
      "INSTANCE_SECONDARIES": " ".join(self.secondaries),
      "DISK_TEMPLATE": self.op.disk_template,
      "MEM_SIZE": self.op.mem_size,
      "DISK_SIZE": self.op.disk_size,
      "SWAP_SIZE": self.op.swap_size,
      "VCPUS": self.op.vcpus,
      "BRIDGE": self.op.bridge,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["SRC_NODE"] = self.op.src_node
      env["SRC_PATH"] = self.op.src_path
      env["SRC_IMAGE"] = self.src_image
    if self.inst_ip:
      env["INSTANCE_IP"] = self.inst_ip

    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl


  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError, ("Invalid instance creation mode '%s'" %
                                   self.op.mode)

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)
      if src_node is None or src_path is None:
        raise errors.OpPrereqError, ("Importing an instance requires source"
                                     " node and path options")
      src_node_full = self.cfg.ExpandNodeName(src_node)
      if src_node_full is None:
        raise errors.OpPrereqError, ("Unknown source node '%s'" % src_node)
      self.op.src_node = src_node = src_node_full

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError, ("The source path must be absolute")

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError, ("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError, ("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError, ("Wrong export version %s (wanted %d)" %
                                     (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError, ("Can't import instance with more than"
                                     " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage
    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError, ("No guest OS specified")

    # check primary node
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
    if pnode is None:
      raise errors.OpPrereqError, ("Primary node '%s' is unknown" %
                                   self.op.pnode)
    self.op.pnode = pnode.name
    self.pnode = pnode
    self.secondaries = []
    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError, ("Invalid disk template name")

    if self.op.disk_template == constants.DT_REMOTE_RAID1:
      if getattr(self.op, "snode", None) is None:
        raise errors.OpPrereqError, ("The 'remote_raid1' disk template needs"
                                     " a mirror node")

      snode_name = self.cfg.ExpandNodeName(self.op.snode)
      if snode_name is None:
        raise errors.OpPrereqError, ("Unknown secondary node '%s'" %
                                     self.op.snode)
      elif snode_name == pnode.name:
        raise errors.OpPrereqError, ("The secondary node cannot be"
                                     " the primary node.")
      self.secondaries.append(snode_name)

    # Check lv size requirements
    nodenames = [pnode.name] + self.secondaries
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())

    # Required free disk space as a function of disk and swap space
    req_size_dict = {
      constants.DT_DISKLESS: 0,
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
      constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
      # 256 MB are added for drbd metadata, 128MB for each drbd device
      constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
    }

    if self.op.disk_template not in req_size_dict:
      raise errors.ProgrammerError, ("Disk template '%s' size requirement"
                                     " is unknown" %  self.op.disk_template)

    req_size = req_size_dict[self.op.disk_template]

    for node in nodenames:
      info = nodeinfo.get(node, None)
      if not info:
        raise errors.OpPrereqError, ("Cannot get current information"
                                     " from node '%s'" % nodeinfo)
      if req_size > info['vg_free']:
        raise errors.OpPrereqError, ("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
    if not isinstance(os_obj, objects.OS):
      raise errors.OpPrereqError, ("OS '%s' not in supported os list for"
                                   " primary node"  % self.op.os_type)

    # instance verification
    hostname1 = utils.LookupHostname(self.op.instance_name)
    if not hostname1:
      raise errors.OpPrereqError, ("Instance name '%s' not found in dns" %
                                   self.op.instance_name)

    self.op.instance_name = instance_name = hostname1['hostname']
    instance_list = self.cfg.GetInstanceList()
    if instance_name in instance_list:
      raise errors.OpPrereqError, ("Instance '%s' is already in the cluster" %
                                   instance_name)

    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1['ip']
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError, ("given IP address '%s' doesn't look"
                                     " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = inst_ip

    command = ["fping", "-q", hostname1['ip']]
    result = utils.RunCmd(command)
    if not result.failed:
      raise errors.OpPrereqError, ("IP %s of instance %s already in use" %
                                   (hostname1['ip'], instance_name))

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError, ("target bridge '%s' does not exist on"
                                   " destination node '%s'" %
                                   (self.op.bridge, pnode.name))

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    disks = _GenerateDiskTemplate(self.cfg, self.cfg.GetVGName(),
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError, ("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj)
    elif iobj.disk_template == "remote_raid1":
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      raise errors.OpExecError, ("There are some degraded disks for"
                                      " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError, ("could not add os for instance %s"
                                          " on node %s" %
                                          (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                                src_node, src_image):
          raise errors.OpExecError, ("Could not import os for instance"
                                          " %s on node %s" %
                                          (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError, ("Unknown OS initialization mode '%s'"
                                       % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError, ("Could not start instance")


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError, ("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError, ("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance.name)
    return node, console_cmd


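# The two LUs below manage the individual DRBD halves of a remote_raid1
# mirror: LUAddMDDRBDComponent creates a new drbd branch (secondary first,
# then primary), attaches it to the md device and waits for the resync,
# while LURemoveMDDRBDComponent detaches a child identified by its drbd
# port and removes it from both nodes.  LUReplaceDisks further down
# combines the two operations for every disk of the instance.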
class LUAddMDDRBDComponent(LogicalUnit):
  """Add a new mirror member to an instance's disk.

  """
  HPATH = "mirror-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      "NEW_SECONDARY": self.op.remote_node,
      "DISK_NAME": self.op.disk_name,
      }
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.remote_node,] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)
    self.instance = instance

    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
    if remote_node is None:
      raise errors.OpPrereqError, ("Node '%s' not known" % self.op.remote_node)
    self.remote_node = remote_node

    if remote_node == instance.primary_node:
      raise errors.OpPrereqError, ("The specified node is the primary node of"
                                   " the instance.")

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError, ("Instance's disk layout is not"
                                   " remote_raid1.")
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError, ("Can't find this device ('%s') in the"
                                   " instance." % self.op.disk_name)
    if len(disk.children) > 1:
      raise errors.OpPrereqError, ("The device already has two slave"
                                   " devices.\n"
                                   "This would create a 3-disk raid1"
                                   " which we don't allow.")
    self.disk = disk

  def Exec(self, feedback_fn):
    """Add the mirror component

    """
    disk = self.disk
    instance = self.instance

    remote_node = self.remote_node
    new_drbd = _GenerateMDDRBDBranch(self.cfg, self.cfg.GetVGName(),
                                     instance.primary_node, remote_node,
                                     disk.size, "%s-%s" %
                                     (instance.name, self.op.disk_name))

    logger.Info("adding new mirror component on secondary")
    #HARDCODE
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, new_drbd, False):
      raise errors.OpExecError, ("Failed to create new component on secondary"
                                 " node %s" % remote_node)

    logger.Info("adding new mirror component on primary")
    #HARDCODE
    if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node, new_drbd):
      # remove secondary dev
      self.cfg.SetDiskID(new_drbd, remote_node)
      rpc.call_blockdev_remove(remote_node, new_drbd)
      raise errors.OpExecError, ("Failed to create volume on primary")

    # the device exists now
    # call the primary node to add the mirror to md
    logger.Info("adding new mirror component to md")
    if not rpc.call_blockdev_addchild(instance.primary_node,
                                           disk, new_drbd):
      logger.Error("Can't add mirror component to md!")
      self.cfg.SetDiskID(new_drbd, remote_node)
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
        logger.Error("Can't rollback on secondary")
      self.cfg.SetDiskID(new_drbd, instance.primary_node)
      if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
        logger.Error("Can't rollback on primary")
      raise errors.OpExecError, "Can't add mirror component to md array"

    disk.children.append(new_drbd)

    self.cfg.AddInstance(instance)

    _WaitForSync(self.cfg, instance)

    return 0


class LURemoveMDDRBDComponent(LogicalUnit):
  """Remove a component from a remote_raid1 disk.

  """
  HPATH = "mirror-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      "DISK_NAME": self.op.disk_name,
      "DISK_ID": self.op.disk_id,
      "OLD_SECONDARY": self.old_secondary,
      }
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)
    self.instance = instance

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError, ("Instance's disk layout is not"
                                   " remote_raid1.")
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError, ("Can't find this device ('%s') in the"
                                   " instance." % self.op.disk_name)
    for child in disk.children:
      if child.dev_type == "drbd" and child.logical_id[2] == self.op.disk_id:
        break
    else:
      raise errors.OpPrereqError, ("Can't find the device with this port.")

    if len(disk.children) < 2:
      raise errors.OpPrereqError, ("Cannot remove the last component from"
                                   " a mirror.")
    self.disk = disk
    self.child = child
    if self.child.logical_id[0] == instance.primary_node:
      oid = 1
    else:
      oid = 0
    self.old_secondary = self.child.logical_id[oid]

  def Exec(self, feedback_fn):
    """Remove the mirror component

    """
    instance = self.instance
    disk = self.disk
    child = self.child
    logger.Info("remove mirror component")
    self.cfg.SetDiskID(disk, instance.primary_node)
    if not rpc.call_blockdev_removechild(instance.primary_node,
                                              disk, child):
      raise errors.OpExecError, ("Can't remove child from mirror.")

    for node in child.logical_id[:2]:
      self.cfg.SetDiskID(child, node)
      if not rpc.call_blockdev_remove(node, child):
        logger.Error("Warning: failed to remove device from node %s,"
                     " continuing operation." % node)

    disk.children.remove(child)
    self.cfg.AddInstance(instance)


class LUReplaceDisks(LogicalUnit):
2916
  """Replace the disks of an instance.
2917

2918
  """
2919
  HPATH = "mirrors-replace"
2920
  HTYPE = constants.HTYPE_INSTANCE
2921
  _OP_REQP = ["instance_name"]
2922

    
2923
  def BuildHooksEnv(self):
2924
    """Build hooks env.
2925

2926
    This runs on the master, the primary and all the secondaries.
2927

2928
    """
2929
    env = {
2930
      "INSTANCE_NAME": self.op.instance_name,
2931
      "NEW_SECONDARY": self.op.remote_node,
2932
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
2933
      }
2934
    nl = [self.sstore.GetMasterNode(),
2935
          self.instance.primary_node] + list(self.instance.secondary_nodes)
2936
    return env, nl, nl
2937

    
2938
  def CheckPrereq(self):
2939
    """Check prerequisites.
2940

2941
    This checks that the instance is in the cluster.
2942

2943
    """
2944
    instance = self.cfg.GetInstanceInfo(
2945
      self.cfg.ExpandInstanceName(self.op.instance_name))
2946
    if instance is None:
2947
      raise errors.OpPrereqError, ("Instance '%s' not known" %
2948
                                   self.op.instance_name)
2949
    self.instance = instance
2950

    
2951
    if instance.disk_template != constants.DT_REMOTE_RAID1:
2952
      raise errors.OpPrereqError, ("Instance's disk layout is not"
2953
                                   " remote_raid1.")
2954

    
2955
    if len(instance.secondary_nodes) != 1:
2956
      raise errors.OpPrereqError, ("The instance has a strange layout,"
2957
                                   " expected one secondary but found %d" %
2958
                                   len(instance.secondary_nodes))
2959

    
2960
    remote_node = getattr(self.op, "remote_node", None)
2961
    if remote_node is None:
2962
      remote_node = instance.secondary_nodes[0]
2963
    else:
2964
      remote_node = self.cfg.ExpandNodeName(remote_node)
2965
      if remote_node is None:
2966
        raise errors.OpPrereqError, ("Node '%s' not known" %
2967
                                     self.op.remote_node)
2968
    if remote_node == instance.primary_node:
2969
      raise errors.OpPrereqError, ("The specified node is the primary node of"
2970
                                   " the instance.")
2971
    self.op.remote_node = remote_node
2972

    
2973
  def Exec(self, feedback_fn):
2974
    """Replace the disks of an instance.
2975

2976
    """
2977
    instance = self.instance
2978
    iv_names = {}
2979
    # start of work
2980
    remote_node = self.op.remote_node
2981
    cfg = self.cfg
2982
    vgname = cfg.GetVGName()
2983
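    # first pass: for every disk, build a new drbd mirror branch between the
    # primary and remote_node and attach it to the instance's md device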
    for dev in instance.disks:
      size = dev.size
      new_drbd = _GenerateMDDRBDBranch(cfg, vgname, instance.primary_node,
                                       remote_node, size,
                                       "%s-%s" % (instance.name, dev.iv_name))
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
      logger.Info("adding new mirror component on secondary for %s" %
                  dev.iv_name)
      #HARDCODE
      if not _CreateBlockDevOnSecondary(cfg, remote_node, new_drbd, False):
        raise errors.OpExecError, ("Failed to create new component on"
                                   " secondary node %s\n"
                                   "Full abort, cleanup manually!" %
                                   remote_node)

      logger.Info("adding new mirror component on primary")
      #HARDCODE
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, new_drbd):
        # remove secondary dev
        cfg.SetDiskID(new_drbd, remote_node)
        rpc.call_blockdev_remove(remote_node, new_drbd)
        raise errors.OpExecError("Failed to create volume on primary!\n"
                                 "Full abort, cleanup manually!!")

      # the device exists now
      # call the primary node to add the mirror to md
      logger.Info("adding new mirror component to md")
      if not rpc.call_blockdev_addchild(instance.primary_node, dev,
                                        new_drbd):
        logger.Error("Can't add mirror component to md!")
        cfg.SetDiskID(new_drbd, remote_node)
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
          logger.Error("Can't rollback on secondary")
        cfg.SetDiskID(new_drbd, instance.primary_node)
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
          logger.Error("Can't rollback on primary")
        raise errors.OpExecError, ("Full abort, cleanup manually!!")

      dev.children.append(new_drbd)
      cfg.AddInstance(instance)

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    _WaitForSync(cfg, instance, unlock=True)

    # so check manually all the devices
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError, ("MD device %s is degraded!" % name)
      cfg.SetDiskID(new_drbd, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
      if is_degr:
        raise errors.OpExecError, ("New drbd device %s is degraded!" % name)

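    # second pass: all new components are in sync, so detach the old mirror
    # components and remove their block devices on both nodes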
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      logger.Info("remove mirror %s component" % name)
      cfg.SetDiskID(dev, instance.primary_node)
      if not rpc.call_blockdev_removechild(instance.primary_node,
                                           dev, child):
        logger.Error("Can't remove child from mirror, aborting"
                     " *this device cleanup*.\nYou need to cleanup manually!!")
        continue

      for node in child.logical_id[:2]:
        logger.Info("remove child device on %s" % node)
        cfg.SetDiskID(child, node)
        if not rpc.call_blockdev_remove(node, child):
          logger.Error("Warning: failed to remove device from node %s,"
                       " continuing operation." % node)

      dev.children.remove(child)

      cfg.AddInstance(instance)


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError, "Invalid argument type 'instances'"
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError, ("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]
    return


  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type == "drbd":
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

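    # recurse so the caller gets the status of the whole device tree, not
    # just the top-level device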
    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                           instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

3147
        "name": instance.name,
3148
        "config_state": config_state,
3149
        "run_state": remote_state,
3150
        "pnode": instance.primary_node,
3151
        "snodes": instance.secondary_nodes,
3152
        "os": instance.os,
3153
        "memory": instance.memory,
3154
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
3155
        "disks": disks,
3156
        }
3157

    
3158
      result[instance.name] = idict
3159

    
3160
    return result
3161

    
3162

    
3163
class LUQueryNodeData(NoHooksLU):
3164
  """Logical unit for querying node data.
3165

3166
  """
3167
  _OP_REQP = ["nodes"]
3168

    
3169
  def CheckPrereq(self):
3170
    """Check prerequisites.
3171

3172
    This only checks the optional node list against the existing names.
3173

3174
    """
3175
    self.wanted_nodes = _GetWantedNodes(self, self.op.nodes)
3176

    
3177
  def Exec(self, feedback_fn):
3178
    """Compute and return the list of nodes.
3179

3180
    """
3181
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
3182
             in self.cfg.GetInstanceList()]
3183
    result = []
3184
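    # each tuple is (name, primary_ip, secondary_ip, [instances with this
    # node as primary], [instances with this node as secondary])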
    for node in self.wanted_nodes:
      result.append((node.name, node.primary_ip, node.secondary_ip,
                     [inst.name for inst in ilist
                      if inst.primary_node == node.name],
                     [inst.name for inst in ilist
                      if node.name in inst.secondary_nodes],
                     ))
    return result


class LUSetInstanceParms(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      }
    if self.mem:
      env["MEM_SIZE"] = self.mem
    if self.vcpus:
      env["VCPUS"] = self.vcpus
    if self.do_ip:
      env["INSTANCE_IP"] = self.ip
    if self.bridge:
      env["BRIDGE"] = self.bridge

    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)

    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.bridge = getattr(self.op, "bridge", None)
    if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
      raise errors.OpPrereqError, ("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError, ("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError, ("Invalid vcpus number: %s" % str(err))
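    # the literal string "none" (any case) clears the instance's IP address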
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError, ("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("No such instance name '%s'" %
                                   self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance
    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.
    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus",  self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))

    self.cfg.AddInstance(instance)

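    # result is a list of (parameter, new value) pairs, e.g.
    # [("mem", 512), ("bridge", "xen-br0")] (values are illustrative)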
    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list([node.name for node in self.nodes])


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance name is a valid one.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not found" %
                                   self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError, ("Destination node '%s' is unknown." %
                                   self.op.target_node)
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    # shutdown the instance, unless requested not to do so
    if self.op.shutdown:
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
      self.processor.ChainOpCode(op, feedback_fn)

    vgname = self.cfg.GetVGName()

    snap_disks = []

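    # snapshot the disks while the instance is shut down (if requested); the
    # finally clause restarts it even if the snapshotting fails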
    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type="lvm", size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown:
        op = opcodes.OpStartupInstance(instance_name=instance.name,
                                       force=False)
        self.processor.ChainOpCode(op, feedback_fn)

    # TODO: check for size

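    # copy each snapshot to the target node, then remove it from the source
    # node; errors are only logged so the remaining disks are still handled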
    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                      instance):
        logger.Error("could not export block device %s from node"
                     " %s to node %s" %
                     (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from"
                     " node %s" % (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.processor.ChainOpCode(op, feedback_fn)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))