#!/usr/bin/python
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the commands used by gnt-* programs."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import socket
import time
import tempfile
import re
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import ssconf

class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements (REQ_CLUSTER,
      REQ_MASTER); note that all commands require root permissions

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_CLUSTER = True
  REQ_MASTER = True

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.processor = processor
    self.op = op
    self.cfg = cfg
    self.sstore = sstore
    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError, ("Required parameter '%s' missing" %
                                     attr_name)
    if self.REQ_CLUSTER:
      if not cfg.IsCluster():
        raise errors.OpPrereqError, ("Cluster not initialized yet,"
                                     " use 'gnt-cluster init' first.")
      if self.REQ_MASTER:
        master = cfg.GetMaster()
        if master != socket.gethostname():
          raise errors.OpPrereqError, ("Commands must be run on the master"
                                       " node %s" % master)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a
    dict containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have the 'GANETI_' prefix, as this
    will be handled in the hooks runner. Also note that additional keys
    will be added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    As for the node lists, the master should not be included in them,
    as it will be added by the hooks runner in case this LU requires a
    cluster to run on (otherwise we don't have a node list). If there
    are no nodes, an empty list (and not None) should be returned.

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError
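# Illustrative sketch (not part of the original module): following the rules
# in the LogicalUnit docstring above, a minimal subclass could look like the
# hypothetical example below; the opcode field "node_name" and the hook path
# "example-noop" are made up for illustration only.
#
#   class LUExampleNoop(LogicalUnit):
#     HPATH = "example-noop"
#     HTYPE = constants.HTYPE_CLUSTER
#     _OP_REQP = ["node_name"]
#
#     def CheckPrereq(self):
#       # canonicalize opcode parameters here (e.g. expand short names)
#       self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
#
#     def BuildHooksEnv(self):
#       # env dict (no GANETI_ prefix), pre-hook nodes, post-hook nodes
#       return {"NODE_NAME": self.op.node_name}, [], [self.op.node_name]
#
#     def Exec(self, feedback_fn):
#       feedback_fn("no-op on %s" % self.op.node_name)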


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Build hooks env.

    This is a no-op, since we don't run hooks.

    """
    return


def _UpdateEtcHosts(fullnode, ip):
  """Ensure a node has a correct entry in /etc/hosts.

  Args:
    fullnode - Fully qualified domain name of host. (str)
    ip       - IPv4 address of host (str)

  """
  node = fullnode.split(".", 1)[0]

  f = open('/etc/hosts', 'r+')

  inthere = False

  save_lines = []
  add_lines = []
  removed = False

  while True:
    rawline = f.readline()

    if not rawline:
      # End of file
      break

    line = rawline.split('\n')[0]

    # Strip off comments
    line = line.split('#')[0]

    if not line:
      # Entire line was comment, skip
      save_lines.append(rawline)
      continue

    fields = line.split()

    haveall = True
    havesome = False
    for spec in [ ip, fullnode, node ]:
      if spec not in fields:
        haveall = False
      if spec in fields:
        havesome = True

    if haveall:
      inthere = True
      save_lines.append(rawline)
      continue

    if havesome and not haveall:
      # Line (old, or manual?) which is missing some.  Remove.
      removed = True
      continue

    save_lines.append(rawline)

  if not inthere:
    add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))

  if removed:
    if add_lines:
      save_lines = save_lines + add_lines

    # We removed a line, write a new file and replace old.
    fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
    newfile = os.fdopen(fd, 'w')
    newfile.write(''.join(save_lines))
    newfile.close()
    os.rename(tmpname, '/etc/hosts')

  elif add_lines:
    # Simply appending a new line will do the trick.
    f.seek(0, 2)
    for add in add_lines:
      f.write(add)

  f.close()
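# Illustrative note (not from the original source): given a hypothetical node
# "node1.example.com" with IP 192.0.2.10, the function above keeps or adds an
# /etc/hosts line of the form
#
#   192.0.2.10<TAB>node1.example.com node1
#
# (IP, FQDN and short name on one line); lines that match only some of these
# fields are dropped and the file is rewritten via a temporary file.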


def _UpdateKnownHosts(fullnode, ip, pubkey):
  """Ensure a node has a correct known_hosts entry.

  Args:
    fullnode - Fully qualified domain name of host. (str)
    ip       - IPv4 address of host (str)
    pubkey   - the public key of the cluster

  """
  if os.path.exists('/etc/ssh/ssh_known_hosts'):
    f = open('/etc/ssh/ssh_known_hosts', 'r+')
  else:
    f = open('/etc/ssh/ssh_known_hosts', 'w+')

  inthere = False

  save_lines = []
  add_lines = []
  removed = False

  while True:
    rawline = f.readline()
    logger.Debug('read %s' % (repr(rawline),))

    if not rawline:
      # End of file
      break

    line = rawline.split('\n')[0]

    parts = line.split(' ')
    fields = parts[0].split(',')
    key = parts[2]

    haveall = True
    havesome = False
    for spec in [ ip, fullnode ]:
      if spec not in fields:
        haveall = False
      if spec in fields:
        havesome = True

    logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
    if haveall and key == pubkey:
      inthere = True
      save_lines.append(rawline)
      logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
      continue

    if havesome and (not haveall or key != pubkey):
      removed = True
      logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
      continue

    save_lines.append(rawline)

  if not inthere:
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))

  if removed:
    save_lines = save_lines + add_lines

    # Write a new file and replace old.
    fd, tmpname = tempfile.mkstemp('tmp', 'ssh_known_hosts_', '/etc/ssh')
    newfile = os.fdopen(fd, 'w')
    newfile.write(''.join(save_lines))
    newfile.close()
    logger.Debug("Wrote new known_hosts.")
    os.rename(tmpname, '/etc/ssh/ssh_known_hosts')

  elif add_lines:
    # Simply appending a new line will do the trick.
    f.seek(0, 2)
    for add in add_lines:
      f.write(add)

  f.close()
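# Illustrative note (not from the original source): the entry added above
# follows the OpenSSH known_hosts format, e.g. for the same hypothetical node
#
#   node1.example.com,192.0.2.10 ssh-rsa AAAAB3NzaC1yc2E...
#
# Lines mentioning the host or IP but carrying a different key are discarded
# and the file is rewritten atomically through a temporary file.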


def _HasValidVG(vglist, vgname):
  """Checks if the volume group list is valid.

  A non-None return value means there's an error, and the return value
  is the error message.

  """
  vgsize = vglist.get(vgname, None)
  if vgsize is None:
    return "volume group '%s' missing" % vgname
  elif vgsize < 20480:
    return ("volume group '%s' too small (20480MiB required, %dMiB found)" %
            (vgname, vgsize))
  return None
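# Illustrative usage sketch (assumed call pattern, mirroring how the checks
# below use this helper; "xenvg" is a made-up volume group name):
#
#   vgstatus = _HasValidVG(utils.ListVolumeGroups(), "xenvg")
#   if vgstatus:
#     raise errors.OpPrereqError, ("Error: %s" % vgstatus)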


def _InitSSHSetup(node):
  """Setup the SSH configuration for the cluster.


  This generates a dsa keypair for root, adds the pub key to the
  permitted hosts and adds the hostkey to its own known hosts.

  Args:
    node: the name of this host as a fqdn

  """
  utils.RemoveFile('/root/.ssh/known_hosts')

  if os.path.exists('/root/.ssh/id_dsa'):
    utils.CreateBackup('/root/.ssh/id_dsa')
  if os.path.exists('/root/.ssh/id_dsa.pub'):
    utils.CreateBackup('/root/.ssh/id_dsa.pub')

  utils.RemoveFile('/root/.ssh/id_dsa')
  utils.RemoveFile('/root/.ssh/id_dsa.pub')

  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
                         "-f", "/root/.ssh/id_dsa",
                         "-q", "-N", ""])
  if result.failed:
    raise errors.OpExecError, ("could not generate ssh keypair, error %s" %
                               result.output)

  f = open('/root/.ssh/id_dsa.pub', 'r')
  try:
    utils.AddAuthorizedKey('/root/.ssh/authorized_keys', f.read(8192))
  finally:
    f.close()


def _InitGanetiServerSetup(ss):
  """Setup the necessary configuration for the initial node daemon.

  This creates the nodepass file containing the shared password for
  the cluster and also generates the SSL certificate.

  """
  # Create pseudo random password
  randpass = sha.new(os.urandom(64)).hexdigest()
  # and write it into sstore
  ss.SetKey(ss.SS_NODED_PASS, randpass)

  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
                         "-days", str(365*5), "-nodes", "-x509",
                         "-keyout", constants.SSL_CERT_FILE,
                         "-out", constants.SSL_CERT_FILE, "-batch"])
  if result.failed:
    raise errors.OpExecError, ("could not generate server ssl cert, command"
                               " %s had exitcode %s and error message %s" %
                               (result.cmd, result.exit_code, result.output))

  os.chmod(constants.SSL_CERT_FILE, 0400)

  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])

  if result.failed:
    raise errors.OpExecError, ("could not start the node daemon, command %s"
                               " had exitcode %s and error %s" %
                               (result.cmd, result.exit_code, result.output))


def _InitClusterInterface(fullname, name, ip):
  """Initialize the master startup script.

  """
  f = file(constants.CLUSTER_NAME_FILE, 'w')
  f.write("%s\n" % fullname)
  f.close()

  f = file(constants.MASTER_INITD_SCRIPT, 'w')
  f.write ("#!/bin/sh\n")
  f.write ("\n")
  f.write ("# Start Ganeti Master Virtual Address\n")
  f.write ("\n")
  f.write ("DESC=\"Ganeti Master IP\"\n")
  f.write ("MASTERNAME=\"%s\"\n" % name)
  f.write ("MASTERIP=\"%s\"\n" % ip)
  f.write ("case \"$1\" in\n")
  f.write ("  start)\n")
  f.write ("    if fping -q -c 3 ${MASTERIP} &>/dev/null; then\n")
  f.write ("        echo \"$MASTERNAME no-go - there is already a master.\"\n")
  f.write ("        rm -f %s\n" % constants.MASTER_CRON_LINK)
  f.write ("        scp ${MASTERNAME}:%s %s\n" %
           (constants.CLUSTER_CONF_FILE, constants.CLUSTER_CONF_FILE))
  f.write ("    else\n")
  f.write ("        echo -n \"Starting $DESC: \"\n")
  f.write ("        ip address add ${MASTERIP}/32 dev xen-br0"
           " label xen-br0:0\n")
  f.write ("        arping -q -U -c 3 -I xen-br0 -s ${MASTERIP} ${MASTERIP}\n")
  f.write ("        echo \"$MASTERNAME.\"\n")
  f.write ("    fi\n")
  f.write ("    ;;\n")
  f.write ("  stop)\n")
  f.write ("    echo -n \"Stopping $DESC: \"\n")
  f.write ("    ip address del ${MASTERIP}/32 dev xen-br0\n")
  f.write ("    echo \"$MASTERNAME.\"\n")
  f.write ("    ;;\n")
  f.write ("  *)\n")
  f.write ("    echo \"Usage: $0 {start|stop}\" >&2\n")
  f.write ("    exit 1\n")
  f.write ("    ;;\n")
  f.write ("esac\n")
  f.write ("\n")
  f.write ("exit 0\n")
  f.flush()
  os.fsync(f.fileno())
  f.close()
  os.chmod(constants.MASTER_INITD_SCRIPT, 0755)


class LUInitCluster(LogicalUnit):
  """Initialise the cluster.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
              "def_bridge"]
  REQ_CLUSTER = False

  def BuildHooksEnv(self):
    """Build hooks env.

    Notes: Since we don't require a cluster, we must manually add
    ourselves to the post-run node list.

    """

    env = {"CLUSTER": self.op.cluster_name,
           "MASTER": self.hostname}
    return env, [], [self.hostname['hostname_full']]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    if config.ConfigWriter.IsCluster():
      raise errors.OpPrereqError, ("Cluster is already initialised")

    hostname_local = socket.gethostname()
    self.hostname = hostname = utils.LookupHostname(hostname_local)
    if not hostname:
      raise errors.OpPrereqError, ("Cannot resolve my own hostname ('%s')" %
                                   hostname_local)

    self.clustername = clustername = utils.LookupHostname(self.op.cluster_name)
    if not clustername:
      raise errors.OpPrereqError, ("Cannot resolve given cluster name ('%s')"
                                   % self.op.cluster_name)

    result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", hostname['ip']])
    if result.failed:
      raise errors.OpPrereqError, ("Inconsistency: this host's name resolves"
                                   " to %s,\nbut this ip address does not"
                                   " belong to this host."
                                   " Aborting." % hostname['ip'])

    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip and not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError, ("Invalid secondary ip given")
    if secondary_ip and secondary_ip != hostname['ip']:
      result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", secondary_ip])
      if result.failed:
        raise errors.OpPrereqError, ("You gave %s as secondary IP,\n"
                                     "but it does not belong to this host." %
                                     secondary_ip)
    self.secondary_ip = secondary_ip

    # checks presence of the volume group given
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)

    if vgstatus:
      raise errors.OpPrereqError, ("Error: %s" % vgstatus)

    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
                    self.op.mac_prefix):
      raise errors.OpPrereqError, ("Invalid mac prefix given '%s'" %
                                   self.op.mac_prefix)

    if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
      raise errors.OpPrereqError, ("Invalid hypervisor type given '%s'" %
                                   self.op.hypervisor_type)

  def Exec(self, feedback_fn):
    """Initialize the cluster.

    """
    clustername = self.clustername
    hostname = self.hostname

    # adds the cluster name file and master startup script
    _InitClusterInterface(clustername['hostname_full'],
                          clustername['hostname'],
                          clustername['ip'])

    # set up the simple store
    ss = ssconf.SimpleStore()
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)

    # set up the inter-node password and certificate
    _InitGanetiServerSetup(ss)

    # start the master ip
    rpc.call_node_start_master(hostname['hostname_full'])

    # set up ssh config and /etc/hosts
    f = open('/etc/ssh/ssh_host_rsa_key.pub', 'r')
    try:
      sshline = f.read()
    finally:
      f.close()
    sshkey = sshline.split(" ")[1]

    _UpdateEtcHosts(hostname['hostname_full'],
                    hostname['ip'],
                    )

    _UpdateKnownHosts(hostname['hostname_full'],
                      hostname['ip'],
                      sshkey,
                      )

    _InitSSHSetup(hostname['hostname'])

    # init of cluster config file
    cfgw = config.ConfigWriter()
    cfgw.InitConfig(hostname['hostname'], hostname['ip'], self.secondary_ip,
                    clustername['hostname'], sshkey, self.op.mac_prefix,
                    self.op.vg_name, self.op.def_bridge)


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMaster()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) > 0 and nodelist != [master]:
      raise errors.OpPrereqError, ("There are still %d node(s) in "
                                   "this cluster." % (len(nodelist) - 1))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    utils.CreateBackup('/root/.ssh/id_dsa')
    utils.CreateBackup('/root/.ssh/id_dsa.pub')
    rpc.call_node_leave_cluster(self.cfg.GetMaster())


class LUVerifyCluster(NoHooksLU):
  """Verifies the cluster status.

  """
  _OP_REQP = []

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums
    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn(" - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    instancelist = self.cfg.GetInstanceList()
    if not instance in instancelist:
      feedback_fn("  - ERROR: instance %s not in instance list %s" %
                      (instance, instancelist))
      bad = True

    instanceconfig = self.cfg.GetInstanceInfo(instance)
    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if not instanceconfig.status == 'down':
      if not instance in node_instance[node_current]:
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return not bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad


  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNodeConfigFiles(self, ismaster, node, file_list, feedback_fn):
    """Verify the list of node config files"""

    bad = False
    for file_name in constants.MASTER_CONFIGFILES:
      if ismaster and file_name not in file_list:
        feedback_fn("  - ERROR: master config file %s missing from master"
                    " node %s" % (file_name, node))
        bad = True
      elif not ismaster and file_name in file_list:
        feedback_fn("  - ERROR: master config file %s should not exist"
                    " on non-master node %s" % (file_name, node))
        bad = True

    for file_name in constants.NODE_CONFIGFILES:
      if file_name not in file_list:
        feedback_fn("  - ERROR: config file %s missing from node %s" %
                    (file_name, node))
        bad = True

    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    self.cfg.VerifyConfig()

    master = self.cfg.GetMaster()
    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    node_volume = {}
    node_instance = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = constants.CLUSTER_CONF_FILES
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_configfile = rpc.call_configfile_list(nodelist)
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result
      # node_configfile
      nodeconfigfile = all_configfile[node]

      if not nodeconfigfile:
        feedback_fn("  - ERROR: connection to %s failed" % (node))
        bad = True
        continue

      bad = bad or self._VerifyNodeConfigFiles(node==master, node,
                                               nodeconfigfile, feedback_fn)

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if type(volumeinfo) != dict:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      result = self._VerifyInstance(instance, node_volume, node_instance,
                                    feedback_fn)
      bad = bad or result

      inst_config = self.cfg.GetInstanceInfo(instance)

      inst_config.MapLVsByNode(node_vol_should)

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    return int(bad)


def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      logger.ToStderr("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError, ("Can't contact node %s for mirror data,"
                                   " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        logger.ToStderr("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      perc_done, est_time, is_degraded = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        logger.ToStdout("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    if unlock:
      utils.Unlock('cmd')
    try:
      time.sleep(min(60, max_time))
    finally:
      if unlock:
        utils.Lock('cmd')

  if done:
    logger.ToStdout("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
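# Illustrative usage sketch (assumed call pattern): a caller that has just set
# up mirrored disks for an instance might wait for them like this:
#
#   if not _WaitForSync(self.cfg, instance):
#     logger.ToStderr("Warning: some disks of %s are degraded" % instance.name)
#
# A False return means at least one mirror is still degraded with no resync in
# progress.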


def _CheckDiskConsistency(cfgw, dev, node, on_primary):
  """Check that mirrors are not degraded.

  """

  cfgw.SetDiskID(dev, node)

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Can't get any data from node %s" % node)
      result = False
    else:
      result = result and (not rstats[5])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    return

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError, "Can't gather the list of OSes"
    return node_data


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would not allow itself to run.

    """
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return {"NODE_NAME": self.op.node_name}, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """

    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      logger.Error("Error: Node '%s' is unknown." % self.op.node_name)
      return 1

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMaster()
    if node.name == masternode:
      raise errors.OpPrereqError, ("Node is the master node,"
                                   " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError, ("Instance %s still running on the node,"
                                     " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError, ("Instance %s has node as a secondary,"
                                     " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    rpc.call_node_leave_cluster(node.name)

    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.static_fields = frozenset(["name", "pinst", "sinst", "pip", "sip"])
    self.dynamic_fields = frozenset(["dtotal", "dfree",
                                     "mtotal", "mnode", "mfree"])
    self.all_fields = self.static_fields | self.dynamic_fields

    if not self.all_fields.issuperset(self.op.output_fields):
      raise errors.OpPrereqError, ("Unknown output fields selected: %s"
                                   % ",".join(frozenset(self.op.output_fields).
                                              difference(self.all_fields)))


  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = utils.NiceSort(self.cfg.GetNodeList())
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]


    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict.fromkeys(nodenames, 0)
    node_to_secondary = dict.fromkeys(nodenames, 0)

    if "pinst" in self.op.output_fields or "sinst" in self.op.output_fields:
      instancelist = self.cfg.GetInstanceList()

      for instance in instancelist:
        instanceinfo = self.cfg.GetInstanceInfo(instance)
        node_to_primary[instanceinfo.primary_node] += 1
        for secnode in instanceinfo.secondary_nodes:
          node_to_secondary[secnode] += 1

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst":
          val = node_to_primary[node.name]
        elif field == "sinst":
          val = node_to_secondary[node.name]
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, "?")
        else:
          raise errors.ParameterError, field
        val = str(val)
        node_output.append(val)
      output.append(node_output)

    return output


def _CheckNodesDirs(node_list, paths):
  """Verify if the given nodes have the same files.

  Args:
    node_list: the list of node names to check
    paths: the list of directories to checksum and compare

  Returns:
    list of (node, different_file, message); if empty, the files are in sync

  """
  file_names = []
  for dir_name in paths:
    flist = [os.path.join(dir_name, name) for name in os.listdir(dir_name)]
    flist = [name for name in flist if os.path.isfile(name)]
    file_names.extend(flist)

  local_checksums = utils.FingerprintFiles(file_names)

  results = []
  verify_params = {'filelist': file_names}
  all_node_results = rpc.call_node_verify(node_list, verify_params)
  for node_name in node_list:
    node_result = all_node_results.get(node_name, False)
    if not node_result or 'filelist' not in node_result:
      results.append((node_name, "'all files'", "node communication error"))
      continue
    remote_checksums = node_result['filelist']
    for fname in local_checksums:
      if fname not in remote_checksums:
        results.append((node_name, fname, "missing file"))
      elif remote_checksums[fname] != local_checksums[fname]:
        results.append((node_name, fname, "wrong checksum"))
  return results
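# Illustrative usage sketch (assumed call; the directory path is made up):
#
#   problems = _CheckNodesDirs(self.cfg.GetNodeList(), ["/etc/ganeti"])
#   for node_name, fname, msg in problems:
#     logger.Error("%s: %s (%s)" % (node_name, fname, msg))
#
# An empty result means the checked directories are identical on all nodes.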


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.LookupHostname(node_name)
    if not dns_data:
      raise errors.OpPrereqError, ("Node %s is not resolvable" % node_name)

    node = dns_data['hostname']
    primary_ip = self.op.primary_ip = dns_data['ip']
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError, ("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip
    node_list = cfg.GetNodeList()
    if node in node_list:
      raise errors.OpPrereqError, ("Node %s is already in the configuration"
                                   % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)
      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError, ("New node ip address(es) conflict with"
                                     " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(cfg.GetMaster())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError, ("The master has no private ip but the"
                                     " new node has one")
      else:
        raise errors.OpPrereqError, ("The master has a private ip but the"
                                     " new node doesn't have one")

    # checks reachability
    command = ["fping", "-q", primary_ip]
    result = utils.RunCmd(command)
    if result.failed:
      raise errors.OpPrereqError, ("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      command = ["fping", "-S%s" % myself.secondary_ip, "-q", secondary_ip]
      result = utils.RunCmd(command)
      if result.failed:
        raise errors.OpPrereqError, ("Node secondary ip not reachable by ping")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # set up inter-node password and certificate and restart the node daemon
    gntpass = self.sstore.GetNodeDaemonPassword()
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
      raise errors.OpExecError, ("ganeti password corruption detected")
    f = open(constants.SSL_CERT_FILE)
    try:
      gntpem = f.read(8192)
    finally:
      f.close()
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
    # so we use this to detect an invalid certificate; as long as the
    # cert doesn't contain this, the here-document will be correctly
    # parsed by the shell sequence below
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
      raise errors.OpExecError, ("invalid PEM encoding in the SSL certificate")
    if not gntpem.endswith("\n"):
      raise errors.OpExecError, ("PEM must end with newline")
    logger.Info("copy cluster pass to %s and start the node daemon" % node)

    # remove first the root's known_hosts file
    utils.RemoveFile("/root/.ssh/known_hosts")
    # and then connect with ssh to set password and start ganeti-noded
    # note that all the below variables are sanitized at this point,
    # either by being constants or by the checks above
    ss = self.sstore
    mycommand = ("umask 077 && "
                 "echo '%s' > '%s' && "
                 "cat > '%s' << '!EOF.' && \n"
                 "%s!EOF.\n%s restart" %
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
                  constants.SSL_CERT_FILE, gntpem,
                  constants.NODE_INITD_SCRIPT))

    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
    if result.failed:
      raise errors.OpExecError, ("Remote command on node %s, error: %s,"
                                 " output: %s" %
                                 (node, result.fail_reason, result.output))

    # check connectivity
    time.sleep(4)

    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError, ("Version mismatch master version %s,"
                                   " node version %s" %
                                   (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError, ("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    keyarray = []
    keyfiles = ["/etc/ssh/ssh_host_dsa_key", "/etc/ssh/ssh_host_dsa_key.pub",
                "/etc/ssh/ssh_host_rsa_key", "/etc/ssh/ssh_host_rsa_key.pub",
                "/root/.ssh/id_dsa", "/root/.ssh/id_dsa.pub"]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError, ("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    _UpdateEtcHosts(new_node.name, new_node.primary_ip)
    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
                      self.cfg.GetHostKey())

    if new_node.secondary_ip != new_node.primary_ip:
      result = ssh.SSHCall(node, "root",
                           "fping -S 127.0.0.1 -q %s" % new_node.secondary_ip)
      if result.failed:
        raise errors.OpExecError, ("Node claims it doesn't have the"
                                   " secondary ip you gave (%s).\n"
                                   "Please fix and re-run this command." %
                                   new_node.secondary_ip)

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.cfg.GetMaster())
    dist_nodes = self.cfg.GetNodeList() + [node]
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in ("/etc/hosts", "/etc/ssh/ssh_known_hosts"):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = [constants.MASTER_CRON_FILE,
               constants.MASTER_INITD_SCRIPT,
               constants.CLUSTER_NAME_FILE]
    to_copy.extend(ss.GetFileList())
    for fname in to_copy:
      if not ssh.CopyFileToNode(node, fname):
        logger.Error("could not copy file %s to node %s" % (fname, node))

    logger.Info("adding node %s to cluster.conf" % node)
    self.cfg.AddNode(new_node)
1404

    
1405

    
1406
class LUMasterFailover(LogicalUnit):
1407
  """Failover the master node to the current node.
1408

1409
  This is a special LU in that it must run on a non-master node.
1410

1411
  """
1412
  HPATH = "master-failover"
1413
  HTYPE = constants.HTYPE_CLUSTER
1414
  REQ_MASTER = False
1415
  _OP_REQP = []
1416

    
1417
  def BuildHooksEnv(self):
1418
    """Build hooks env.
1419

1420
    This will run on the new master only in the pre phase, and on all
1421
    the nodes in the post phase.
1422

1423
    """
1424
    env = {
1425
      "NEW_MASTER": self.new_master,
1426
      "OLD_MASTER": self.old_master,
1427
      }
1428
    return env, [self.new_master], self.cfg.GetNodeList()
1429

    
1430
  def CheckPrereq(self):
1431
    """Check prerequisites.
1432

1433
    This checks that we are not already the master.
1434

1435
    """
1436
    self.new_master = socket.gethostname()
1437

    
1438
    self.old_master = self.cfg.GetMaster()
1439

    
1440
    if self.old_master == self.new_master:
1441
      raise errors.OpPrereqError, ("This commands must be run on the node"
1442
                                   " where you want the new master to be.\n"
1443
                                   "%s is already the master" %
1444
                                   self.old_master)
1445

    
1446
  def Exec(self, feedback_fn):
1447
    """Failover the master node.
1448

1449
    This command, when run on a non-master node, will cause the current
1450
    master to cease being master, and the non-master to become new
1451
    master.
1452

1453
    """
1454

    
1455
    #TODO: do not rely on gethostname returning the FQDN
1456
    logger.Info("setting master to %s, old master: %s" %
1457
                (self.new_master, self.old_master))
1458

    
1459
    if not rpc.call_node_stop_master(self.old_master):
1460
      logger.Error("could disable the master role on the old master"
1461
                   " %s, please disable manually" % self.old_master)
1462

    
1463
    if not rpc.call_node_start_master(self.new_master):
1464
      logger.Error("could not start the master role on the new master"
1465
                   " %s, please check" % self.new_master)
1466

    
1467
    self.cfg.SetMaster(self.new_master)
1468

    
1469

    
1470
class LUQueryClusterInfo(NoHooksLU):
1471
  """Query cluster configuration.
1472

1473
  """
1474
  _OP_REQP = []
1475

    
1476
  def CheckPrereq(self):
1477
    """No prerequsites needed for this LU.
1478

1479
    """
1480
    pass
1481

    
1482
  def Exec(self, feedback_fn):
1483
    """Return cluster config.
1484

1485
    """
1486
    instances = [self.cfg.GetInstanceInfo(name)
1487
                 for name in self.cfg.GetInstanceList()]
1488
    result = {
1489
      "name": self.cfg.GetClusterName(),
1490
      "software_version": constants.RELEASE_VERSION,
1491
      "protocol_version": constants.PROTOCOL_VERSION,
1492
      "config_version": constants.CONFIG_VERSION,
1493
      "os_api_version": constants.OS_API_VERSION,
1494
      "export_version": constants.EXPORT_VERSION,
1495
      "master": self.cfg.GetMaster(),
1496
      "architecture": (platform.architecture()[0], platform.machine()),
1497
      "instances": [(instance.name, instance.primary_node)
1498
                    for instance in instances],
1499
      "nodes": self.cfg.GetNodeList(),
1500
      }
1501

    
1502
    return result
1503

    
1504

    
1505
class LUClusterCopyFile(NoHooksLU):
1506
  """Copy file to cluster.
1507

1508
  """
1509
  _OP_REQP = ["nodes", "filename"]
1510

    
1511
  def CheckPrereq(self):
1512
    """Check prerequisites.
1513

1514
    It should check that the named file exists and that the given list
1515
    of nodes is valid.
1516

1517
    """
1518
    if not os.path.exists(self.op.filename):
1519
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1520
    if self.op.nodes:
1521
      nodes = self.op.nodes
1522
    else:
1523
      nodes = self.cfg.GetNodeList()
1524
    self.nodes = []
1525
    for node in nodes:
1526
      nname = self.cfg.ExpandNodeName(node)
1527
      if nname is None:
1528
        raise errors.OpPrereqError, ("Node '%s' is unknown." % node)
1529
      self.nodes.append(nname)
1530

    
1531
  def Exec(self, feedback_fn):
1532
    """Copy a file from master to some nodes.
1533

1534
    Args:
1535
      opts - class with options as members
1536
      args - list containing a single element, the file name
1537
    Opts used:
1538
      nodes - list containing the name of target nodes; if empty, all nodes
1539

1540
    """
1541
    filename = self.op.filename
1542

    
1543
    myname = socket.gethostname()
1544

    
1545
    for node in self.nodes:
1546
      if node == myname:
1547
        continue
1548
      if not ssh.CopyFileToNode(node, filename):
1549
        logger.Error("Copy of file %s to node %s failed" % (filename, node))
1550

    
1551

    
1552
class LUDumpClusterConfig(NoHooksLU):
1553
  """Return a text-representation of the cluster-config.
1554

1555
  """
1556
  _OP_REQP = []
1557

    
1558
  def CheckPrereq(self):
1559
    """No prerequisites.
1560

1561
    """
1562
    pass
1563

    
1564
  def Exec(self, feedback_fn):
1565
    """Dump a representation of the cluster config to the standard output.
1566

1567
    """
1568
    return self.cfg.DumpConfig()
1569

    
1570

    
1571
class LURunClusterCommand(NoHooksLU):
1572
  """Run a command on some nodes.
1573

1574
  """
1575
  _OP_REQP = ["command", "nodes"]
1576

    
1577
  def CheckPrereq(self):
1578
    """Check prerequisites.
1579

1580
    It checks that the given list of nodes is valid.
1581

1582
    """
1583
    if self.op.nodes:
1584
      nodes = self.op.nodes
1585
    else:
1586
      nodes = self.cfg.GetNodeList()
1587
    self.nodes = []
1588
    for node in nodes:
1589
      nname = self.cfg.ExpandNodeName(node)
1590
      if nname is None:
1591
        raise errors.OpPrereqError, ("Node '%s' is unknown." % node)
1592
      self.nodes.append(nname)
1593

    
1594
  def Exec(self, feedback_fn):
1595
    """Run a command on some nodes.
1596

1597
    """
1598
    data = []
1599
    for node in self.nodes:
1600
      result = utils.RunCmd(["ssh", node, self.op.command])
1601
      data.append((node, result.cmd, result.output, result.exit_code))
1602

    
1603
    return data
1604

    
1605

    
1606
class LUActivateInstanceDisks(NoHooksLU):
1607
  """Bring up an instance's disks.
1608

1609
  """
1610
  _OP_REQP = ["instance_name"]
1611

    
1612
  def CheckPrereq(self):
1613
    """Check prerequisites.
1614

1615
    This checks that the instance is in the cluster.
1616

1617
    """
1618
    instance = self.cfg.GetInstanceInfo(
1619
      self.cfg.ExpandInstanceName(self.op.instance_name))
1620
    if instance is None:
1621
      raise errors.OpPrereqError, ("Instance '%s' not known" %
1622
                                   self.op.instance_name)
1623
    self.instance = instance
1624

    
1625

    
1626
  def Exec(self, feedback_fn):
1627
    """Activate the disks.
1628

1629
    """
1630
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1631
    if not disks_ok:
1632
      raise errors.OpExecError, ("Cannot activate block devices")
1633

    
1634
    return disks_info
1635

    
1636

    
1637
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1638
  """Prepare the block devices for an instance.
1639

1640
  This sets up the block devices on all nodes.
1641

1642
  Args:
1643
    instance: a ganeti.objects.Instance object
1644
    ignore_secondaries: if true, errors on secondary nodes won't result
1645
                        in an error return from the function
1646

1647
  Returns:
1648
    false if the operation failed
1649
    list of (host, instance_visible_name, node_visible_name) if the operation
1650
         succeeded, with the mapping from node devices to instance devices
1651
  """
1652
  device_info = []
1653
  disks_ok = True
1654
  for inst_disk in instance.disks:
1655
    master_result = None
1656
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1657
      cfg.SetDiskID(node_disk, node)
1658
      is_primary = node == instance.primary_node
1659
      result = rpc.call_blockdev_assemble(node, node_disk, is_primary)
1660
      if not result:
1661
        logger.Error("could not prepare block device %s on node %s (is_pri"
1662
                     "mary=%s)" % (inst_disk.iv_name, node, is_primary))
1663
        if is_primary or not ignore_secondaries:
1664
          disks_ok = False
1665
      if is_primary:
1666
        master_result = result
1667
    device_info.append((instance.primary_node, inst_disk.iv_name,
1668
                        master_result))
1669

    
1670
  return disks_ok, device_info
1671

    
1672
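
# Illustrative sketch, not part of the original module: the device_info
# list returned by _AssembleInstanceDisks above consists of
# (node, iv_name, result) tuples, where result is whatever
# call_blockdev_assemble returned on the primary node.  A caller such as
# LUActivateInstanceDisks could turn it into human-readable lines roughly
# like this; the helper name and wording are assumptions for the example.
def _ExampleFormatDiskInfo(device_info):
  """Return text lines describing assembled instance disks."""
  lines = []
  for node, iv_name, node_dev in device_info:
    lines.append("%s: disk %s is visible as %s" % (node, iv_name, node_dev))
  return lines
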

    
1673
class LUDeactivateInstanceDisks(NoHooksLU):
1674
  """Shutdown an instance's disks.
1675

1676
  """
1677
  _OP_REQP = ["instance_name"]
1678

    
1679
  def CheckPrereq(self):
1680
    """Check prerequisites.
1681

1682
    This checks that the instance is in the cluster.
1683

1684
    """
1685
    instance = self.cfg.GetInstanceInfo(
1686
      self.cfg.ExpandInstanceName(self.op.instance_name))
1687
    if instance is None:
1688
      raise errors.OpPrereqError, ("Instance '%s' not known" %
1689
                                   self.op.instance_name)
1690
    self.instance = instance
1691

    
1692
  def Exec(self, feedback_fn):
1693
    """Deactivate the disks
1694

1695
    """
1696
    instance = self.instance
1697
    ins_l = rpc.call_instance_list([instance.primary_node])
1698
    ins_l = ins_l[instance.primary_node]
1699
    if not type(ins_l) is list:
1700
      raise errors.OpExecError, ("Can't contact node '%s'" %
1701
                                 instance.primary_node)
1702

    
1703
    if self.instance.name in ins_l:
1704
      raise errors.OpExecError, ("Instance is running, can't shutdown"
1705
                                 " block devices.")
1706

    
1707
    _ShutdownInstanceDisks(instance, self.cfg)
1708

    
1709

    
1710
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1711
  """Shutdown block devices of an instance.
1712

1713
  This does the shutdown on all nodes of the instance.
1714

1715
  If ignore_primary is true, errors on the primary node are
1716
  ignored.
1717

1718
  """
1719
  result = True
1720
  for disk in instance.disks:
1721
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1722
      cfg.SetDiskID(top_disk, node)
1723
      if not rpc.call_blockdev_shutdown(node, top_disk):
1724
        logger.Error("could not shutdown block device %s on node %s" %
1725
                     (disk.iv_name, node))
1726
        if not ignore_primary or node != instance.primary_node:
1727
          result = False
1728
  return result
1729

    
1730
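
# Illustrative sketch, not part of the original module: a wrapper that
# refuses to shut down the disks while the instance is still running,
# mirroring the check done in LUDeactivateInstanceDisks.Exec above.  The
# helper name is made up for the example; it only reuses calls already
# used in this file (rpc.call_instance_list, _ShutdownInstanceDisks).
def _ExampleSafeShutdownInstanceDisks(instance, cfg):
  """Shutdown the disks of an instance only if it is not running."""
  pnode = instance.primary_node
  ins_l = rpc.call_instance_list([pnode])[pnode]
  if not isinstance(ins_l, list):
    raise errors.OpExecError, ("Can't contact node '%s'" % pnode)
  if instance.name in ins_l:
    raise errors.OpExecError, ("Instance is running, can't shutdown"
                               " block devices.")
  return _ShutdownInstanceDisks(instance, cfg)
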

    
1731
class LUStartupInstance(LogicalUnit):
1732
  """Starts an instance.
1733

1734
  """
1735
  HPATH = "instance-start"
1736
  HTYPE = constants.HTYPE_INSTANCE
1737
  _OP_REQP = ["instance_name", "force"]
1738

    
1739
  def BuildHooksEnv(self):
1740
    """Build hooks env.
1741

1742
    This runs on master, primary and secondary nodes of the instance.
1743

1744
    """
1745
    env = {
1746
      "INSTANCE_NAME": self.op.instance_name,
1747
      "INSTANCE_PRIMARY": self.instance.primary_node,
1748
      "INSTANCE_SECONDARIES": " ".join(self.instance.secondary_nodes),
1749
      "FORCE": self.op.force,
1750
      }
1751
    nl = ([self.cfg.GetMaster(), self.instance.primary_node] +
1752
          list(self.instance.secondary_nodes))
1753
    return env, nl, nl
1754

    
1755
  def CheckPrereq(self):
1756
    """Check prerequisites.
1757

1758
    This checks that the instance is in the cluster.
1759

1760
    """
1761
    instance = self.cfg.GetInstanceInfo(
1762
      self.cfg.ExpandInstanceName(self.op.instance_name))
1763
    if instance is None:
1764
      raise errors.OpPrereqError, ("Instance '%s' not known" %
1765
                                   self.op.instance_name)
1766

    
1767
    # check bridges existence
1768
    brlist = [nic.bridge for nic in instance.nics]
1769
    if not rpc.call_bridges_exist(instance.primary_node, brlist):
1770
      raise errors.OpPrereqError, ("one or more target bridges %s do not"
1771
                                   " exist on destination node '%s'" %
1772
                                   (brlist, instance.primary_node))
1773

    
1774
    self.instance = instance
1775
    self.op.instance_name = instance.name
1776

    
1777
  def Exec(self, feedback_fn):
1778
    """Start the instance.
1779

1780
    """
1781
    instance = self.instance
1782
    force = self.op.force
1783
    extra_args = getattr(self.op, "extra_args", "")
1784

    
1785
    node_current = instance.primary_node
1786

    
1787
    nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1788
    if not nodeinfo:
1789
      raise errors.OpExecError, ("Could not contact node %s for infos" %
1790
                                 (node_current))
1791

    
1792
    freememory = nodeinfo[node_current]['memory_free']
1793
    memory = instance.memory
1794
    if memory > freememory:
1795
      raise errors.OpExecError, ("Not enough memory to start instance"
1796
                                 " %s on node %s"
1797
                                 " needed %s MiB, available %s MiB" %
1798
                                 (instance.name, node_current, memory,
1799
                                  freememory))
1800

    
1801
    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
1802
                                             ignore_secondaries=force)
1803
    if not disks_ok:
1804
      _ShutdownInstanceDisks(instance, self.cfg)
1805
      if not force:
1806
        logger.Error("If the message above refers to a secondary node,"
1807
                     " you can retry the operation using '--force'.")
1808
      raise errors.OpExecError, ("Disk consistency error")
1809

    
1810
    if not rpc.call_instance_start(node_current, instance, extra_args):
1811
      _ShutdownInstanceDisks(instance, self.cfg)
1812
      raise errors.OpExecError, ("Could not start instance")
1813

    
1814
    self.cfg.MarkInstanceUp(instance.name)
1815

    
1816
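
# Illustrative sketch, not part of the original module: BuildHooksEnv
# returns a plain dictionary plus the lists of nodes on which the hooks
# should run; exporting that dictionary to the hook scripts' environment
# is done by the hooks runner outside this file.  The GANETI_ variable
# prefix and the helper name are assumptions made only for this example.
def _ExampleHooksEnvironment(env):
  """Turn a BuildHooksEnv dictionary into environment-style variables."""
  return dict([("GANETI_" + key, str(value))
               for key, value in env.items()])
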

    
1817
class LUShutdownInstance(LogicalUnit):
1818
  """Shutdown an instance.
1819

1820
  """
1821
  HPATH = "instance-stop"
1822
  HTYPE = constants.HTYPE_INSTANCE
1823
  _OP_REQP = ["instance_name"]
1824

    
1825
  def BuildHooksEnv(self):
1826
    """Build hooks env.
1827

1828
    This runs on master, primary and secondary nodes of the instance.
1829

1830
    """
1831
    env = {
1832
      "INSTANCE_NAME": self.op.instance_name,
1833
      "INSTANCE_PRIMARY": self.instance.primary_node,
1834
      "INSTANCE_SECONDARIES": " ".join(self.instance.secondary_nodes),
1835
      }
1836
    nl = ([self.cfg.GetMaster(), self.instance.primary_node] +
1837
          list(self.instance.secondary_nodes))
1838
    return env, nl, nl
1839

    
1840
  def CheckPrereq(self):
1841
    """Check prerequisites.
1842

1843
    This checks that the instance is in the cluster.
1844

1845
    """
1846
    instance = self.cfg.GetInstanceInfo(
1847
      self.cfg.ExpandInstanceName(self.op.instance_name))
1848
    if instance is None:
1849
      raise errors.OpPrereqError, ("Instance '%s' not known" %
1850
                                   self.op.instance_name)
1851
    self.instance = instance
1852

    
1853
  def Exec(self, feedback_fn):
1854
    """Shutdown the instance.
1855

1856
    """
1857
    instance = self.instance
1858
    node_current = instance.primary_node
1859
    if not rpc.call_instance_shutdown(node_current, instance):
1860
      logger.Error("could not shutdown instance")
1861

    
1862
    self.cfg.MarkInstanceDown(instance.name)
1863
    _ShutdownInstanceDisks(instance, self.cfg)
1864

    
1865

    
1866
class LURemoveInstance(LogicalUnit):
1867
  """Remove an instance.
1868

1869
  """
1870
  HPATH = "instance-remove"
1871
  HTYPE = constants.HTYPE_INSTANCE
1872
  _OP_REQP = ["instance_name"]
1873

    
1874
  def BuildHooksEnv(self):
1875
    """Build hooks env.
1876

1877
    This runs on master, primary and secondary nodes of the instance.
1878

1879
    """
1880
    env = {
1881
      "INSTANCE_NAME": self.op.instance_name,
1882
      "INSTANCE_PRIMARY": self.instance.primary_node,
1883
      "INSTANCE_SECONDARIES": " ".join(self.instance.secondary_nodes),
1884
      }
1885
    nl = ([self.cfg.GetMaster(), self.instance.primary_node] +
1886
          list(self.instance.secondary_nodes))
1887
    return env, nl, nl
1888

    
1889
  def CheckPrereq(self):
1890
    """Check prerequisites.
1891

1892
    This checks that the instance is in the cluster.
1893

1894
    """
1895
    instance = self.cfg.GetInstanceInfo(
1896
      self.cfg.ExpandInstanceName(self.op.instance_name))
1897
    if instance is None:
1898
      raise errors.OpPrereqError, ("Instance '%s' not known" %
1899
                                   self.op.instance_name)
1900
    self.instance = instance
1901

    
1902
  def Exec(self, feedback_fn):
1903
    """Remove the instance.
1904

1905
    """
1906
    instance = self.instance
1907
    logger.Info("shutting down instance %s on node %s" %
1908
                (instance.name, instance.primary_node))
1909

    
1910
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
1911
      raise errors.OpExecError, ("Could not shutdown instance %s on node %s" %
1912
                                 (instance.name, instance.primary_node))
1913

    
1914
    logger.Info("removing block devices for instance %s" % instance.name)
1915

    
1916
    _RemoveDisks(instance, self.cfg)
1917

    
1918
    logger.Info("removing instance %s out of cluster config" % instance.name)
1919

    
1920
    self.cfg.RemoveInstance(instance.name)
1921

    
1922

    
1923
class LUQueryInstances(NoHooksLU):
1924
  """Logical unit for querying instances.
1925

1926
  """
1927
  OP_REQP = ["output_fields"]
1928

    
1929
  def CheckPrereq(self):
1930
    """Check prerequisites.
1931

1932
    This checks that the fields required are valid output fields.
1933

1934
    """
1935

    
1936
    self.static_fields = frozenset(["name", "os", "pnode", "snodes",
1937
                                    "admin_state", "admin_ram",
1938
                                    "disk_template", "ip", "mac", "bridge"])
1939
    self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
1940
    self.all_fields = self.static_fields | self.dynamic_fields
1941

    
1942
    if not self.all_fields.issuperset(self.op.output_fields):
1943
      raise errors.OpPrereqError, ("Unknown output fields selected: %s"
1944
                                   % ",".join(frozenset(self.op.output_fields).
1945
                                              difference(self.all_fields)))
1946

    
1947
  def Exec(self, feedback_fn):
1948
    """Computes the list of nodes and their attributes.
1949

1950
    """
1951

    
1952
    instance_names = utils.NiceSort(self.cfg.GetInstanceList())
1953
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
1954
                     in instance_names]
1955

    
1956
    # begin data gathering
1957

    
1958
    nodes = frozenset([inst.primary_node for inst in instance_list])
1959

    
1960
    bad_nodes = []
1961
    if self.dynamic_fields.intersection(self.op.output_fields):
1962
      live_data = {}
1963
      node_data = rpc.call_all_instances_info(nodes)
1964
      for name in nodes:
1965
        result = node_data[name]
1966
        if result:
1967
          live_data.update(result)
1968
        elif result == False:
1969
          bad_nodes.append(name)
1970
        # else no instance is alive
1971
    else:
1972
      live_data = dict([(name, {}) for name in instance_names])
1973

    
1974
    # end data gathering
1975

    
1976
    output = []
1977
    for instance in instance_list:
1978
      iout = []
1979
      for field in self.op.output_fields:
1980
        if field == "name":
1981
          val = instance.name
1982
        elif field == "os":
1983
          val = instance.os
1984
        elif field == "pnode":
1985
          val = instance.primary_node
1986
        elif field == "snodes":
1987
          val = ",".join(instance.secondary_nodes) or "-"
1988
        elif field == "admin_state":
1989
          if instance.status == "down":
1990
            val = "no"
1991
          else:
1992
            val = "yes"
1993
        elif field == "oper_state":
1994
          if instance.primary_node in bad_nodes:
1995
            val = "(node down)"
1996
          else:
1997
            if live_data.get(instance.name):
1998
              val = "running"
1999
            else:
2000
              val = "stopped"
2001
        elif field == "admin_ram":
2002
          val = instance.memory
2003
        elif field == "oper_ram":
2004
          if instance.primary_node in bad_nodes:
2005
            val = "(node down)"
2006
          elif instance.name in live_data:
2007
            val = live_data[instance.name].get("memory", "?")
2008
          else:
2009
            val = "-"
2010
        elif field == "disk_template":
2011
          val = instance.disk_template
2012
        elif field == "ip":
2013
          val = instance.nics[0].ip
2014
        elif field == "bridge":
2015
          val = instance.nics[0].bridge
2016
        elif field == "mac":
2017
          val = instance.nics[0].mac
2018
        else:
2019
          raise errors.ParameterError, field
2020
        val = str(val)
2021
        iout.append(val)
2022
      output.append(iout)
2023

    
2024
    return output
2025

    
2026
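
# Illustrative sketch, not part of the original module: LUQueryInstances
# above returns one list of strings per instance, in the order given by
# op.output_fields.  A client could align the rows into a simple text
# table as below; the helper name and padding scheme are assumptions made
# for the example.
def _ExampleFormatQueryOutput(fields, rows):
  """Format query rows (lists of strings) as left-aligned text lines."""
  widths = [len(name) for name in fields]
  for row in rows:
    widths = [max(width, len(val)) for width, val in zip(widths, row)]
  lines = [" ".join([name.ljust(width)
                     for name, width in zip(fields, widths)])]
  for row in rows:
    lines.append(" ".join([val.ljust(width)
                           for val, width in zip(row, widths)]))
  return lines
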

    
2027
class LUFailoverInstance(LogicalUnit):
2028
  """Failover an instance.
2029

2030
  """
2031
  HPATH = "instance-failover"
2032
  HTYPE = constants.HTYPE_INSTANCE
2033
  _OP_REQP = ["instance_name", "ignore_consistency"]
2034

    
2035
  def BuildHooksEnv(self):
2036
    """Build hooks env.
2037

2038
    This runs on master, primary and secondary nodes of the instance.
2039

2040
    """
2041
    env = {
2042
      "INSTANCE_NAME": self.op.instance_name,
2043
      "INSTANCE_PRIMARY": self.instance.primary_node,
2044
      "INSTANCE_SECONDARIES": " ".join(self.instance.secondary_nodes),
2045
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2046
      }
2047
    nl = [self.cfg.GetMaster()] + list(self.instance.secondary_nodes)
2048
    return env, nl, nl
2049

    
2050
  def CheckPrereq(self):
2051
    """Check prerequisites.
2052

2053
    This checks that the instance is in the cluster.
2054

2055
    """
2056
    instance = self.cfg.GetInstanceInfo(
2057
      self.cfg.ExpandInstanceName(self.op.instance_name))
2058
    if instance is None:
2059
      raise errors.OpPrereqError, ("Instance '%s' not known" %
2060
                                   self.op.instance_name)
2061

    
2062
    # check bridge existence
2063
    brlist = [nic.bridge for nic in instance.nics]
2064
    if not rpc.call_bridges_exist(instance.primary_node, brlist):
2065
      raise errors.OpPrereqError, ("one or more target bridges %s do not"
2066
                                   " exist on destination node '%s'" %
2067
                                   (brlist, instance.primary_node))
2068

    
2069
    self.instance = instance
2070

    
2071
  def Exec(self, feedback_fn):
2072
    """Failover an instance.
2073

2074
    The failover is done by shutting it down on its present node and
2075
    starting it on the secondary.
2076

2077
    """
2078
    instance = self.instance
2079

    
2080
    source_node = instance.primary_node
2081
    target_node = instance.secondary_nodes[0]
2082

    
2083
    feedback_fn("* checking disk consistency between source and target")
2084
    for dev in instance.disks:
2085
      # for remote_raid1, these are md over drbd
2086
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2087
        if not self.op.ignore_consistency:
2088
          raise errors.OpExecError, ("Disk %s is degraded on target node,"
2089
                                     " aborting failover." % dev.iv_name)
2090

    
2091
    feedback_fn("* checking target node resource availability")
2092
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2093

    
2094
    if not nodeinfo:
2095
      raise errors.OpExecError, ("Could not contact target node %s." %
2096
                                 target_node)
2097

    
2098
    free_memory = int(nodeinfo[target_node]['memory_free'])
2099
    memory = instance.memory
2100
    if memory > free_memory:
2101
      raise errors.OpExecError, ("Not enough memory to create instance %s on"
2102
                                 " node %s. needed %s MiB, available %s MiB" %
2103
                                 (instance.name, target_node, memory,
2104
                                  free_memory))
2105

    
2106
    feedback_fn("* shutting down instance on source node")
2107
    logger.Info("Shutting down instance %s on node %s" %
2108
                (instance.name, source_node))
2109

    
2110
    if not rpc.call_instance_shutdown(source_node, instance):
2111
      logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2112
                   " anyway. Please make sure node %s is down"  %
2113
                   (instance.name, source_node, source_node))
2114

    
2115
    feedback_fn("* deactivating the instance's disks on source node")
2116
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2117
      raise errors.OpExecError, ("Can't shut down the instance's disks.")
2118

    
2119
    instance.primary_node = target_node
2120
    # distribute new instance config to the other nodes
2121
    self.cfg.AddInstance(instance)
2122

    
2123
    feedback_fn("* activating the instance's disks on target node")
2124
    logger.Info("Starting instance %s on node %s" %
2125
                (instance.name, target_node))
2126

    
2127
    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2128
                                             ignore_secondaries=True)
2129
    if not disks_ok:
2130
      _ShutdownInstanceDisks(instance, self.cfg)
2131
      raise errors.OpExecError, ("Can't activate the instance's disks")
2132

    
2133
    feedback_fn("* starting the instance on the target node")
2134
    if not rpc.call_instance_start(target_node, instance, None):
2135
      _ShutdownInstanceDisks(instance, self.cfg)
2136
      raise errors.OpExecError("Could not start instance %s on node %s." %
2137
                               (instance, target_node))
2138

    
2139

    
2140
def _CreateBlockDevOnPrimary(cfg, node, device):
2141
  """Create a tree of block devices on the primary node.
2142

2143
  This always creates all devices.
2144

2145
  """
2146

    
2147
  if device.children:
2148
    for child in device.children:
2149
      if not _CreateBlockDevOnPrimary(cfg, node, child):
2150
        return False
2151

    
2152
  cfg.SetDiskID(device, node)
2153
  new_id = rpc.call_blockdev_create(node, device, device.size, True)
2154
  if not new_id:
2155
    return False
2156
  if device.physical_id is None:
2157
    device.physical_id = new_id
2158
  return True
2159

    
2160

    
2161
def _CreateBlockDevOnSecondary(cfg, node, device, force):
2162
  """Create a tree of block devices on a secondary node.
2163

2164
  If this device type has to be created on secondaries, create it and
2165
  all its children.
2166

2167
  If not, just recurse to children keeping the same 'force' value.
2168

2169
  """
2170
  if device.CreateOnSecondary():
2171
    force = True
2172
  if device.children:
2173
    for child in device.children:
2174
      if not _CreateBlockDevOnSecondary(cfg, node, child, force):
2175
        return False
2176

    
2177
  if not force:
2178
    return True
2179
  cfg.SetDiskID(device, node)
2180
  new_id = rpc.call_blockdev_create(node, device, device.size, False)
2181
  if not new_id:
2182
    return False
2183
  if device.physical_id is None:
2184
    device.physical_id = new_id
2185
  return True
2186

    
2187

    
2188
def _GenerateMDDRBDBranch(cfg, vgname, primary, secondary, size, base):
2189
  """Generate a drbd device complete with its children.
2190

2191
  """
2192
  port = cfg.AllocatePort()
2193
  base = "%s_%s" % (base, port)
2194
  dev_data = objects.Disk(dev_type="lvm", size=size,
2195
                          logical_id=(vgname, "%s.data" % base))
2196
  dev_meta = objects.Disk(dev_type="lvm", size=128,
2197
                          logical_id=(vgname, "%s.meta" % base))
2198
  drbd_dev = objects.Disk(dev_type="drbd", size=size,
2199
                          logical_id = (primary, secondary, port),
2200
                          children = [dev_data, dev_meta])
2201
  return drbd_dev
2202

    
2203
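
# Illustrative sketch, not part of the original module: the device
# returned by _GenerateMDDRBDBranch above is a drbd Disk whose children
# are the data and metadata lvm volumes.  This helper walks such a Disk
# tree and collects the logical_ids of the lvm leaves, i.e. what would
# actually be allocated in the volume group.  The helper name is made up
# for the example.
def _ExampleCollectLvmLeaves(device):
  """Return the logical_ids of all lvm devices in a Disk tree."""
  result = []
  if device.dev_type == "lvm":
    result.append(device.logical_id)
  for child in device.children or []:
    result.extend(_ExampleCollectLvmLeaves(child))
  return result
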

    
2204
def _GenerateDiskTemplate(cfg, vgname, template_name,
2205
                          instance_name, primary_node,
2206
                          secondary_nodes, disk_sz, swap_sz):
2207
  """Generate the entire disk layout for a given template type.
2208

2209
  """
2210
  #TODO: compute space requirements
2211

    
2212
  if template_name == "diskless":
2213
    disks = []
2214
  elif template_name == "plain":
2215
    if len(secondary_nodes) != 0:
2216
      raise errors.ProgrammerError("Wrong template configuration")
2217
    sda_dev = objects.Disk(dev_type="lvm", size=disk_sz,
2218
                           logical_id=(vgname, "%s.os" % instance_name),
2219
                           iv_name = "sda")
2220
    sdb_dev = objects.Disk(dev_type="lvm", size=swap_sz,
2221
                           logical_id=(vgname, "%s.swap" % instance_name),
2222
                           iv_name = "sdb")
2223
    disks = [sda_dev, sdb_dev]
2224
  elif template_name == "local_raid1":
2225
    if len(secondary_nodes) != 0:
2226
      raise errors.ProgrammerError("Wrong template configuration")
2227
    sda_dev_m1 = objects.Disk(dev_type="lvm", size=disk_sz,
2228
                              logical_id=(vgname, "%s.os_m1" % instance_name))
2229
    sda_dev_m2 = objects.Disk(dev_type="lvm", size=disk_sz,
2230
                              logical_id=(vgname, "%s.os_m2" % instance_name))
2231
    md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name = "sda",
2232
                              size=disk_sz,
2233
                              children = [sda_dev_m1, sda_dev_m2])
2234
    sdb_dev_m1 = objects.Disk(dev_type="lvm", size=swap_sz,
2235
                              logical_id=(vgname, "%s.swap_m1" %
2236
                                          instance_name))
2237
    sdb_dev_m2 = objects.Disk(dev_type="lvm", size=swap_sz,
2238
                              logical_id=(vgname, "%s.swap_m2" %
2239
                                          instance_name))
2240
    md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name = "sdb",
2241
                              size=swap_sz,
2242
                              children = [sdb_dev_m1, sdb_dev_m2])
2243
    disks = [md_sda_dev, md_sdb_dev]
2244
  elif template_name == "remote_raid1":
2245
    if len(secondary_nodes) != 1:
2246
      raise errors.ProgrammerError("Wrong template configuration")
2247
    remote_node = secondary_nodes[0]
2248
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, vgname,
2249
                                         primary_node, remote_node, disk_sz,
2250
                                         "%s-sda" % instance_name)
2251
    md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name="sda",
2252
                              children = [drbd_sda_dev], size=disk_sz)
2253
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, vgname,
2254
                                         primary_node, remote_node, swap_sz,
2255
                                         "%s-sdb" % instance_name)
2256
    md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name="sdb",
2257
                              children = [drbd_sdb_dev], size=swap_sz)
2258
    disks = [md_sda_dev, md_sdb_dev]
2259
  else:
2260
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2261
  return disks
2262

    
2263
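
# Illustrative sketch, not part of the original module: a rough per-node
# space estimate for the templates generated above, in the spirit of the
# TODO note in _GenerateDiskTemplate.  The 128 MiB metadata volume per
# DRBD device matches the value hardcoded in _GenerateMDDRBDBranch; the
# helper name and everything else are assumptions made for the example.
def _ExampleDiskSpaceRequired(template_name, disk_sz, swap_sz):
  """Return the approximate MiB needed per node for a disk template."""
  if template_name == "diskless":
    return 0
  if template_name == "plain":
    return disk_sz + swap_sz
  if template_name == "local_raid1":
    # two local mirrors of each volume
    return 2 * (disk_sz + swap_sz)
  if template_name == "remote_raid1":
    # one copy per node plus a 128 MiB DRBD metadata volume per device
    return disk_sz + swap_sz + 2 * 128
  raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
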

    
2264
def _CreateDisks(cfg, instance):
2265
  """Create all disks for an instance.
2266

2267
  This abstracts away some work from AddInstance.
2268

2269
  Args:
2270
    instance: the instance object
2271

2272
  Returns:
2273
    True or False showing the success of the creation process
2274

2275
  """
2276
  for device in instance.disks:
2277
    logger.Info("creating volume %s for instance %s" %
2278
              (device.iv_name, instance.name))
2279
    #HARDCODE
2280
    for secondary_node in instance.secondary_nodes:
2281
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, device, False):
2282
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2283
                     (device.iv_name, device, secondary_node))
2284
        return False
2285
    #HARDCODE
2286
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, device):
2287
      logger.Error("failed to create volume %s on primary!" %
2288
                   device.iv_name)
2289
      return False
2290
  return True
2291

    
2292

    
2293
def _RemoveDisks(instance, cfg):
2294
  """Remove all disks for an instance.
2295

2296
  This abstracts away some work from `AddInstance()` and
2297
  `RemoveInstance()`. Note that in case some of the devices couldn't
2298
  be removed, the removal will continue with the other ones (compare
2299
  with `_CreateDisks()`).
2300

2301
  Args:
2302
    instance: the instance object
2303

2304
  Returns:
2305
    True or False showing the success of the removal process
2306

2307
  """
2308
  logger.Info("removing block devices for instance %s" % instance.name)
2309

    
2310
  result = True
2311
  for device in instance.disks:
2312
    for node, disk in device.ComputeNodeTree(instance.primary_node):
2313
      cfg.SetDiskID(disk, node)
2314
      if not rpc.call_blockdev_remove(node, disk):
2315
        logger.Error("could not remove block device %s on node %s,"
2316
                     " continuing anyway" %
2317
                     (device.iv_name, node))
2318
        result = False
2319
  return result
2320

    
2321
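
# Illustrative sketch, not part of the original module: the
# create-then-rollback pattern used by LUCreateInstance.Exec below,
# expressed as a standalone helper built only on _CreateDisks and
# _RemoveDisks above.  On any creation failure the disks that may already
# exist are removed again.  The helper name is made up for the example.
def _ExampleCreateDisksOrRollback(cfg, instance):
  """Create all disks of an instance, removing them again on failure."""
  if not _CreateDisks(cfg, instance):
    _RemoveDisks(instance, cfg)
    raise errors.OpExecError, ("Device creation failed, reverting...")
  return True
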

    
2322
class LUCreateInstance(LogicalUnit):
2323
  """Create an instance.
2324

2325
  """
2326
  HPATH = "instance-add"
2327
  HTYPE = constants.HTYPE_INSTANCE
2328
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2329
              "disk_template", "swap_size", "mode", "start", "vcpus",
2330
              "wait_for_sync"]
2331

    
2332
  def BuildHooksEnv(self):
2333
    """Build hooks env.
2334

2335
    This runs on master, primary and secondary nodes of the instance.
2336

2337
    """
2338
    env = {
2339
      "INSTANCE_NAME": self.op.instance_name,
2340
      "INSTANCE_PRIMARY": self.op.pnode,
2341
      "INSTANCE_SECONDARIES": " ".join(self.secondaries),
2342
      "DISK_TEMPLATE": self.op.disk_template,
2343
      "MEM_SIZE": self.op.mem_size,
2344
      "DISK_SIZE": self.op.disk_size,
2345
      "SWAP_SIZE": self.op.swap_size,
2346
      "VCPUS": self.op.vcpus,
2347
      "BRIDGE": self.op.bridge,
2348
      "INSTANCE_ADD_MODE": self.op.mode,
2349
      }
2350
    if self.op.mode == constants.INSTANCE_IMPORT:
2351
      env["SRC_NODE"] = self.op.src_node
2352
      env["SRC_PATH"] = self.op.src_path
2353
      env["SRC_IMAGE"] = self.src_image
2354
    if self.inst_ip:
2355
      env["INSTANCE_IP"] = self.inst_ip
2356

    
2357
    nl = ([self.cfg.GetMaster(), self.op.pnode] +
2358
          self.secondaries)
2359
    return env, nl, nl
2360

    
2361

    
2362
  def CheckPrereq(self):
2363
    """Check prerequisites.
2364

2365
    """
2366
    if self.op.mode not in (constants.INSTANCE_CREATE,
2367
                            constants.INSTANCE_IMPORT):
2368
      raise errors.OpPrereqError, ("Invalid instance creation mode '%s'" %
2369
                                   self.op.mode)
2370

    
2371
    if self.op.mode == constants.INSTANCE_IMPORT:
2372
      src_node = getattr(self.op, "src_node", None)
2373
      src_path = getattr(self.op, "src_path", None)
2374
      if src_node is None or src_path is None:
2375
        raise errors.OpPrereqError, ("Importing an instance requires source"
2376
                                     " node and path options")
2377
      src_node_full = self.cfg.ExpandNodeName(src_node)
2378
      if src_node_full is None:
2379
        raise errors.OpPrereqError, ("Unknown source node '%s'" % src_node)
2380
      self.op.src_node = src_node = src_node_full
2381

    
2382
      if not os.path.isabs(src_path):
2383
        raise errors.OpPrereqError, ("The source path must be absolute")
2384

    
2385
      export_info = rpc.call_export_info(src_node, src_path)
2386

    
2387
      if not export_info:
2388
        raise errors.OpPrereqError, ("No export found in dir %s" % src_path)
2389

    
2390
      if not export_info.has_section(constants.INISECT_EXP):
2391
        raise errors.ProgrammerError, ("Corrupted export config")
2392

    
2393
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
2394
      if (int(ei_version) != constants.EXPORT_VERSION):
2395
        raise errors.OpPrereqError, ("Wrong export version %s (wanted %d)" %
2396
                                     (ei_version, constants.EXPORT_VERSION))
2397

    
2398
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2399
        raise errors.OpPrereqError, ("Can't import instance with more than"
2400
                                     " one data disk")
2401

    
2402
      # FIXME: are the old os-es, disk sizes, etc. useful?
2403
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2404
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2405
                                                         'disk0_dump'))
2406
      self.src_image = diskimage
2407
    else: # INSTANCE_CREATE
2408
      if getattr(self.op, "os_type", None) is None:
2409
        raise errors.OpPrereqError, ("No guest OS specified")
2410

    
2411
    # check primary node
2412
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2413
    if pnode is None:
2414
      raise errors.OpPrereqError, ("Primary node '%s' is uknown" %
2415
                                   self.op.pnode)
2416
    self.op.pnode = pnode.name
2417
    self.pnode = pnode
2418
    self.secondaries = []
2419
    # disk template and mirror node verification
2420
    if self.op.disk_template not in constants.DISK_TEMPLATES:
2421
      raise errors.OpPrereqError, ("Invalid disk template name")
2422

    
2423
    if self.op.disk_template == constants.DT_REMOTE_RAID1:
2424
      if getattr(self.op, "snode", None) is None:
2425
        raise errors.OpPrereqError, ("The 'remote_raid1' disk template needs"
2426
                                     " a mirror node")
2427

    
2428
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
2429
      if snode_name is None:
2430
        raise errors.OpPrereqError, ("Unknown secondary node '%s'" %
2431
                                     self.op.snode)
2432
      elif snode_name == pnode.name:
2433
        raise errors.OpPrereqError, ("The secondary node cannot be"
2434
                                     " the primary node.")
2435
      self.secondaries.append(snode_name)
2436

    
2437
    # os verification
2438
    os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2439
    if not isinstance(os_obj, objects.OS):
2440
      raise errors.OpPrereqError, ("OS '%s' not in supported os list for"
2441
                                   " primary node"  % self.op.os_type)
2442

    
2443
    # instance verification
2444
    hostname1 = utils.LookupHostname(self.op.instance_name)
2445
    if not hostname1:
2446
      raise errors.OpPrereqError, ("Instance name '%s' not found in dns" %
2447
                                   self.op.instance_name)
2448

    
2449
    self.op.instance_name = instance_name = hostname1['hostname']
2450
    instance_list = self.cfg.GetInstanceList()
2451
    if instance_name in instance_list:
2452
      raise errors.OpPrereqError, ("Instance '%s' is already in the cluster" %
2453
                                   instance_name)
2454

    
2455
    ip = getattr(self.op, "ip", None)
2456
    if ip is None or ip.lower() == "none":
2457
      inst_ip = None
2458
    elif ip.lower() == "auto":
2459
      inst_ip = hostname1['ip']
2460
    else:
2461
      if not utils.IsValidIP(ip):
2462
        raise errors.OpPrereqError, ("given IP address '%s' doesn't look"
2463
                                     " like a valid IP" % ip)
2464
      inst_ip = ip
2465
    self.inst_ip = inst_ip
2466

    
2467
    command = ["fping", "-q", hostname1['ip']]
2468
    result = utils.RunCmd(command)
2469
    if not result.failed:
2470
      raise errors.OpPrereqError, ("IP %s of instance %s already in use" %
2471
                                   (hostname1['ip'], instance_name))
2472

    
2473
    # bridge verification
2474
    bridge = getattr(self.op, "bridge", None)
2475
    if bridge is None:
2476
      self.op.bridge = self.cfg.GetDefBridge()
2477
    else:
2478
      self.op.bridge = bridge
2479

    
2480
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
2481
      raise errors.OpPrereqError, ("target bridge '%s' does not exist on"
2482
                                   " destination node '%s'" %
2483
                                   (self.op.bridge, pnode.name))
2484

    
2485
    if self.op.start:
2486
      self.instance_status = 'up'
2487
    else:
2488
      self.instance_status = 'down'
2489

    
2490
  def Exec(self, feedback_fn):
2491
    """Create and add the instance to the cluster.
2492

2493
    """
2494
    instance = self.op.instance_name
2495
    pnode_name = self.pnode.name
2496

    
2497
    nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
2498
    if self.inst_ip is not None:
2499
      nic.ip = self.inst_ip
2500

    
2501
    disks = _GenerateDiskTemplate(self.cfg, self.cfg.GetVGName(),
2502
                                  self.op.disk_template,
2503
                                  instance, pnode_name,
2504
                                  self.secondaries, self.op.disk_size,
2505
                                  self.op.swap_size)
2506

    
2507
    iobj = objects.Instance(name=instance, os=self.op.os_type,
2508
                            primary_node=pnode_name,
2509
                            memory=self.op.mem_size,
2510
                            vcpus=self.op.vcpus,
2511
                            nics=[nic], disks=disks,
2512
                            disk_template=self.op.disk_template,
2513
                            status=self.instance_status,
2514
                            )
2515

    
2516
    feedback_fn("* creating instance disks...")
2517
    if not _CreateDisks(self.cfg, iobj):
2518
      _RemoveDisks(iobj, self.cfg)
2519
      raise errors.OpExecError, ("Device creation failed, reverting...")
2520

    
2521
    feedback_fn("adding instance %s to cluster config" % instance)
2522

    
2523
    self.cfg.AddInstance(iobj)
2524

    
2525
    if self.op.wait_for_sync:
2526
      disk_abort = not _WaitForSync(self.cfg, iobj)
2527
    elif iobj.disk_template == "remote_raid1":
2528
      # make sure the disks are not degraded (still sync-ing is ok)
2529
      time.sleep(15)
2530
      feedback_fn("* checking mirrors status")
2531
      disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
2532
    else:
2533
      disk_abort = False
2534

    
2535
    if disk_abort:
2536
      _RemoveDisks(iobj, self.cfg)
2537
      self.cfg.RemoveInstance(iobj.name)
2538
      raise errors.OpExecError, ("There are some degraded disks for"
2539
                                      " this instance")
2540

    
2541
    feedback_fn("creating os for instance %s on node %s" %
2542
                (instance, pnode_name))
2543

    
2544
    if iobj.disk_template != constants.DT_DISKLESS:
2545
      if self.op.mode == constants.INSTANCE_CREATE:
2546
        feedback_fn("* running the instance OS create scripts...")
2547
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
2548
          raise errors.OpExecError, ("could not add os for instance %s"
2549
                                          " on node %s" %
2550
                                          (instance, pnode_name))
2551

    
2552
      elif self.op.mode == constants.INSTANCE_IMPORT:
2553
        feedback_fn("* running the instance OS import scripts...")
2554
        src_node = self.op.src_node
2555
        src_image = self.src_image
2556
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
2557
                                                src_node, src_image):
2558
          raise errors.OpExecError, ("Could not import os for instance"
2559
                                          " %s on node %s" %
2560
                                          (instance, pnode_name))
2561
      else:
2562
        # also checked in the prereq part
2563
        raise errors.ProgrammerError, ("Unknown OS initialization mode '%s'"
2564
                                       % self.op.mode)
2565

    
2566
    if self.op.start:
2567
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
2568
      feedback_fn("* starting instance...")
2569
      if not rpc.call_instance_start(pnode_name, iobj, None):
2570
        raise errors.OpExecError, ("Could not start instance")
2571

    
2572

    
2573
class LUConnectConsole(NoHooksLU):
2574
  """Connect to an instance's console.
2575

2576
  This is somewhat special in that it returns the command line that
2577
  you need to run on the master node in order to connect to the
2578
  console.
2579

2580
  """
2581
  _OP_REQP = ["instance_name"]
2582

    
2583
  def CheckPrereq(self):
2584
    """Check prerequisites.
2585

2586
    This checks that the instance is in the cluster.
2587

2588
    """
2589
    instance = self.cfg.GetInstanceInfo(
2590
      self.cfg.ExpandInstanceName(self.op.instance_name))
2591
    if instance is None:
2592
      raise errors.OpPrereqError, ("Instance '%s' not known" %
2593
                                   self.op.instance_name)
2594
    self.instance = instance
2595

    
2596
  def Exec(self, feedback_fn):
2597
    """Connect to the console of an instance
2598

2599
    """
2600
    instance = self.instance
2601
    node = instance.primary_node
2602

    
2603
    node_insts = rpc.call_instance_list([node])[node]
2604
    if node_insts is False:
2605
      raise errors.OpExecError, ("Can't connect to node %s." % node)
2606

    
2607
    if instance.name not in node_insts:
2608
      raise errors.OpExecError, ("Instance %s is not running." % instance.name)
2609

    
2610
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
2611

    
2612
    hyper = hypervisor.GetHypervisor()
2613
    console_cmd = hyper.GetShellCommandForConsole(instance.name)
2614
    return node, console_cmd
2615

    
2616
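
# Illustrative sketch, not part of the original module: LUConnectConsole.Exec
# above returns the node name and the command to run there; a client could
# combine the two into an ssh invocation roughly as follows.  The "-t"
# flag (to allocate a terminal) and the helper name are assumptions made
# for the example.
def _ExampleConsoleSshCommand(node, console_cmd):
  """Build an argv list that opens the instance console via ssh."""
  return ["ssh", "-t", node, console_cmd]
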

    
2617
class LUAddMDDRBDComponent(LogicalUnit):
2618
  """Adda new mirror member to an instance's disk.
2619

2620
  """
2621
  HPATH = "mirror-add"
2622
  HTYPE = constants.HTYPE_INSTANCE
2623
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]
2624

    
2625
  def BuildHooksEnv(self):
2626
    """Build hooks env.
2627

2628
    This runs on the master, the primary and all the secondaries.
2629

2630
    """
2631
    env = {
2632
      "INSTANCE_NAME": self.op.instance_name,
2633
      "NEW_SECONDARY": self.op.remote_node,
2634
      "DISK_NAME": self.op.disk_name,
2635
      }
2636
    nl = [self.cfg.GetMaster(), self.instance.primary_node,
2637
          self.op.remote_node,] + list(self.instance.secondary_nodes)
2638
    return env, nl, nl
2639

    
2640
  def CheckPrereq(self):
2641
    """Check prerequisites.
2642

2643
    This checks that the instance is in the cluster.
2644

2645
    """
2646
    instance = self.cfg.GetInstanceInfo(
2647
      self.cfg.ExpandInstanceName(self.op.instance_name))
2648
    if instance is None:
2649
      raise errors.OpPrereqError, ("Instance '%s' not known" %
2650
                                   self.op.instance_name)
2651
    self.instance = instance
2652

    
2653
    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
2654
    if remote_node is None:
2655
      raise errors.OpPrereqError, ("Node '%s' not known" % self.op.remote_node)
2656
    self.remote_node = remote_node
2657

    
2658
    if remote_node == instance.primary_node:
2659
      raise errors.OpPrereqError, ("The specified node is the primary node of"
2660
                                   " the instance.")
2661

    
2662
    if instance.disk_template != constants.DT_REMOTE_RAID1:
2663
      raise errors.OpPrereqError, ("Instance's disk layout is not"
2664
                                   " remote_raid1.")
2665
    for disk in instance.disks:
2666
      if disk.iv_name == self.op.disk_name:
2667
        break
2668
    else:
2669
      raise errors.OpPrereqError, ("Can't find this device ('%s') in the"
2670
                                   " instance." % self.op.disk_name)
2671
    if len(disk.children) > 1:
2672
      raise errors.OpPrereqError, ("The device already has two slave"
2673
                                   " devices.\n"
2674
                                   "This would create a 3-disk raid1"
2675
                                   " which we don't allow.")
2676
    self.disk = disk
2677

    
2678
  def Exec(self, feedback_fn):
2679
    """Add the mirror component
2680

2681
    """
2682
    disk = self.disk
2683
    instance = self.instance
2684

    
2685
    remote_node = self.remote_node
2686
    new_drbd = _GenerateMDDRBDBranch(self.cfg, self.cfg.GetVGName(),
2687
                                     instance.primary_node, remote_node,
2688
                                     disk.size, "%s-%s" %
                                     (instance.name, self.op.disk_name))
2689

    
2690
    logger.Info("adding new mirror component on secondary")
2691
    #HARDCODE
2692
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, new_drbd, False):
2693
      raise errors.OpExecError, ("Failed to create new component on secondary"
2694
                                 " node %s" % remote_node)
2695

    
2696
    logger.Info("adding new mirror component on primary")
2697
    #HARDCODE
2698
    if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node, new_drbd):
2699
      # remove secondary dev
2700
      self.cfg.SetDiskID(new_drbd, remote_node)
2701
      rpc.call_blockdev_remove(remote_node, new_drbd)
2702
      raise errors.OpExecError, ("Failed to create volume on primary")
2703

    
2704
    # the device exists now
2705
    # call the primary node to add the mirror to md
2706
    logger.Info("adding new mirror component to md")
2707
    if not rpc.call_blockdev_addchild(instance.primary_node,
2708
                                           disk, new_drbd):
2709
      logger.Error("Can't add mirror compoment to md!")
2710
      self.cfg.SetDiskID(new_drbd, remote_node)
2711
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
2712
        logger.Error("Can't rollback on secondary")
2713
      self.cfg.SetDiskID(new_drbd, instance.primary_node)
2714
      if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
2715
        logger.Error("Can't rollback on primary")
2716
      raise errors.OpExecError, "Can't add mirror component to md array"
2717

    
2718
    disk.children.append(new_drbd)
2719

    
2720
    self.cfg.AddInstance(instance)
2721

    
2722
    _WaitForSync(self.cfg, instance)
2723

    
2724
    return 0
2725

    
2726

    
2727
class LURemoveMDDRBDComponent(LogicalUnit):
2728
  """Remove a component from a remote_raid1 disk.
2729

2730
  """
2731
  HPATH = "mirror-remove"
2732
  HTYPE = constants.HTYPE_INSTANCE
2733
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]
2734

    
2735
  def BuildHooksEnv(self):
2736
    """Build hooks env.
2737

2738
    This runs on the master, the primary and all the secondaries.
2739

2740
    """
2741
    env = {
2742
      "INSTANCE_NAME": self.op.instance_name,
2743
      "DISK_NAME": self.op.disk_name,
2744
      "DISK_ID": self.op.disk_id,
2745
      "OLD_SECONDARY": self.old_secondary,
2746
      }
2747
    nl = [self.cfg.GetMaster(),
2748
          self.instance.primary_node] + list(self.instance.secondary_nodes)
2749
    return env, nl, nl
2750

    
2751
  def CheckPrereq(self):
2752
    """Check prerequisites.
2753

2754
    This checks that the instance is in the cluster.
2755

2756
    """
2757
    instance = self.cfg.GetInstanceInfo(
2758
      self.cfg.ExpandInstanceName(self.op.instance_name))
2759
    if instance is None:
2760
      raise errors.OpPrereqError, ("Instance '%s' not known" %
2761
                                   self.op.instance_name)
2762
    self.instance = instance
2763

    
2764
    if instance.disk_template != constants.DT_REMOTE_RAID1:
2765
      raise errors.OpPrereqError, ("Instance's disk layout is not"
2766
                                   " remote_raid1.")
2767
    for disk in instance.disks:
2768
      if disk.iv_name == self.op.disk_name:
2769
        break
2770
    else:
2771
      raise errors.OpPrereqError, ("Can't find this device ('%s') in the"
2772
                                   " instance." % self.op.disk_name)
2773
    for child in disk.children:
2774
      if child.dev_type == "drbd" and child.logical_id[2] == self.op.disk_id:
2775
        break
2776
    else:
2777
      raise errors.OpPrereqError, ("Can't find the device with this port.")
2778

    
2779
    if len(disk.children) < 2:
2780
      raise errors.OpPrereqError, ("Cannot remove the last component from"
2781
                                   " a mirror.")
2782
    self.disk = disk
2783
    self.child = child
2784
    if self.child.logical_id[0] == instance.primary_node:
2785
      oid = 1
2786
    else:
2787
      oid = 0
2788
    self.old_secondary = self.child.logical_id[oid]
2789

    
2790
  def Exec(self, feedback_fn):
2791
    """Remove the mirror component
2792

2793
    """
2794
    instance = self.instance
2795
    disk = self.disk
2796
    child = self.child
2797
    logger.Info("remove mirror component")
2798
    self.cfg.SetDiskID(disk, instance.primary_node)
2799
    if not rpc.call_blockdev_removechild(instance.primary_node,
2800
                                              disk, child):
2801
      raise errors.OpExecError, ("Can't remove child from mirror.")
2802

    
2803
    for node in child.logical_id[:2]:
2804
      self.cfg.SetDiskID(child, node)
2805
      if not rpc.call_blockdev_remove(node, child):
2806
        logger.Error("Warning: failed to remove device from node %s,"
2807
                     " continuing operation." % node)
2808

    
2809
    disk.children.remove(child)
2810
    self.cfg.AddInstance(instance)
2811

    
2812

    
2813
class LUReplaceDisks(LogicalUnit):
2814
  """Replace the disks of an instance.
2815

2816
  """
2817
  HPATH = "mirrors-replace"
2818
  HTYPE = constants.HTYPE_INSTANCE
2819
  _OP_REQP = ["instance_name"]
2820

    
2821
  def BuildHooksEnv(self):
2822
    """Build hooks env.
2823

2824
    This runs on the master, the primary and all the secondaries.
2825

2826
    """
2827
    env = {
2828
      "INSTANCE_NAME": self.op.instance_name,
2829
      "NEW_SECONDARY": self.op.remote_node,
2830
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
2831
      }
2832
    nl = [self.cfg.GetMaster(),
2833
          self.instance.primary_node] + list(self.instance.secondary_nodes)
2834
    return env, nl, nl
2835

    
2836
  def CheckPrereq(self):
2837
    """Check prerequisites.
2838

2839
    This checks that the instance is in the cluster.
2840

2841
    """
2842
    instance = self.cfg.GetInstanceInfo(
2843
      self.cfg.ExpandInstanceName(self.op.instance_name))
2844
    if instance is None:
2845
      raise errors.OpPrereqError, ("Instance '%s' not known" %
2846
                                   self.op.instance_name)
2847
    self.instance = instance
2848

    
2849
    if instance.disk_template != constants.DT_REMOTE_RAID1:
2850
      raise errors.OpPrereqError, ("Instance's disk layout is not"
2851
                                   " remote_raid1.")
2852

    
2853
    if len(instance.secondary_nodes) != 1:
2854
      raise errors.OpPrereqError, ("The instance has a strange layout,"
2855
                                   " expected one secondary but found %d" %
2856
                                   len(instance.secondary_nodes))
2857

    
2858
    remote_node = getattr(self.op, "remote_node", None)
2859
    if remote_node is None:
2860
      remote_node = instance.secondary_nodes[0]
2861
    else:
2862
      remote_node = self.cfg.ExpandNodeName(remote_node)
2863
      if remote_node is None:
2864
        raise errors.OpPrereqError, ("Node '%s' not known" %
2865
                                     self.op.remote_node)
2866
    if remote_node == instance.primary_node:
2867
      raise errors.OpPrereqError, ("The specified node is the primary node of"
2868
                                   " the instance.")
2869
    self.op.remote_node = remote_node
2870

    
2871
  def Exec(self, feedback_fn):
2872
    """Replace the disks of an instance.
2873

2874
    """
2875
    instance = self.instance
2876
    iv_names = {}
2877
    # start of work
2878
    remote_node = self.op.remote_node
2879
    cfg = self.cfg
2880
    for dev in instance.disks:
2881
      size = dev.size
2882
      new_drbd = _GenerateMDDRBDBranch(cfg, cfg.GetVGName(),
2883
                                       instance.primary_node, remote_node,
2884
                                       size,
                                       "%s-%s" % (instance.name, dev.iv_name))
2885
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
2886
      logger.Info("adding new mirror component on secondary for %s" %
2887
                  dev.iv_name)
2888
      #HARDCODE
2889
      if not _CreateBlockDevOnSecondary(cfg, remote_node, new_drbd, False):
2890
        raise errors.OpExecError, ("Failed to create new component on"
2891
                                   " secondary node %s\n"
2892
                                   "Full abort, cleanup manually!" %
2893
                                   remote_node)
2894

    
2895
      logger.Info("adding new mirror component on primary")
2896
      #HARDCODE
2897
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, new_drbd):
2898
        # remove secondary dev
2899
        cfg.SetDiskID(new_drbd, remote_node)
2900
        rpc.call_blockdev_remove(remote_node, new_drbd)
2901
        raise errors.OpExecError("Failed to create volume on primary!\n"
2902
                                 "Full abort, cleanup manually!!")
2903

    
2904
      # the device exists now
2905
      # call the primary node to add the mirror to md
2906
      logger.Info("adding new mirror component to md")
2907
      if not rpc.call_blockdev_addchild(instance.primary_node, dev,
2908
                                             new_drbd):
2909
        logger.Error("Can't add mirror compoment to md!")
2910
        cfg.SetDiskID(new_drbd, remote_node)
2911
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
2912
          logger.Error("Can't rollback on secondary")
2913
        cfg.SetDiskID(new_drbd, instance.primary_node)
2914
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
2915
          logger.Error("Can't rollback on primary")
2916
        raise errors.OpExecError, ("Full abort, cleanup manually!!")
2917

    
2918
      dev.children.append(new_drbd)
2919
      cfg.AddInstance(instance)
2920

    
2921
    # this can fail as the old devices are degraded and _WaitForSync
2922
    # does a combined result over all disks, so we don't check its
2923
    # return value
2924
    _WaitForSync(cfg, instance, unlock=True)
2925

    
2926
    # so check manually all the devices
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError, ("MD device %s is degraded!" % name)
      cfg.SetDiskID(new_drbd, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
      if is_degr:
        raise errors.OpExecError, ("New drbd device %s is degraded!" % name)

    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      logger.Info("remove mirror %s component" % name)
      cfg.SetDiskID(dev, instance.primary_node)
      if not rpc.call_blockdev_removechild(instance.primary_node,
                                           dev, child):
        logger.Error("Can't remove child from mirror, aborting"
                     " *this device cleanup*.\nYou need to cleanup manually!!")
        continue

      for node in child.logical_id[:2]:
        logger.Info("remove child device on %s" % node)
        cfg.SetDiskID(child, node)
        if not rpc.call_blockdev_remove(node, child):
          logger.Error("Warning: failed to remove device from node %s,"
                       " continuing operation." % node)

      dev.children.remove(child)

      cfg.AddInstance(instance)


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError, "Invalid argument type 'instances'"
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError, ("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]
    return

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type == "drbd":
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""

    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                           instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        }

      result[instance.name] = idict

    return result


class LUQueryNodeData(NoHooksLU):
  """Logical unit for querying node data.

  """
  _OP_REQP = ["nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional node list against the existing names.

    """
    if not isinstance(self.op.nodes, list):
      raise errors.OpPrereqError, "Invalid argument type 'nodes'"
    if self.op.nodes:
      self.wanted_nodes = []
      names = self.op.nodes
      for name in names:
        node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(name))
        if node is None:
          raise errors.OpPrereqError, ("No such node name '%s'" % name)
        self.wanted_nodes.append(node)
    else:
      self.wanted_nodes = [self.cfg.GetNodeInfo(name) for name
                           in self.cfg.GetNodeList()]
    return

  def Exec(self, feedback_fn):
    """Compute and return the list of nodes.

    """

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]
    result = []
    for node in self.wanted_nodes:
      result.append((node.name, node.primary_ip, node.secondary_ip,
                     [inst.name for inst in ilist
                      if inst.primary_node == node.name],
                     [inst.name for inst in ilist
                      if node.name in inst.secondary_nodes],
                     ))
    return result


class LUSetInstanceParms(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      }
    if self.mem:
      env["MEM_SIZE"] = self.mem
    if self.vcpus:
      env["VCPUS"] = self.vcpus
    if self.do_ip:
      env["INSTANCE_IP"] = self.ip
    if self.bridge:
      env["BRIDGE"] = self.bridge

    nl = [self.cfg.GetMaster(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)

    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.bridge = getattr(self.op, "bridge", None)
    if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
      raise errors.OpPrereqError, ("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError, ("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError, ("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError, ("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("No such instance name '%s'" %
                                   self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance
    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))

    self.cfg.AddInstance(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    nodes = getattr(self.op, "nodes", None)
    if not nodes:
      self.op.nodes = self.cfg.GetNodeList()
    else:
      expnodes = [self.cfg.ExpandNodeName(node) for node in nodes]
      if expnodes.count(None) > 0:
        raise errors.OpPrereqError, ("At least one of the given nodes %s"
                                     " is unknown" % self.op.nodes)
      self.op.nodes = expnodes

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.op.nodes)


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    nl = [self.cfg.GetMaster(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance name is a valid one.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not found" %
                                   self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError, ("Destination node '%s' is unknown." %
                                   self.op.target_node)
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    # shutdown the instance, unless requested not to do so
    if self.op.shutdown:
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
      self.processor.ChainOpCode(op, feedback_fn)

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type="lvm", size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown:
        op = opcodes.OpStartupInstance(instance_name=instance.name,
                                       force=False)
        self.processor.ChainOpCode(op, feedback_fn)

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                      instance):
        logger.Error("could not export block device %s from node"
                     " %s to node %s" %
                     (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from"
                     " node %s" % (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.processor.ChainOpCode(op, feedback_fn)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))