root / lib / cmdlib.py @ d0b3526f

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the commands used by gnt-* programs."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import socket
30
import time
31
import tempfile
32
import re
33
import platform
34

    
35
from ganeti import rpc
36
from ganeti import ssh
37
from ganeti import logger
38
from ganeti import utils
39
from ganeti import errors
40
from ganeti import hypervisor
41
from ganeti import config
42
from ganeti import constants
43
from ganeti import objects
44
from ganeti import opcodes
45
from ganeti import ssconf
46

    
47
class LogicalUnit(object):
48
  """Logical Unit base class..
49

50
  Subclasses must follow these rules:
51
    - implement CheckPrereq which also fills in the opcode instance
52
      with all the fields (even if as None)
53
    - implement Exec
54
    - implement BuildHooksEnv
55
    - redefine HPATH and HTYPE
56
    - optionally redefine their run requirements (REQ_CLUSTER,
57
      REQ_MASTER); note that all commands require root permissions
58

59
  """
60
  HPATH = None
61
  HTYPE = None
62
  _OP_REQP = []
63
  REQ_CLUSTER = True
64
  REQ_MASTER = True
65

    
66
  def __init__(self, processor, op, cfg, sstore):
67
    """Constructor for LogicalUnit.
68

69
    This needs to be overriden in derived classes in order to check op
70
    validity.
71

72
    """
73
    self.processor = processor
74
    self.op = op
75
    self.cfg = cfg
76
    self.sstore = sstore
77
    for attr_name in self._OP_REQP:
78
      attr_val = getattr(op, attr_name, None)
79
      if attr_val is None:
80
        raise errors.OpPrereqError, ("Required parameter '%s' missing" %
81
                                     attr_name)
82
    if self.REQ_CLUSTER:
83
      if not cfg.IsCluster():
84
        raise errors.OpPrereqError, ("Cluster not initialized yet,"
85
                                     " use 'gnt-cluster init' first.")
86
      if self.REQ_MASTER:
87
        master = cfg.GetMaster()
88
        if master != socket.gethostname():
89
          raise errors.OpPrereqError, ("Commands must be run on the master"
90
                                       " node %s" % master)
91

    
92
  def CheckPrereq(self):
93
    """Check prerequisites for this LU.
94

95
    This method should check that the prerequisites for the execution
96
    of this LU are fulfilled. It can do internode communication, but
97
    it should be idempotent - no cluster or system changes are
98
    allowed.
99

100
    The method should raise errors.OpPrereqError in case something is
101
    not fulfilled. Its return value is ignored.
102

103
    This method should also update all the parameters of the opcode to
104
    their canonical form; e.g. a short node name must be fully
105
    expanded after this method has successfully completed (so that
106
    hooks, logging, etc. work correctly).
107

108
    """
109
    raise NotImplementedError
110

    
111
  def Exec(self, feedback_fn):
112
    """Execute the LU.
113

114
    This method should implement the actual work. It should raise
115
    errors.OpExecError for failures that are somewhat dealt with in
116
    code, or expected.
117

118
    """
119
    raise NotImplementedError
120

    
121
  def BuildHooksEnv(self):
122
    """Build hooks environment for this LU.
123

124
    This method should return a three-element tuple consisting of: a dict
125
    containing the environment that will be used for running the
126
    specific hook for this LU, a list of node names on which the hook
127
    should run before the execution, and a list of node names on which
128
    the hook should run after the execution.
129

130
    The keys of the dict must not be prefixed with 'GANETI_', as this will
131
    be handled by the hooks runner. Note that additional keys will be
132
    added by the hooks runner. If the LU doesn't define any
133
    environment, an empty dict (and not None) should be returned.
134

135
    As for the node lists, the master should not be included in
136
    them, as it will be added by the hooks runner in case this LU
137
    requires a cluster to run on (otherwise we don't have a node
138
    list). "No nodes" should be represented by an empty list (and not
140
    None).
140

141
    Note that if the HPATH for a LU class is None, this function will
142
    not be called.
143

144
    """
145
    raise NotImplementedError
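# Editor's illustration (not part of the original module): a minimal,
# hypothetical LogicalUnit that follows the contract documented above.
# The opcode field "node_name" and the hook path "node-example" are
# invented for this sketch only.
class LUExampleNoop(LogicalUnit):
  """Hypothetical no-op LU used only to illustrate the base class contract."""
  HPATH = "node-example"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def CheckPrereq(self):
    # canonicalise the opcode parameters, as the base class requires
    node = self.cfg.ExpandNodeName(self.op.node_name)
    if node is None:
      raise errors.OpPrereqError, ("Node '%s' is unknown" % self.op.node_name)
    self.op.node_name = node

  def BuildHooksEnv(self):
    # (env without the GANETI_ prefix, pre-hook node list, post-hook node list)
    return {"NODE_NAME": self.op.node_name}, [], [self.op.node_name]

  def Exec(self, feedback_fn):
    feedback_fn("no-op on %s" % self.op.node_name)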
146

    
147

    
148
class NoHooksLU(LogicalUnit):
149
  """Simple LU which runs no hooks.
150

151
  This LU is intended as a parent for other LogicalUnits which will
152
  run no hooks, in order to reduce duplicate code.
153

154
  """
155
  HPATH = None
156
  HTYPE = None
157

    
158
  def BuildHooksEnv(self):
159
    """Build hooks env.
160

161
    This is a no-op, since we don't run hooks.
162

163
    """
164
    return
165

    
166

    
167
def _UpdateEtcHosts(fullnode, ip):
168
  """Ensure a node has a correct entry in /etc/hosts.
169

170
  Args:
171
    fullnode - Fully qualified domain name of host. (str)
172
    ip       - IPv4 address of host (str)
173

174
  """
175
  node = fullnode.split(".", 1)[0]
176

    
177
  f = open('/etc/hosts', 'r+')
178

    
179
  inthere = False
180

    
181
  save_lines = []
182
  add_lines = []
183
  removed = False
184

    
185
  while True:
186
    rawline = f.readline()
187

    
188
    if not rawline:
189
      # End of file
190
      break
191

    
192
    line = rawline.split('\n')[0]
193

    
194
    # Strip off comments
195
    line = line.split('#')[0]
196

    
197
    if not line:
198
      # Entire line was comment, skip
199
      save_lines.append(rawline)
200
      continue
201

    
202
    fields = line.split()
203

    
204
    haveall = True
205
    havesome = False
206
    for spec in [ ip, fullnode, node ]:
207
      if spec not in fields:
208
        haveall = False
209
      if spec in fields:
210
        havesome = True
211

    
212
    if haveall:
213
      inthere = True
214
      save_lines.append(rawline)
215
      continue
216

    
217
    if havesome and not haveall:
218
      # Line (old, or manual?) which is missing some.  Remove.
219
      removed = True
220
      continue
221

    
222
    save_lines.append(rawline)
223

    
224
  if not inthere:
225
    add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))
226

    
227
  if removed:
228
    if add_lines:
229
      save_lines = save_lines + add_lines
230

    
231
    # We removed a line, write a new file and replace old.
232
    fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
233
    newfile = os.fdopen(fd, 'w')
234
    newfile.write(''.join(save_lines))
235
    newfile.close()
236
    os.rename(tmpname, '/etc/hosts')
237

    
238
  elif add_lines:
239
    # Simply appending a new line will do the trick.
240
    f.seek(0, 2)
241
    for add in add_lines:
242
      f.write(add)
243

    
244
  f.close()
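# Editor's note: the entry managed above has the form "<ip>\t<fqdn> <short>",
# e.g. "192.0.2.10\tnode1.example.com node1" (address and names are
# placeholder examples, not values from any real cluster).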
245

    
246

    
247
def _UpdateKnownHosts(fullnode, ip, pubkey):
248
  """Ensure a node has a correct known_hosts entry.
249

250
  Args:
251
    fullnode - Fully qualified domain name of host. (str)
252
    ip       - IPv4 address of host (str)
253
    pubkey   - the public key of the cluster
254

255
  """
256
  if os.path.exists('/etc/ssh/ssh_known_hosts'):
257
    f = open('/etc/ssh/ssh_known_hosts', 'r+')
258
  else:
259
    f = open('/etc/ssh/ssh_known_hosts', 'w+')
260

    
261
  inthere = False
262

    
263
  save_lines = []
264
  add_lines = []
265
  removed = False
266

    
267
  while True:
268
    rawline = f.readline()
269
    logger.Debug('read %s' % (repr(rawline),))
270

    
271
    if not rawline:
272
      # End of file
273
      break
274

    
275
    line = rawline.split('\n')[0]
276

    
277
    parts = line.split(' ')
278
    fields = parts[0].split(',')
279
    key = parts[2]
280

    
281
    haveall = True
282
    havesome = False
283
    for spec in [ ip, fullnode ]:
284
      if spec not in fields:
285
        haveall = False
286
      if spec in fields:
287
        havesome = True
288

    
289
    logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
290
    if haveall and key == pubkey:
291
      inthere = True
292
      save_lines.append(rawline)
293
      logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
294
      continue
295

    
296
    if havesome and (not haveall or key != pubkey):
297
      removed = True
298
      logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
299
      continue
300

    
301
    save_lines.append(rawline)
302

    
303
  if not inthere:
304
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
305
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
306

    
307
  if removed:
308
    save_lines = save_lines + add_lines
309

    
310
    # Write a new file and replace old.
311
    fd, tmpname = tempfile.mkstemp('tmp', 'ssh_known_hosts_', '/etc/ssh')
312
    newfile = os.fdopen(fd, 'w')
313
    newfile.write(''.join(save_lines))
314
    newfile.close()
315
    logger.Debug("Wrote new known_hosts.")
316
    os.rename(tmpname, '/etc/ssh/ssh_known_hosts')
317

    
318
  elif add_lines:
319
    # Simply appending a new line will do the trick.
320
    f.seek(0, 2)
321
    for add in add_lines:
322
      f.write(add)
323

    
324
  f.close()
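# Editor's note: the entries managed above use the OpenSSH known_hosts
# format "<fqdn>,<ip> ssh-rsa <key>", e.g.
#   node1.example.com,192.0.2.10 ssh-rsa AAAAB3NzaC1yc2E...
# (hostname, address and key are placeholder examples).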
325

    
326

    
327
def _HasValidVG(vglist, vgname):
328
  """Checks if the volume group list is valid.
329

330
  A non-None return value means there's an error, and the return value
331
  is the error message.
332

333
  """
334
  vgsize = vglist.get(vgname, None)
335
  if vgsize is None:
336
    return "volume group '%s' missing" % vgname
337
  elif vgsize < 20480:
338
    return ("volume group '%s' too small (20480MiB required, %dMib found" %
339
            vgname, vgsize)
340
  return None
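# Usage sketch (editor's illustration; sizes are in MiB, matching the
# 20480MiB check above, and the volume group name is a placeholder):
#   _HasValidVG({'xenvg': 409600}, 'xenvg')  # -> None, i.e. the VG is fine
#   _HasValidVG({}, 'xenvg')                 # -> "volume group 'xenvg' missing"
#   _HasValidVG({'xenvg': 1024}, 'xenvg')    # -> "... too small ..." message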
341

    
342

    
343
def _InitSSHSetup(node):
344
  """Setup the SSH configuration for the cluster.
345

346

347
  This generates a dsa keypair for root, adds the pub key to the
348
  permitted hosts and adds the hostkey to its own known hosts.
349

350
  Args:
351
    node: the name of this host as a fqdn
352

353
  """
354
  utils.RemoveFile('/root/.ssh/known_hosts')
355

    
356
  if os.path.exists('/root/.ssh/id_dsa'):
357
    utils.CreateBackup('/root/.ssh/id_dsa')
358
  if os.path.exists('/root/.ssh/id_dsa.pub'):
359
    utils.CreateBackup('/root/.ssh/id_dsa.pub')
360

    
361
  utils.RemoveFile('/root/.ssh/id_dsa')
362
  utils.RemoveFile('/root/.ssh/id_dsa.pub')
363

    
364
  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
365
                         "-f", "/root/.ssh/id_dsa",
366
                         "-q", "-N", ""])
367
  if result.failed:
368
    raise errors.OpExecError, ("could not generate ssh keypair, error %s" %
369
                               result.output)
370

    
371
  f = open('/root/.ssh/id_dsa.pub', 'r')
372
  try:
373
    utils.AddAuthorizedKey('/root/.ssh/authorized_keys', f.read(8192))
374
  finally:
375
    f.close()
376

    
377

    
378
def _InitGanetiServerSetup(ss):
379
  """Setup the necessary configuration for the initial node daemon.
380

381
  This creates the nodepass file containing the shared password for
382
  the cluster and also generates the SSL certificate.
383

384
  """
385
  # Create pseudo random password
386
  randpass = sha.new(os.urandom(64)).hexdigest()
387
  # and write it into sstore
388
  ss.SetKey(ss.SS_NODED_PASS, randpass)
389

    
390
  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
391
                         "-days", str(365*5), "-nodes", "-x509",
392
                         "-keyout", constants.SSL_CERT_FILE,
393
                         "-out", constants.SSL_CERT_FILE, "-batch"])
394
  if result.failed:
395
    raise errors.OpExecError, ("could not generate server ssl cert, command"
396
                               " %s had exitcode %s and error message %s" %
397
                               (result.cmd, result.exit_code, result.output))
398

    
399
  os.chmod(constants.SSL_CERT_FILE, 0400)
400

    
401
  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
402

    
403
  if result.failed:
404
    raise errors.OpExecError, ("could not start the node daemon, command %s"
405
                               " had exitcode %s and error %s" %
406
                               (result.cmd, result.exit_code, result.output))
407

    
408

    
409
def _InitClusterInterface(fullname, name, ip):
410
  """Initialize the master startup script.
411

412
  """
413
  f = file(constants.CLUSTER_NAME_FILE, 'w')
414
  f.write("%s\n" % fullname)
415
  f.close()
416

    
417
  f = file(constants.MASTER_INITD_SCRIPT, 'w')
418
  f.write ("#!/bin/sh\n")
419
  f.write ("\n")
420
  f.write ("# Start Ganeti Master Virtual Address\n")
421
  f.write ("\n")
422
  f.write ("DESC=\"Ganeti Master IP\"\n")
423
  f.write ("MASTERNAME=\"%s\"\n" % name)
424
  f.write ("MASTERIP=\"%s\"\n" % ip)
425
  f.write ("case \"$1\" in\n")
426
  f.write ("  start)\n")
427
  f.write ("    if fping -q -c 3 ${MASTERIP} &>/dev/null; then\n")
428
  f.write ("        echo \"$MASTERNAME no-go - there is already a master.\"\n")
429
  f.write ("        rm -f %s\n" % constants.MASTER_CRON_LINK)
430
  f.write ("        scp ${MASTERNAME}:%s %s\n" %
431
           (constants.CLUSTER_CONF_FILE, constants.CLUSTER_CONF_FILE))
432
  f.write ("    else\n")
433
  f.write ("        echo -n \"Starting $DESC: \"\n")
434
  f.write ("        ip address add ${MASTERIP}/32 dev xen-br0"
435
           " label xen-br0:0\n")
436
  f.write ("        arping -q -U -c 3 -I xen-br0 -s ${MASTERIP} ${MASTERIP}\n")
437
  f.write ("        echo \"$MASTERNAME.\"\n")
438
  f.write ("    fi\n")
439
  f.write ("    ;;\n")
440
  f.write ("  stop)\n")
441
  f.write ("    echo -n \"Stopping $DESC: \"\n")
442
  f.write ("    ip address del ${MASTERIP}/32 dev xen-br0\n")
443
  f.write ("    echo \"$MASTERNAME.\"\n")
444
  f.write ("    ;;\n")
445
  f.write ("  *)\n")
446
  f.write ("    echo \"Usage: $0 {start|stop}\" >&2\n")
447
  f.write ("    exit 1\n")
448
  f.write ("    ;;\n")
449
  f.write ("esac\n")
450
  f.write ("\n")
451
  f.write ("exit 0\n")
452
  f.flush()
453
  os.fsync(f.fileno())
454
  f.close()
455
  os.chmod(constants.MASTER_INITD_SCRIPT, 0755)
456

    
457

    
458
class LUInitCluster(LogicalUnit):
459
  """Initialise the cluster.
460

461
  """
462
  HPATH = "cluster-init"
463
  HTYPE = constants.HTYPE_CLUSTER
464
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
465
              "def_bridge"]
466
  REQ_CLUSTER = False
467

    
468
  def BuildHooksEnv(self):
469
    """Build hooks env.
470

471
    Notes: Since we don't require a cluster, we must manually add
472
    ourselves to the post-run node list.
473

474
    """
475

    
476
    env = {"CLUSTER": self.op.cluster_name,
477
           "MASTER": self.hostname}
478
    return env, [], [self.hostname['hostname_full']]
479

    
480
  def CheckPrereq(self):
481
    """Verify that the passed name is a valid one.
482

483
    """
484
    if config.ConfigWriter.IsCluster():
485
      raise errors.OpPrereqError, ("Cluster is already initialised")
486

    
487
    hostname_local = socket.gethostname()
488
    self.hostname = hostname = utils.LookupHostname(hostname_local)
489
    if not hostname:
490
      raise errors.OpPrereqError, ("Cannot resolve my own hostname ('%s')" %
491
                                   hostname_local)
492

    
493
    self.clustername = clustername = utils.LookupHostname(self.op.cluster_name)
494
    if not clustername:
495
      raise errors.OpPrereqError, ("Cannot resolve given cluster name ('%s')"
496
                                   % self.op.cluster_name)
497

    
498
    result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", hostname['ip']])
499
    if result.failed:
500
      raise errors.OpPrereqError, ("Inconsistency: this host's name resolves"
501
                                   " to %s,\nbut this ip address does not"
502
                                   " belong to this host."
503
                                   " Aborting." % hostname['ip'])
504

    
505
    secondary_ip = getattr(self.op, "secondary_ip", None)
506
    if secondary_ip and not utils.IsValidIP(secondary_ip):
507
      raise errors.OpPrereqError, ("Invalid secondary ip given")
508
    if secondary_ip and secondary_ip != hostname['ip']:
509
      result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", secondary_ip])
510
      if result.failed:
511
        raise errors.OpPrereqError, ("You gave %s as secondary IP,\n"
512
                                     "but it does not belong to this host." %
513
                                     secondary_ip)
514
    self.secondary_ip = secondary_ip
515

    
516
    # checks presence of the volume group given
517
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
518

    
519
    if vgstatus:
520
      raise errors.OpPrereqError, ("Error: %s" % vgstatus)
521

    
522
    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
523
                    self.op.mac_prefix):
524
      raise errors.OpPrereqError, ("Invalid mac prefix given '%s'" %
525
                                   self.op.mac_prefix)
526

    
527
    if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
528
      raise errors.OpPrereqError, ("Invalid hypervisor type given '%s'" %
529
                                   self.op.hypervisor_type)
530

    
531
  def Exec(self, feedback_fn):
532
    """Initialize the cluster.
533

534
    """
535
    clustername = self.clustername
536
    hostname = self.hostname
537

    
538
    # add the cluster name file and master startup script
539
    _InitClusterInterface(clustername['hostname_full'],
540
                          clustername['hostname'],
541
                          clustername['ip'])
542

    
543
    # set up the simple store
544
    ss = ssconf.SimpleStore()
545
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
546

    
547
    # set up the inter-node password and certificate
548
    _InitGanetiServerSetup(ss)
549

    
550
    # start the master ip
551
    rpc.call_node_start_master(hostname['hostname_full'])
552

    
553
    # set up ssh config and /etc/hosts
554
    f = open('/etc/ssh/ssh_host_rsa_key.pub', 'r')
555
    try:
556
      sshline = f.read()
557
    finally:
558
      f.close()
559
    sshkey = sshline.split(" ")[1]
560

    
561
    _UpdateEtcHosts(hostname['hostname_full'],
562
                    hostname['ip'],
563
                    )
564

    
565
    _UpdateKnownHosts(hostname['hostname_full'],
566
                      hostname['ip'],
567
                      sshkey,
568
                      )
569

    
570
    _InitSSHSetup(hostname['hostname'])
571

    
572
    # init of cluster config file
573
    cfgw = config.ConfigWriter()
574
    cfgw.InitConfig(hostname['hostname'], hostname['ip'], self.secondary_ip,
575
                    clustername['hostname'], sshkey, self.op.mac_prefix,
576
                    self.op.vg_name, self.op.def_bridge)
577

    
578

    
579
class LUDestroyCluster(NoHooksLU):
580
  """Logical unit for destroying the cluster.
581

582
  """
583
  _OP_REQP = []
584

    
585
  def CheckPrereq(self):
586
    """Check prerequisites.
587

588
    This checks whether the cluster is empty.
589

590
    Any errors are signalled by raising errors.OpPrereqError.
591

592
    """
593
    master = self.cfg.GetMaster()
594

    
595
    nodelist = self.cfg.GetNodeList()
596
    if len(nodelist) > 0 and nodelist != [master]:
597
        raise errors.OpPrereqError, ("There are still %d node(s) in "
598
                                     "this cluster." % (len(nodelist) - 1))
599

    
600
  def Exec(self, feedback_fn):
601
    """Destroys the cluster.
602

603
    """
604
    utils.CreateBackup('/root/.ssh/id_dsa')
605
    utils.CreateBackup('/root/.ssh/id_dsa.pub')
606
    rpc.call_node_leave_cluster(self.cfg.GetMaster())
607

    
608

    
609
class LUVerifyCluster(NoHooksLU):
610
  """Verifies the cluster status.
611

612
  """
613
  _OP_REQP = []
614

    
615
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
616
                  remote_version, feedback_fn):
617
    """Run multiple tests against a node.
618

619
    Test list:
620
      - compares ganeti version
621
      - checks vg existence and size > 20G
622
      - checks config file checksum
623
      - checks ssh to other nodes
624

625
    Args:
626
      node: name of the node to check
627
      file_list: required list of files
628
      local_cksum: dictionary of local files and their checksums
629
    """
630
    # compares ganeti version
631
    local_version = constants.PROTOCOL_VERSION
632
    if not remote_version:
633
      feedback_fn(" - ERROR: connection to %s failed" % (node))
634
      return True
635

    
636
    if local_version != remote_version:
637
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
638
                      (local_version, node, remote_version))
639
      return True
640

    
641
    # checks vg existence and size > 20G
642

    
643
    bad = False
644
    if not vglist:
645
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
646
                      (node,))
647
      bad = True
648
    else:
649
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
650
      if vgstatus:
651
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
652
        bad = True
653

    
654
    # checks config file checksum
655
    # checks ssh connectivity to the other nodes
656

    
657
    if 'filelist' not in node_result:
658
      bad = True
659
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
660
    else:
661
      remote_cksum = node_result['filelist']
662
      for file_name in file_list:
663
        if file_name not in remote_cksum:
664
          bad = True
665
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
666
        elif remote_cksum[file_name] != local_cksum[file_name]:
667
          bad = True
668
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
669

    
670
    if 'nodelist' not in node_result:
671
      bad = True
672
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
673
    else:
674
      if node_result['nodelist']:
675
        bad = True
676
        for remote_node in node_result['nodelist']:
677
          feedback_fn("  - ERROR: communication with node '%s': %s" %
678
                          (remote_node, node_result['nodelist'][remote_node]))
679
    hyp_result = node_result.get('hypervisor', None)
680
    if hyp_result is not None:
681
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
682
    return bad
683

    
684
  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
685
    """Verify an instance.
686

687
    This function checks to see if the required block devices are
688
    available on the instance's node.
689

690
    """
691
    bad = False
692

    
693
    instancelist = self.cfg.GetInstanceList()
694
    if instance not in instancelist:
695
      feedback_fn("  - ERROR: instance %s not in instance list %s" %
696
                      (instance, instancelist))
697
      bad = True
698

    
699
    instanceconfig = self.cfg.GetInstanceInfo(instance)
700
    node_current = instanceconfig.primary_node
701

    
702
    node_vol_should = {}
703
    instanceconfig.MapLVsByNode(node_vol_should)
704

    
705
    for node in node_vol_should:
706
      for volume in node_vol_should[node]:
707
        if node not in node_vol_is or volume not in node_vol_is[node]:
708
          feedback_fn("  - ERROR: volume %s missing on node %s" %
709
                          (volume, node))
710
          bad = True
711

    
712
    if instanceconfig.status != 'down':
713
      if instance not in node_instance[node_current]:
714
        feedback_fn("  - ERROR: instance %s not running on node %s" %
715
                        (instance, node_current))
716
        bad = True
717

    
718
    for node in node_instance:
719
      if node != node_current:
720
        if instance in node_instance[node]:
721
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
722
                          (instance, node))
723
          bad = True
724

    
725
    return bad
726

    
727
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
728
    """Verify if there are any unknown volumes in the cluster.
729

730
    The .os, .swap and backup volumes are ignored. All other volumes are
731
    reported as unknown.
732

733
    """
734
    bad = False
735

    
736
    for node in node_vol_is:
737
      for volume in node_vol_is[node]:
738
        if node not in node_vol_should or volume not in node_vol_should[node]:
739
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
740
                      (volume, node))
741
          bad = True
742
    return bad
743

    
744

    
745
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
746
    """Verify the list of running instances.
747

748
    This checks what instances are running but unknown to the cluster.
749

750
    """
751
    bad = False
752
    for node in node_instance:
753
      for runninginstance in node_instance[node]:
754
        if runninginstance not in instancelist:
755
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
756
                          (runninginstance, node))
757
          bad = True
758
    return bad
759

    
760
  def _VerifyNodeConfigFiles(self, ismaster, node, file_list, feedback_fn):
761
    """Verify the list of node config files"""
762

    
763
    bad = False
764
    for file_name in constants.MASTER_CONFIGFILES:
765
      if ismaster and file_name not in file_list:
766
        feedback_fn("  - ERROR: master config file %s missing from master"
767
                    " node %s" % (file_name, node))
768
        bad = True
769
      elif not ismaster and file_name in file_list:
770
        feedback_fn("  - ERROR: master config file %s should not exist"
771
                    " on non-master node %s" % (file_name, node))
772
        bad = True
773

    
774
    for file_name in constants.NODE_CONFIGFILES:
775
      if file_name not in file_list:
776
        feedback_fn("  - ERROR: config file %s missing from node %s" %
777
                    (file_name, node))
778
        bad = True
779

    
780
    return bad
781

    
782
  def CheckPrereq(self):
783
    """Check prerequisites.
784

785
    This has no prerequisites.
786

787
    """
788
    pass
789

    
790
  def Exec(self, feedback_fn):
791
    """Verify integrity of cluster, performing various test on nodes.
792

793
    """
794
    bad = False
795
    feedback_fn("* Verifying global settings")
796
    self.cfg.VerifyConfig()
797

    
798
    master = self.cfg.GetMaster()
799
    vg_name = self.cfg.GetVGName()
800
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
801
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
802
    node_volume = {}
803
    node_instance = {}
804

    
805
    # FIXME: verify OS list
806
    # do local checksums
807
    file_names = constants.CLUSTER_CONF_FILES
808
    local_checksums = utils.FingerprintFiles(file_names)
809

    
810
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
811
    all_configfile = rpc.call_configfile_list(nodelist)
812
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
813
    all_instanceinfo = rpc.call_instance_list(nodelist)
814
    all_vglist = rpc.call_vg_list(nodelist)
815
    node_verify_param = {
816
      'filelist': file_names,
817
      'nodelist': nodelist,
818
      'hypervisor': None,
819
      }
820
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
821
    all_rversion = rpc.call_version(nodelist)
822

    
823
    for node in nodelist:
824
      feedback_fn("* Verifying node %s" % node)
825
      result = self._VerifyNode(node, file_names, local_checksums,
826
                                all_vglist[node], all_nvinfo[node],
827
                                all_rversion[node], feedback_fn)
828
      bad = bad or result
829
      # node_configfile
830
      nodeconfigfile = all_configfile[node]
831

    
832
      if not nodeconfigfile:
833
        feedback_fn("  - ERROR: connection to %s failed" % (node))
834
        bad = True
835
        continue
836

    
837
      bad = bad or self._VerifyNodeConfigFiles(node==master, node,
838
                                               nodeconfigfile, feedback_fn)
839

    
840
      # node_volume
841
      volumeinfo = all_volumeinfo[node]
842

    
843
      if type(volumeinfo) != dict:
844
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
845
        bad = True
846
        continue
847

    
848
      node_volume[node] = volumeinfo
849

    
850
      # node_instance
851
      nodeinstance = all_instanceinfo[node]
852
      if type(nodeinstance) != list:
853
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
854
        bad = True
855
        continue
856

    
857
      node_instance[node] = nodeinstance
858

    
859
    node_vol_should = {}
860

    
861
    for instance in instancelist:
862
      feedback_fn("* Verifying instance %s" % instance)
863
      result =  self._VerifyInstance(instance, node_volume, node_instance,
864
                                     feedback_fn)
865
      bad = bad or result
866

    
867
      inst_config = self.cfg.GetInstanceInfo(instance)
868

    
869
      inst_config.MapLVsByNode(node_vol_should)
870

    
871
    feedback_fn("* Verifying orphan volumes")
872
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
873
                                       feedback_fn)
874
    bad = bad or result
875

    
876
    feedback_fn("* Verifying remaining instances")
877
    result = self._VerifyOrphanInstances(instancelist, node_instance,
878
                                         feedback_fn)
879
    bad = bad or result
880

    
881
    return int(bad)
882

    
883

    
884
def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
885
  """Sleep and poll for an instance's disk to sync.
886

887
  """
888
  if not instance.disks:
889
    return True
890

    
891
  if not oneshot:
892
    logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)
893

    
894
  node = instance.primary_node
895

    
896
  for dev in instance.disks:
897
    cfgw.SetDiskID(dev, node)
898

    
899
  retries = 0
900
  while True:
901
    max_time = 0
902
    done = True
903
    cumul_degraded = False
904
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
905
    if not rstats:
906
      logger.ToStderr("Can't get any data from node %s" % node)
907
      retries += 1
908
      if retries >= 10:
909
        raise errors.RemoteError, ("Can't contact node %s for mirror data,"
910
                                   " aborting." % node)
911
      time.sleep(6)
912
      continue
913
    retries = 0
914
    for i in range(len(rstats)):
915
      mstat = rstats[i]
916
      if mstat is None:
917
        logger.ToStderr("Can't compute data for node %s/%s" %
918
                        (node, instance.disks[i].iv_name))
919
        continue
920
      perc_done, est_time, is_degraded = mstat
921
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
922
      if perc_done is not None:
923
        done = False
924
        if est_time is not None:
925
          rem_time = "%d estimated seconds remaining" % est_time
926
          max_time = est_time
927
        else:
928
          rem_time = "no time estimate"
929
        logger.ToStdout("- device %s: %5.2f%% done, %s" %
930
                        (instance.disks[i].iv_name, perc_done, rem_time))
931
    if done or oneshot:
932
      break
933

    
934
    if unlock:
935
      utils.Unlock('cmd')
936
    try:
937
      time.sleep(min(60, max_time))
938
    finally:
939
      if unlock:
940
        utils.Lock('cmd')
941

    
942
  if done:
943
    logger.ToStdout("Instance %s's disks are in sync." % instance.name)
944
  return not cumul_degraded
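# Editor's note: each mstat above is the (perc_done, est_time, is_degraded)
# tuple returned per disk by rpc.call_blockdev_getmirrorstatus; the function
# returns False only if some mirror reported itself degraded while giving no
# completion percentage, and True otherwise.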
945

    
946

    
947
def _CheckDiskConsistency(cfgw, dev, node, on_primary):
948
  """Check that mirrors are not degraded.
949

950
  """
951

    
952
  cfgw.SetDiskID(dev, node)
953

    
954
  result = True
955
  if on_primary or dev.AssembleOnSecondary():
956
    rstats = rpc.call_blockdev_find(node, dev)
957
    if not rstats:
958
      logger.ToStderr("Can't get any data from node %s" % node)
959
      result = False
960
    else:
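      # editor's note: rstats[5] appears to be the "is degraded" flag of the
      # device status returned by call_blockdev_find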
961
      result = result and (not rstats[5])
962
  if dev.children:
963
    for child in dev.children:
964
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
965

    
966
  return result
967

    
968

    
969
class LUDiagnoseOS(NoHooksLU):
970
  """Logical unit for OS diagnose/query.
971

972
  """
973
  _OP_REQP = []
974

    
975
  def CheckPrereq(self):
976
    """Check prerequisites.
977

978
    This always succeeds, since this is a pure query LU.
979

980
    """
981
    return
982

    
983
  def Exec(self, feedback_fn):
984
    """Compute the list of OSes.
985

986
    """
987
    node_list = self.cfg.GetNodeList()
988
    node_data = rpc.call_os_diagnose(node_list)
989
    if node_data is False:
990
      raise errors.OpExecError, "Can't gather the list of OSes"
991
    return node_data
992

    
993

    
994
class LURemoveNode(LogicalUnit):
995
  """Logical unit for removing a node.
996

997
  """
998
  HPATH = "node-remove"
999
  HTYPE = constants.HTYPE_NODE
1000
  _OP_REQP = ["node_name"]
1001

    
1002
  def BuildHooksEnv(self):
1003
    """Build hooks env.
1004

1005
    This doesn't run on the target node in the pre phase as a failed
1006
    node would not allow itself to run.
1007

1008
    """
1009
    all_nodes = self.cfg.GetNodeList()
1010
    all_nodes.remove(self.op.node_name)
1011
    return {"NODE_NAME": self.op.node_name}, all_nodes, all_nodes
1012

    
1013
  def CheckPrereq(self):
1014
    """Check prerequisites.
1015

1016
    This checks:
1017
     - the node exists in the configuration
1018
     - it does not have primary or secondary instances
1019
     - it's not the master
1020

1021
    Any errors are signalled by raising errors.OpPrereqError.
1022

1023
    """
1024

    
1025
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1026
    if node is None:
1027
      logger.Error("Error: Node '%s' is unknown." % self.op.node_name)
1028
      return 1
1029

    
1030
    instance_list = self.cfg.GetInstanceList()
1031

    
1032
    masternode = self.cfg.GetMaster()
1033
    if node.name == masternode:
1034
      raise errors.OpPrereqError, ("Node is the master node,"
1035
                                   " you need to failover first.")
1036

    
1037
    for instance_name in instance_list:
1038
      instance = self.cfg.GetInstanceInfo(instance_name)
1039
      if node.name == instance.primary_node:
1040
        raise errors.OpPrereqError, ("Instance %s still running on the node,"
1041
                                     " please remove first." % instance_name)
1042
      if node.name in instance.secondary_nodes:
1043
        raise errors.OpPrereqError, ("Instance %s has node as a secondary,"
1044
                                     " please remove first." % instance_name)
1045
    self.op.node_name = node.name
1046
    self.node = node
1047

    
1048
  def Exec(self, feedback_fn):
1049
    """Removes the node from the cluster.
1050

1051
    """
1052
    node = self.node
1053
    logger.Info("stopping the node daemon and removing configs from node %s" %
1054
                node.name)
1055

    
1056
    rpc.call_node_leave_cluster(node.name)
1057

    
1058
    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1059

    
1060
    logger.Info("Removing node %s from config" % node.name)
1061

    
1062
    self.cfg.RemoveNode(node.name)
1063

    
1064

    
1065
class LUQueryNodes(NoHooksLU):
1066
  """Logical unit for querying nodes.
1067

1068
  """
1069
  _OP_REQP = ["output_fields"]
1070

    
1071
  def CheckPrereq(self):
1072
    """Check prerequisites.
1073

1074
    This checks that the fields required are valid output fields.
1075

1076
    """
1077
    self.static_fields = frozenset(["name", "pinst", "sinst", "pip", "sip"])
1078
    self.dynamic_fields = frozenset(["dtotal", "dfree",
1079
                                     "mtotal", "mnode", "mfree"])
1080
    self.all_fields = self.static_fields | self.dynamic_fields
1081

    
1082
    if not self.all_fields.issuperset(self.op.output_fields):
1083
      raise errors.OpPrereqError, ("Unknown output fields selected: %s"
1084
                                   % ",".join(frozenset(self.op.output_fields).
1085
                                              difference(self.all_fields)))
1086

    
1087

    
1088
  def Exec(self, feedback_fn):
1089
    """Computes the list of nodes and their attributes.
1090

1091
    """
1092
    nodenames = utils.NiceSort(self.cfg.GetNodeList())
1093
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1094

    
1095

    
1096
    # begin data gathering
1097

    
1098
    if self.dynamic_fields.intersection(self.op.output_fields):
1099
      live_data = {}
1100
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1101
      for name in nodenames:
1102
        nodeinfo = node_data.get(name, None)
1103
        if nodeinfo:
1104
          live_data[name] = {
1105
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1106
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1107
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1108
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1109
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1110
            }
1111
        else:
1112
          live_data[name] = {}
1113
    else:
1114
      live_data = dict.fromkeys(nodenames, {})
1115

    
1116
    node_to_primary = dict.fromkeys(nodenames, 0)
1117
    node_to_secondary = dict.fromkeys(nodenames, 0)
1118

    
1119
    if "pinst" in self.op.output_fields or "sinst" in self.op.output_fields:
1120
      instancelist = self.cfg.GetInstanceList()
1121

    
1122
      for instance in instancelist:
1123
        instanceinfo = self.cfg.GetInstanceInfo(instance)
1124
        node_to_primary[instanceinfo.primary_node] += 1
1125
        for secnode in instanceinfo.secondary_nodes:
1126
          node_to_secondary[secnode] += 1
1127

    
1128
    # end data gathering
1129

    
1130
    output = []
1131
    for node in nodelist:
1132
      node_output = []
1133
      for field in self.op.output_fields:
1134
        if field == "name":
1135
          val = node.name
1136
        elif field == "pinst":
1137
          val = node_to_primary[node.name]
1138
        elif field == "sinst":
1139
          val = node_to_secondary[node.name]
1140
        elif field == "pip":
1141
          val = node.primary_ip
1142
        elif field == "sip":
1143
          val = node.secondary_ip
1144
        elif field in self.dynamic_fields:
1145
          val = live_data[node.name].get(field, "?")
1146
        else:
1147
          raise errors.ParameterError, field
1148
        val = str(val)
1149
        node_output.append(val)
1150
      output.append(node_output)
1151

    
1152
    return output
1153

    
1154

    
1155
def _CheckNodesDirs(node_list, paths):
1156
  """Verify if the given nodes have the same files.
1157

1158
  Args:
1159
    node_list: the list of node names to check
1160
    paths: the list of directories to checksum and compare
1161

1162
  Returns:
1163
    list of (node, different_file, message); if empty, the files are in sync
1164

1165
  """
1166
  file_names = []
1167
  for dir_name in paths:
1168
    flist = [os.path.join(dir_name, name) for name in os.listdir(dir_name)]
1169
    flist = [name for name in flist if os.path.isfile(name)]
1170
    file_names.extend(flist)
1171

    
1172
  local_checksums = utils.FingerprintFiles(file_names)
1173

    
1174
  results = []
1175
  verify_params = {'filelist': file_names}
1176
  all_node_results = rpc.call_node_verify(node_list, verify_params)
1177
  for node_name in node_list:
1178
    node_result = all_node_results.get(node_name, False)
1179
    if not node_result or 'filelist' not in node_result:
1180
      results.append((node_name, "'all files'", "node communication error"))
1181
      continue
1182
    remote_checksums = node_result['filelist']
1183
    for fname in local_checksums:
1184
      if fname not in remote_checksums:
1185
        results.append((node_name, fname, "missing file"))
1186
      elif remote_checksums[fname] != local_checksums[fname]:
1187
        results.append((node_name, fname, "wrong checksum"))
1188
  return results
1189

    
1190

    
1191
class LUAddNode(LogicalUnit):
1192
  """Logical unit for adding node to the cluster.
1193

1194
  """
1195
  HPATH = "node-add"
1196
  HTYPE = constants.HTYPE_NODE
1197
  _OP_REQP = ["node_name"]
1198

    
1199
  def BuildHooksEnv(self):
1200
    """Build hooks env.
1201

1202
    This will run on all nodes before, and on all nodes + the new node after.
1203

1204
    """
1205
    env = {
1206
      "NODE_NAME": self.op.node_name,
1207
      "NODE_PIP": self.op.primary_ip,
1208
      "NODE_SIP": self.op.secondary_ip,
1209
      }
1210
    nodes_0 = self.cfg.GetNodeList()
1211
    nodes_1 = nodes_0 + [self.op.node_name, ]
1212
    return env, nodes_0, nodes_1
1213

    
1214
  def CheckPrereq(self):
1215
    """Check prerequisites.
1216

1217
    This checks:
1218
     - the new node is not already in the config
1219
     - it is resolvable
1220
     - its parameters (single/dual homed) matches the cluster
1221

1222
    Any errors are signalled by raising errors.OpPrereqError.
1223

1224
    """
1225
    node_name = self.op.node_name
1226
    cfg = self.cfg
1227

    
1228
    dns_data = utils.LookupHostname(node_name)
1229
    if not dns_data:
1230
      raise errors.OpPrereqError, ("Node %s is not resolvable" % node_name)
1231

    
1232
    node = dns_data['hostname']
1233
    primary_ip = self.op.primary_ip = dns_data['ip']
1234
    secondary_ip = getattr(self.op, "secondary_ip", None)
1235
    if secondary_ip is None:
1236
      secondary_ip = primary_ip
1237
    if not utils.IsValidIP(secondary_ip):
1238
      raise errors.OpPrereqError, ("Invalid secondary IP given")
1239
    self.op.secondary_ip = secondary_ip
1240
    node_list = cfg.GetNodeList()
1241
    if node in node_list:
1242
      raise errors.OpPrereqError, ("Node %s is already in the configuration"
1243
                                   % node)
1244

    
1245
    for existing_node_name in node_list:
1246
      existing_node = cfg.GetNodeInfo(existing_node_name)
1247
      if (existing_node.primary_ip == primary_ip or
1248
          existing_node.secondary_ip == primary_ip or
1249
          existing_node.primary_ip == secondary_ip or
1250
          existing_node.secondary_ip == secondary_ip):
1251
        raise errors.OpPrereqError, ("New node ip address(es) conflict with"
1252
                                     " existing node %s" % existing_node.name)
1253

    
1254
    # check that the type of the node (single versus dual homed) is the
1255
    # same as for the master
1256
    myself = cfg.GetNodeInfo(cfg.GetMaster())
1257
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1258
    newbie_singlehomed = secondary_ip == primary_ip
1259
    if master_singlehomed != newbie_singlehomed:
1260
      if master_singlehomed:
1261
        raise errors.OpPrereqError, ("The master has no private ip but the"
1262
                                     " new node has one")
1263
      else:
1264
        raise errors.OpPrereqError ("The master has a private ip but the"
1265
                                    " new node doesn't have one")
1266

    
1267
    # checks reachability
1268
    command = ["fping", "-q", primary_ip]
1269
    result = utils.RunCmd(command)
1270
    if result.failed:
1271
      raise errors.OpPrereqError, ("Node not reachable by ping")
1272

    
1273
    if not newbie_singlehomed:
1274
      # check reachability from my secondary ip to newbie's secondary ip
1275
      command = ["fping", "-S%s" % myself.secondary_ip, "-q", secondary_ip]
1276
      result = utils.RunCmd(command)
1277
      if result.failed:
1278
        raise errors.OpPrereqError, ("Node secondary ip not reachable by ping")
1279

    
1280
    self.new_node = objects.Node(name=node,
1281
                                 primary_ip=primary_ip,
1282
                                 secondary_ip=secondary_ip)
1283

    
1284
  def Exec(self, feedback_fn):
1285
    """Adds the new node to the cluster.
1286

1287
    """
1288
    new_node = self.new_node
1289
    node = new_node.name
1290

    
1291
    # set up inter-node password and certificate and restarts the node daemon
1292
    gntpass = self.sstore.GetNodeDaemonPassword()
1293
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1294
      raise errors.OpExecError, ("ganeti password corruption detected")
1295
    f = open(constants.SSL_CERT_FILE)
1296
    try:
1297
      gntpem = f.read(8192)
1298
    finally:
1299
      f.close()
1300
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1301
    # so we use this to detect an invalid certificate; as long as the
1302
    # cert doesn't contain this, the here-document will be correctly
1303
    # parsed by the shell sequence below
1304
    if re.search(r'^!EOF\.', gntpem, re.MULTILINE):
1305
      raise errors.OpExecError, ("invalid PEM encoding in the SSL certificate")
1306
    if not gntpem.endswith("\n"):
1307
      raise errors.OpExecError, ("PEM must end with newline")
1308
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1309

    
1310
    # remove first the root's known_hosts file
1311
    utils.RemoveFile("/root/.ssh/known_hosts")
1312
    # and then connect with ssh to set password and start ganeti-noded
1313
    # note that all the below variables are sanitized at this point,
1314
    # either by being constants or by the checks above
1315
    ss = self.sstore
1316
    mycommand = ("umask 077 && "
1317
                 "echo '%s' > '%s' && "
1318
                 "cat > '%s' << '!EOF.' && \n"
1319
                 "%s!EOF.\n%s restart" %
1320
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1321
                  constants.SSL_CERT_FILE, gntpem,
1322
                  constants.NODE_INITD_SCRIPT))
1323

    
1324
    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1325
    if result.failed:
1326
      raise errors.OpExecError, ("Remote command on node %s, error: %s,"
1327
                                 " output: %s" %
1328
                                 (node, result.fail_reason, result.output))
1329

    
1330
    # check connectivity
1331
    time.sleep(4)
1332

    
1333
    result = rpc.call_version([node])[node]
1334
    if result:
1335
      if constants.PROTOCOL_VERSION == result:
1336
        logger.Info("communication to node %s fine, sw version %s match" %
1337
                    (node, result))
1338
      else:
1339
        raise errors.OpExecError, ("Version mismatch master version %s,"
1340
                                   " node version %s" %
1341
                                   (constants.PROTOCOL_VERSION, result))
1342
    else:
1343
      raise errors.OpExecError, ("Cannot get version from the new node")
1344

    
1345
    # setup ssh on node
1346
    logger.Info("copy ssh key to node %s" % node)
1347
    keyarray = []
1348
    keyfiles = ["/etc/ssh/ssh_host_dsa_key", "/etc/ssh/ssh_host_dsa_key.pub",
1349
                "/etc/ssh/ssh_host_rsa_key", "/etc/ssh/ssh_host_rsa_key.pub",
1350
                "/root/.ssh/id_dsa", "/root/.ssh/id_dsa.pub"]
1351

    
1352
    for i in keyfiles:
1353
      f = open(i, 'r')
1354
      try:
1355
        keyarray.append(f.read())
1356
      finally:
1357
        f.close()
1358

    
1359
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1360
                               keyarray[3], keyarray[4], keyarray[5])
1361

    
1362
    if not result:
1363
      raise errors.OpExecError, ("Cannot transfer ssh keys to the new node")
1364

    
1365
    # Add node to our /etc/hosts, and add key to known_hosts
1366
    _UpdateEtcHosts(new_node.name, new_node.primary_ip)
1367
    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1368
                      self.cfg.GetHostKey())
1369

    
1370
    if new_node.secondary_ip != new_node.primary_ip:
1371
      result = ssh.SSHCall(node, "root",
1372
                           "fping -S 127.0.0.1 -q %s" % new_node.secondary_ip)
1373
      if result.failed:
1374
        raise errors.OpExecError, ("Node claims it doesn't have the"
1375
                                   " secondary ip you gave (%s).\n"
1376
                                   "Please fix and re-run this command." %
1377
                                   new_node.secondary_ip)
1378

    
1379
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1380
    # including the node just added
1381
    myself = self.cfg.GetNodeInfo(self.cfg.GetMaster())
1382
    dist_nodes = self.cfg.GetNodeList() + [node]
1383
    if myself.name in dist_nodes:
1384
      dist_nodes.remove(myself.name)
1385

    
1386
    logger.Debug("Copying hosts and known_hosts to all nodes")
1387
    for fname in ("/etc/hosts", "/etc/ssh/ssh_known_hosts"):
1388
      result = rpc.call_upload_file(dist_nodes, fname)
1389
      for to_node in dist_nodes:
1390
        if not result[to_node]:
1391
          logger.Error("copy of file %s to node %s failed" %
1392
                       (fname, to_node))
1393

    
1394
    to_copy = [constants.MASTER_CRON_FILE,
1395
               constants.MASTER_INITD_SCRIPT,
1396
               constants.CLUSTER_NAME_FILE]
1397
    to_copy.extend(ss.GetFileList())
1398
    for fname in to_copy:
1399
      if not ssh.CopyFileToNode(node, fname):
1400
        logger.Error("could not copy file %s to node %s" % (fname, node))
1401

    
1402
    logger.Info("adding node %s to cluster.conf" % node)
1403
    self.cfg.AddNode(new_node)
1404

    
1405

    
1406
class LUMasterFailover(LogicalUnit):
1407
  """Failover the master node to the current node.
1408

1409
  This is a special LU in that it must run on a non-master node.
1410

1411
  """
1412
  HPATH = "master-failover"
1413
  HTYPE = constants.HTYPE_CLUSTER
1414
  REQ_MASTER = False
1415
  _OP_REQP = []
1416

    
1417
  def BuildHooksEnv(self):
1418
    """Build hooks env.
1419

1420
    This will run on the new master only in the pre phase, and on all
1421
    the nodes in the post phase.
1422

1423
    """
1424
    env = {
1425
      "NEW_MASTER": self.new_master,
1426
      "OLD_MASTER": self.old_master,
1427
      }
1428
    return env, [self.new_master], self.cfg.GetNodeList()
1429

    
1430
  def CheckPrereq(self):
1431
    """Check prerequisites.
1432

1433
    This checks that we are not already the master.
1434

1435
    """
1436
    self.new_master = socket.gethostname()
1437

    
1438
    self.old_master = self.cfg.GetMaster()
1439

    
1440
    if self.old_master == self.new_master:
1441
      raise errors.OpPrereqError, ("This commands must be run on the node"
1442
                                   " where you want the new master to be.\n"
1443
                                   "%s is already the master" %
1444
                                   self.old_master)
1445

    
1446
  def Exec(self, feedback_fn):
1447
    """Failover the master node.
1448

1449
    This command, when run on a non-master node, will cause the current
1450
    master to cease being master, and the non-master to become the new
1451
    master.
1452

1453
    """
1454

    
1455
    #TODO: do not rely on gethostname returning the FQDN
1456
    logger.Info("setting master to %s, old master: %s" %
1457
                (self.new_master, self.old_master))
1458

    
1459
    if not rpc.call_node_stop_master(self.old_master):
1460
      logger.Error("could disable the master role on the old master"
1461
                   " %s, please disable manually" % self.old_master)
1462

    
1463
    if not rpc.call_node_start_master(self.new_master):
1464
      logger.Error("could not start the master role on the new master"
1465
                   " %s, please check" % self.new_master)
1466

    
1467
    self.cfg.SetMaster(self.new_master)
1468

    
1469

    
1470
class LUQueryClusterInfo(NoHooksLU):
1471
  """Query cluster configuration.
1472

1473
  """
1474
  _OP_REQP = []
1475

    
1476
  def CheckPrereq(self):
1477
    """No prerequsites needed for this LU.
1478

1479
    """
1480
    pass
1481

    
1482
  def Exec(self, feedback_fn):
1483
    """Return cluster config.
1484

1485
    """
1486
    instances = [self.cfg.GetInstanceInfo(name)
1487
                 for name in self.cfg.GetInstanceList()]
1488
    result = {
1489
      "name": self.cfg.GetClusterName(),
1490
      "software_version": constants.RELEASE_VERSION,
1491
      "protocol_version": constants.PROTOCOL_VERSION,
1492
      "config_version": constants.CONFIG_VERSION,
1493
      "os_api_version": constants.OS_API_VERSION,
1494
      "export_version": constants.EXPORT_VERSION,
1495
      "master": self.cfg.GetMaster(),
1496
      "architecture": (platform.architecture()[0], platform.machine()),
1497
      "instances": [(instance.name, instance.primary_node)
1498
                    for instance in instances],
1499
      "nodes": self.cfg.GetNodeList(),
1500
      }
1501

    
1502
    return result
1503

    
1504

    
1505
class LUClusterCopyFile(NoHooksLU):
1506
  """Copy file to cluster.
1507

1508
  """
1509
  _OP_REQP = ["nodes", "filename"]
1510

    
1511
  def CheckPrereq(self):
1512
    """Check prerequisites.
1513

1514
    It should check that the named file exists and that the given list
1515
    of nodes is valid.
1516

1517
    """
1518
    if not os.path.exists(self.op.filename):
1519
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1520
    if self.op.nodes:
1521
      nodes = self.op.nodes
1522
    else:
1523
      nodes = self.cfg.GetNodeList()
1524
    self.nodes = []
1525
    for node in nodes:
1526
      nname = self.cfg.ExpandNodeName(node)
1527
      if nname is None:
1528
        raise errors.OpPrereqError, ("Node '%s' is unknown." % node)
1529
      self.nodes.append(nname)
1530

    
1531
  def Exec(self, feedback_fn):
1532
    """Copy a file from master to some nodes.
1533

1534
    Args:
1535
      opts - class with options as members
1536
      args - list containing a single element, the file name
1537
    Opts used:
1538
      nodes - list containing the name of target nodes; if empty, all nodes
1539

1540
    """
1541
    filename = self.op.filename
1542

    
1543
    myname = socket.gethostname()
1544

    
1545
    for node in self.nodes:
1546
      if node == myname:
1547
        continue
1548
      if not ssh.CopyFileToNode(node, filename):
1549
        logger.Error("Copy of file %s to node %s failed" % (filename, node))
1550

    
1551

    
1552
class LUDumpClusterConfig(NoHooksLU):
1553
  """Return a text-representation of the cluster-config.
1554

1555
  """
1556
  _OP_REQP = []
1557

    
1558
  def CheckPrereq(self):
1559
    """No prerequisites.
1560

1561
    """
1562
    pass
1563

    
1564
  def Exec(self, feedback_fn):
1565
    """Dump a representation of the cluster config to the standard output.
1566

1567
    """
1568
    return self.cfg.DumpConfig()
1569

    
1570

    
1571
class LURunClusterCommand(NoHooksLU):
1572
  """Run a command on some nodes.
1573

1574
  """
1575
  _OP_REQP = ["command", "nodes"]
1576

    
1577
  def CheckPrereq(self):
1578
    """Check prerequisites.
1579

1580
    It checks that the given list of nodes is valid.
1581

1582
    """
1583
    if self.op.nodes:
1584
      nodes = self.op.nodes
1585
    else:
1586
      nodes = self.cfg.GetNodeList()
1587
    self.nodes = []
1588
    for node in nodes:
1589
      nname = self.cfg.ExpandNodeName(node)
1590
      if nname is None:
1591
        raise errors.OpPrereqError, ("Node '%s' is unknown." % node)
1592
      self.nodes.append(nname)
1593

    
1594
  def Exec(self, feedback_fn):
1595
    """Run a command on some nodes.
1596

1597
    """
1598
    data = []
1599
    for node in self.nodes:
1600
      result = utils.RunCmd(["ssh", node, self.op.command])
1601
      data.append((node, result.cmd, result.output, result.exit_code))
1602

    
1603
    return data
1604

    
1605

    
1606
class LUActivateInstanceDisks(NoHooksLU):
1607
  """Bring up an instance's disks.
1608

1609
  """
1610
  _OP_REQP = ["instance_name"]
1611

    
1612
  def CheckPrereq(self):
1613
    """Check prerequisites.
1614

1615
    This checks that the instance is in the cluster.
1616

1617
    """
1618
    instance = self.cfg.GetInstanceInfo(
1619
      self.cfg.ExpandInstanceName(self.op.instance_name))
1620
    if instance is None:
1621
      raise errors.OpPrereqError, ("Instance '%s' not known" %
1622
                                   self.op.instance_name)
1623
    self.instance = instance
1624

    
1625

    
1626
  def Exec(self, feedback_fn):
1627
    """Activate the disks.
1628

1629
    """
1630
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1631
    if not disks_ok:
1632
      raise errors.OpExecError, ("Cannot activate block devices")
1633

    
1634
    return disks_info
1635

    
1636

    
1637
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1638
  """Prepare the block devices for an instance.
1639

1640
  This sets up the block devices on all nodes.
1641

1642
  Args:
1643
    instance: a ganeti.objects.Instance object
1644
    ignore_secondaries: if true, errors on secondary nodes won't result
1645
                        in an error return from the function
1646

1647
  Returns:
1648
    false if the operation failed
1649
    list of (host, instance_visible_name, node_visible_name) if the operation
1650
         succeeded with the mapping from node devices to instance devices
1651
  """
1652
  device_info = []
1653
  disks_ok = True
1654
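  # Walk each disk's node tree and ask every node to assemble its part of the
  # device. A failure on the primary node always marks the assembly as failed;
  # failures on secondaries only do so when ignore_secondaries is False.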
  for inst_disk in instance.disks:
1655
    master_result = None
1656
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1657
      cfg.SetDiskID(node_disk, node)
1658
      is_primary = node == instance.primary_node
1659
      result = rpc.call_blockdev_assemble(node, node_disk, is_primary)
1660
      if not result:
1661
        logger.Error("could not prepare block device %s on node %s (is_pri"
1662
                     "mary=%s)" % (inst_disk.iv_name, node, is_primary))
1663
        if is_primary or not ignore_secondaries:
1664
          disks_ok = False
1665
      if is_primary:
1666
        master_result = result
1667
    device_info.append((instance.primary_node, inst_disk.iv_name,
1668
                        master_result))
1669

    
1670
  return disks_ok, device_info
1671

    
1672

    
1673
class LUDeactivateInstanceDisks(NoHooksLU):
1674
  """Shutdown an instance's disks.
1675

1676
  """
1677
  _OP_REQP = ["instance_name"]
1678

    
1679
  def CheckPrereq(self):
1680
    """Check prerequisites.
1681

1682
    This checks that the instance is in the cluster.
1683

1684
    """
1685
    instance = self.cfg.GetInstanceInfo(
1686
      self.cfg.ExpandInstanceName(self.op.instance_name))
1687
    if instance is None:
1688
      raise errors.OpPrereqError, ("Instance '%s' not known" %
1689
                                   self.op.instance_name)
1690
    self.instance = instance
1691

    
1692
  def Exec(self, feedback_fn):
1693
    """Deactivate the disks
1694

1695
    """
1696
    instance = self.instance
1697
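    # Refuse to tear down the block devices while the hypervisor still reports
    # the instance as running on its primary node.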
    ins_l = rpc.call_instance_list([instance.primary_node])
1698
    ins_l = ins_l[instance.primary_node]
1699
    if not type(ins_l) is list:
1700
      raise errors.OpExecError, ("Can't contact node '%s'" %
1701
                                 instance.primary_node)
1702

    
1703
    if self.instance.name in ins_l:
1704
      raise errors.OpExecError, ("Instance is running, can't shutdown"
1705
                                 " block devices.")
1706

    
1707
    _ShutdownInstanceDisks(instance, self.cfg)
1708

    
1709

    
1710
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1711
  """Shutdown block devices of an instance.
1712

1713
  This does the shutdown on all nodes of the instance.
1714

1715
  If ignore_primary is true, errors on the primary node are
1716
  ignored.
1717

1718
  """
1719
  result = True
1720
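  # Shut down every component of every disk on all nodes involved; a failure
  # counts against the result unless it happened on the primary node and
  # ignore_primary is set.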
  for disk in instance.disks:
1721
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1722
      cfg.SetDiskID(top_disk, node)
1723
      if not rpc.call_blockdev_shutdown(node, top_disk):
1724
        logger.Error("could not shutdown block device %s on node %s" %
1725
                     (disk.iv_name, node))
1726
        if not ignore_primary or node != instance.primary_node:
1727
          result = False
1728
  return result
1729

    
1730

    
1731
class LUStartupInstance(LogicalUnit):
1732
  """Starts an instance.
1733

1734
  """
1735
  HPATH = "instance-start"
1736
  HTYPE = constants.HTYPE_INSTANCE
1737
  _OP_REQP = ["instance_name", "force"]
1738

    
1739
  def BuildHooksEnv(self):
1740
    """Build hooks env.
1741

1742
    This runs on master, primary and secondary nodes of the instance.
1743

1744
    """
1745
    env = {
1746
      "INSTANCE_NAME": self.op.instance_name,
1747
      "INSTANCE_PRIMARY": self.instance.primary_node,
1748
      "INSTANCE_SECONDARIES": " ".join(self.instance.secondary_nodes),
1749
      "FORCE": self.op.force,
1750
      }
1751
    nl = ([self.cfg.GetMaster(), self.instance.primary_node] +
1752
          list(self.instance.secondary_nodes))
1753
    return env, nl, nl
1754

    
1755
  def CheckPrereq(self):
1756
    """Check prerequisites.
1757

1758
    This checks that the instance is in the cluster.
1759

1760
    """
1761
    instance = self.cfg.GetInstanceInfo(
1762
      self.cfg.ExpandInstanceName(self.op.instance_name))
1763
    if instance is None:
1764
      raise errors.OpPrereqError, ("Instance '%s' not known" %
1765
                                   self.op.instance_name)
1766

    
1767
    # check bridges existence
1768
    brlist = [nic.bridge for nic in instance.nics]
1769
    if not rpc.call_bridges_exist(instance.primary_node, brlist):
1770
      raise errors.OpPrereqError, ("one or more target bridges %s does not"
1771
                                   " exist on destination node '%s'" %
1772
                                   (brlist, instance.primary_node))
1773

    
1774
    self.instance = instance
1775
    self.op.instance_name = instance.name
1776

    
1777
  def Exec(self, feedback_fn):
1778
    """Start the instance.
1779

1780
    """
1781
    instance = self.instance
1782
    force = self.op.force
1783
    extra_args = getattr(self.op, "extra_args", "")
1784

    
1785
    node_current = instance.primary_node
1786

    
1787
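    # Ask the primary node for its current free memory so we can fail early,
    # before any block devices are assembled, if the instance does not fit.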
    nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1788
    if not nodeinfo:
1789
      raise errors.OpExecError, ("Could not contact node %s for infos" %
1790
                                 (node_current))
1791

    
1792
    freememory = nodeinfo[node_current]['memory_free']
1793
    memory = instance.memory
1794
    if memory > freememory:
1795
      raise errors.OpExecError, ("Not enough memory to start instance"
1796
                                 " %s on node %s"
1797
                                 " needed %s MiB, available %s MiB" %
1798
                                 (instance.name, node_current, memory,
1799
                                  freememory))
1800

    
1801
    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
1802
                                             ignore_secondaries=force)
1803
    if not disks_ok:
1804
      _ShutdownInstanceDisks(instance, self.cfg)
1805
      if not force:
1806
        logger.Error("If the message above refers to a secondary node,"
1807
                     " you can retry the operation using '--force'.")
1808
      raise errors.OpExecError, ("Disk consistency error")
1809

    
1810
    if not rpc.call_instance_start(node_current, instance, extra_args):
1811
      _ShutdownInstanceDisks(instance, self.cfg)
1812
      raise errors.OpExecError, ("Could not start instance")
1813

    
1814
    self.cfg.MarkInstanceUp(instance.name)
1815

    
1816

    
1817
class LUShutdownInstance(LogicalUnit):
1818
  """Shutdown an instance.
1819

1820
  """
1821
  HPATH = "instance-stop"
1822
  HTYPE = constants.HTYPE_INSTANCE
1823
  _OP_REQP = ["instance_name"]
1824

    
1825
  def BuildHooksEnv(self):
1826
    """Build hooks env.
1827

1828
    This runs on master, primary and secondary nodes of the instance.
1829

1830
    """
1831
    env = {
1832
      "INSTANCE_NAME": self.op.instance_name,
1833
      "INSTANCE_PRIMARY": self.instance.primary_node,
1834
      "INSTANCE_SECONDARIES": " ".join(self.instance.secondary_nodes),
1835
      }
1836
    nl = ([self.cfg.GetMaster(), self.instance.primary_node] +
1837
          list(self.instance.secondary_nodes))
1838
    return env, nl, nl
1839

    
1840
  def CheckPrereq(self):
1841
    """Check prerequisites.
1842

1843
    This checks that the instance is in the cluster.
1844

1845
    """
1846
    instance = self.cfg.GetInstanceInfo(
1847
      self.cfg.ExpandInstanceName(self.op.instance_name))
1848
    if instance is None:
1849
      raise errors.OpPrereqError, ("Instance '%s' not known" %
1850
                                   self.op.instance_name)
1851
    self.instance = instance
1852

    
1853
  def Exec(self, feedback_fn):
1854
    """Shutdown the instance.
1855

1856
    """
1857
    instance = self.instance
1858
    node_current = instance.primary_node
1859
    if not rpc.call_instance_shutdown(node_current, instance):
1860
      logger.Error("could not shutdown instance")
1861

    
1862
    self.cfg.MarkInstanceDown(instance.name)
1863
    _ShutdownInstanceDisks(instance, self.cfg)
1864

    
1865

    
1866
class LURemoveInstance(LogicalUnit):
1867
  """Remove an instance.
1868

1869
  """
1870
  HPATH = "instance-remove"
1871
  HTYPE = constants.HTYPE_INSTANCE
1872
  _OP_REQP = ["instance_name"]
1873

    
1874
  def BuildHooksEnv(self):
1875
    """Build hooks env.
1876

1877
    This runs on master, primary and secondary nodes of the instance.
1878

1879
    """
1880
    env = {
1881
      "INSTANCE_NAME": self.op.instance_name,
1882
      "INSTANCE_PRIMARY": self.instance.primary_node,
1883
      "INSTANCE_SECONDARIES": " ".join(self.instance.secondary_nodes),
1884
      }
1885
    nl = ([self.cfg.GetMaster(), self.instance.primary_node] +
1886
          list(self.instance.secondary_nodes))
1887
    return env, nl, nl
1888

    
1889
  def CheckPrereq(self):
1890
    """Check prerequisites.
1891

1892
    This checks that the instance is in the cluster.
1893

1894
    """
1895
    instance = self.cfg.GetInstanceInfo(
1896
      self.cfg.ExpandInstanceName(self.op.instance_name))
1897
    if instance is None:
1898
      raise errors.OpPrereqError, ("Instance '%s' not known" %
1899
                                   self.op.instance_name)
1900
    self.instance = instance
1901

    
1902
  def Exec(self, feedback_fn):
1903
    """Remove the instance.
1904

1905
    """
1906
    instance = self.instance
1907
    logger.Info("shutting down instance %s on node %s" %
1908
                (instance.name, instance.primary_node))
1909

    
1910
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
1911
      raise errors.OpExecError, ("Could not shutdown instance %s on node %s" %
1912
                                 (instance.name, instance.primary_node))
1913

    
1914
    logger.Info("removing block devices for instance %s" % instance.name)
1915

    
1916
    _RemoveDisks(instance, self.cfg)
1917

    
1918
    logger.Info("removing instance %s out of cluster config" % instance.name)
1919

    
1920
    self.cfg.RemoveInstance(instance.name)
1921

    
1922

    
1923
class LUQueryInstances(NoHooksLU):
1924
  """Logical unit for querying instances.
1925

1926
  """
1927
  OP_REQP = ["output_fields"]
1928

    
1929
  def CheckPrereq(self):
1930
    """Check prerequisites.
1931

1932
    This checks that the fields required are valid output fields.
1933

1934
    """
1935

    
1936
    self.static_fields = frozenset(["name", "os", "pnode", "snodes",
1937
                                    "admin_state", "admin_ram",
1938
                                    "disk_template", "ip", "mac", "bridge"])
1939
    self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
1940
    self.all_fields = self.static_fields | self.dynamic_fields
1941

    
1942
    if not self.all_fields.issuperset(self.op.output_fields):
1943
      raise errors.OpPrereqError, ("Unknown output fields selected: %s"
1944
                                   % ",".join(frozenset(self.op.output_fields).
1945
                                              difference(self.all_fields)))
1946

    
1947
  def Exec(self, feedback_fn):
1948
    """Computes the list of nodes and their attributes.
1949

1950
    """
1951

    
1952
    instance_names = utils.NiceSort(self.cfg.GetInstanceList())
1953
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
1954
                     in instance_names]
1955

    
1956
    # begin data gathering
1957

    
1958
    nodes = frozenset([inst.primary_node for inst in instance_list])
1959

    
1960
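    # Only query the nodes for live data when a dynamic field (oper_state,
    # oper_ram) was requested; nodes that fail to answer are remembered so
    # that their instances can be reported as "(node down)".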
    bad_nodes = []
1961
    if self.dynamic_fields.intersection(self.op.output_fields):
1962
      live_data = {}
1963
      node_data = rpc.call_all_instances_info(nodes)
1964
      for name in nodes:
1965
        result = node_data[name]
1966
        if result:
1967
          live_data.update(result)
1968
        elif result == False:
1969
          bad_nodes.append(name)
1970
        # else no instance is alive
1971
    else:
1972
      live_data = dict([(name, {}) for name in instance_names])
1973

    
1974
    # end data gathering
1975

    
1976
    output = []
1977
    for instance in instance_list:
1978
      iout = []
1979
      for field in self.op.output_fields:
1980
        if field == "name":
1981
          val = instance.name
1982
        elif field == "os":
1983
          val = instance.os
1984
        elif field == "pnode":
1985
          val = instance.primary_node
1986
        elif field == "snodes":
1987
          val = ",".join(instance.secondary_nodes) or "-"
1988
        elif field == "admin_state":
1989
          if instance.status == "down":
1990
            val = "no"
1991
          else:
1992
            val = "yes"
1993
        elif field == "oper_state":
1994
          if instance.primary_node in bad_nodes:
1995
            val = "(node down)"
1996
          else:
1997
            if live_data.get(instance.name):
1998
              val = "running"
1999
            else:
2000
              val = "stopped"
2001
        elif field == "admin_ram":
2002
          val = instance.memory
2003
        elif field == "oper_ram":
2004
          if instance.primary_node in bad_nodes:
2005
            val = "(node down)"
2006
          elif instance.name in live_data:
2007
            val = live_data[instance.name].get("memory", "?")
2008
          else:
2009
            val = "-"
2010
        elif field == "disk_template":
2011
          val = instance.disk_template
2012
        elif field == "ip":
2013
          val = instance.nics[0].ip
2014
        elif field == "bridge":
2015
          val = instance.nics[0].bridge
2016
        elif field == "mac":
2017
          val = instance.nics[0].mac
2018
        else:
2019
          raise errors.ParameterError, field
2020
        val = str(val)
2021
        iout.append(val)
2022
      output.append(iout)
2023

    
2024
    return output
2025

    
2026

    
2027
class LUFailoverInstance(LogicalUnit):
2028
  """Failover an instance.
2029

2030
  """
2031
  HPATH = "instance-failover"
2032
  HTYPE = constants.HTYPE_INSTANCE
2033
  _OP_REQP = ["instance_name", "ignore_consistency"]
2034

    
2035
  def BuildHooksEnv(self):
2036
    """Build hooks env.
2037

2038
    This runs on master, primary and secondary nodes of the instance.
2039

2040
    """
2041
    env = {
2042
      "INSTANCE_NAME": self.op.instance_name,
2043
      "INSTANCE_PRIMARY": self.instance.primary_node,
2044
      "INSTANCE_SECONDARIES": " ".join(self.instance.secondary_nodes),
2045
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2046
      }
2047
    nl = [self.cfg.GetMaster()] + list(self.instance.secondary_nodes)
2048
    return env, nl, nl
2049

    
2050
  def CheckPrereq(self):
2051
    """Check prerequisites.
2052

2053
    This checks that the instance is in the cluster.
2054

2055
    """
2056
    instance = self.cfg.GetInstanceInfo(
2057
      self.cfg.ExpandInstanceName(self.op.instance_name))
2058
    if instance is None:
2059
      raise errors.OpPrereqError, ("Instance '%s' not known" %
2060
                                   self.op.instance_name)
2061

    
2062
    # check memory requirements on the secondary node
2063
    target_node = instance.secondary_nodes[0]
2064
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2065
    info = nodeinfo.get(target_node, None)
2066
    if not info:
2067
      raise errors.OpPrereqError, ("Cannot get current information"
2068
                                   " from node '%s'" % nodeinfo)
2069
    if instance.memory > info['memory_free']:
2070
      raise errors.OpPrereqError, ("Not enough memory on target node %s."
2071
                                   " %d MB available, %d MB required" %
2072
                                   (target_node, info['memory_free'],
2073
                                    instance.memory))
2074

    
2075
    # check bridge existence
2076
    brlist = [nic.bridge for nic in instance.nics]
2077
    if not rpc.call_bridges_exist(instance.primary_node, brlist):
2078
      raise errors.OpPrereqError, ("one or more target bridges %s does not"
2079
                                   " exist on destination node '%s'" %
2080
                                   (brlist, instance.primary_node))
2081

    
2082
    self.instance = instance
2083

    
2084
  def Exec(self, feedback_fn):
2085
    """Failover an instance.
2086

2087
    The failover is done by shutting it down on its present node and
2088
    starting it on the secondary.
2089

2090
    """
2091
    instance = self.instance
2092

    
2093
    source_node = instance.primary_node
2094
    target_node = instance.secondary_nodes[0]
2095

    
2096
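    # The target copy has to be in sync before moving the instance: a degraded
    # mirror on the secondary means we would start from stale data, so abort
    # unless ignore_consistency was explicitly requested.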
    feedback_fn("* checking disk consistency between source and target")
2097
    for dev in instance.disks:
2098
      # for remote_raid1, these are md over drbd
2099
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2100
        if not self.op.ignore_consistency:
2101
          raise errors.OpExecError, ("Disk %s is degraded on target node,"
2102
                                     " aborting failover." % dev.iv_name)
2103

    
2104
    feedback_fn("* checking target node resource availability")
2105
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2106

    
2107
    if not nodeinfo:
2108
      raise errors.OpExecError, ("Could not contact target node %s." %
2109
                                 target_node)
2110

    
2111
    free_memory = int(nodeinfo[target_node]['memory_free'])
2112
    memory = instance.memory
2113
    if memory > free_memory:
2114
      raise errors.OpExecError, ("Not enough memory to create instance %s on"
2115
                                 " node %s. needed %s MiB, available %s MiB" %
2116
                                 (instance.name, target_node, memory,
2117
                                  free_memory))
2118

    
2119
    feedback_fn("* shutting down instance on source node")
2120
    logger.Info("Shutting down instance %s on node %s" %
2121
                (instance.name, source_node))
2122

    
2123
    if not rpc.call_instance_shutdown(source_node, instance):
2124
      logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2125
                   " anyway. Please make sure node %s is down"  %
2126
                   (instance.name, source_node, source_node))
2127

    
2128
    feedback_fn("* deactivating the instance's disks on source node")
2129
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2130
      raise errors.OpExecError, ("Can't shut down the instance's disks.")
2131

    
2132
    instance.primary_node = target_node
2133
    # distribute new instance config to the other nodes
2134
    self.cfg.AddInstance(instance)
2135

    
2136
    feedback_fn("* activating the instance's disks on target node")
2137
    logger.Info("Starting instance %s on node %s" %
2138
                (instance.name, target_node))
2139

    
2140
    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2141
                                             ignore_secondaries=True)
2142
    if not disks_ok:
2143
      _ShutdownInstanceDisks(instance, self.cfg)
2144
      raise errors.OpExecError, ("Can't activate the instance's disks")
2145

    
2146
    feedback_fn("* starting the instance on the target node")
2147
    if not rpc.call_instance_start(target_node, instance, None):
2148
      _ShutdownInstanceDisks(instance, self.cfg)
2149
      raise errors.OpExecError("Could not start instance %s on node %s." %
2150
                               (instance.name, target_node))
2151

    
2152

    
2153
def _CreateBlockDevOnPrimary(cfg, node, device):
2154
  """Create a tree of block devices on the primary node.
2155

2156
  This always creates all devices.
2157

2158
  """
2159

    
2160
  if device.children:
2161
    for child in device.children:
2162
      if not _CreateBlockDevOnPrimary(cfg, node, child):
2163
        return False
2164

    
2165
  cfg.SetDiskID(device, node)
2166
  new_id = rpc.call_blockdev_create(node, device, device.size, True)
2167
  if not new_id:
2168
    return False
2169
  if device.physical_id is None:
2170
    device.physical_id = new_id
2171
  return True
2172

    
2173

    
2174
def _CreateBlockDevOnSecondary(cfg, node, device, force):
2175
  """Create a tree of block devices on a secondary node.
2176

2177
  If this device type has to be created on secondaries, create it and
2178
  all its children.
2179

2180
  If not, just recurse to children keeping the same 'force' value.
2181

2182
  """
2183
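  # A device that has to exist on the secondary forces creation of itself and
  # its whole subtree; otherwise we only recurse so that any children which do
  # need to live on the secondary still get created.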
  if device.CreateOnSecondary():
2184
    force = True
2185
  if device.children:
2186
    for child in device.children:
2187
      if not _CreateBlockDevOnSecondary(cfg, node, child, force):
2188
        return False
2189

    
2190
  if not force:
2191
    return True
2192
  cfg.SetDiskID(device, node)
2193
  new_id = rpc.call_blockdev_create(node, device, device.size, False)
2194
  if not new_id:
2195
    return False
2196
  if device.physical_id is None:
2197
    device.physical_id = new_id
2198
  return True
2199

    
2200

    
2201
def _GenerateMDDRBDBranch(cfg, vgname, primary, secondary, size, base):
2202
  """Generate a drbd device complete with its children.
2203

2204
  """
2205
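  # Each DRBD device gets a port allocated from the cluster configuration and
  # two backing LVs: one for the data and a small (128 MB) one for the DRBD
  # metadata.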
  port = cfg.AllocatePort()
2206
  base = "%s_%s" % (base, port)
2207
  dev_data = objects.Disk(dev_type="lvm", size=size,
2208
                          logical_id=(vgname, "%s.data" % base))
2209
  dev_meta = objects.Disk(dev_type="lvm", size=128,
2210
                          logical_id=(vgname, "%s.meta" % base))
2211
  drbd_dev = objects.Disk(dev_type="drbd", size=size,
2212
                          logical_id = (primary, secondary, port),
2213
                          children = [dev_data, dev_meta])
2214
  return drbd_dev
2215

    
2216

    
2217
def _GenerateDiskTemplate(cfg, vgname, template_name,
2218
                          instance_name, primary_node,
2219
                          secondary_nodes, disk_sz, swap_sz):
2220
  """Generate the entire disk layout for a given template type.
2221

2222
  """
2223
  #TODO: compute space requirements
2224

    
2225
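  # Map the template name to a concrete device tree: plain LVs, a local
  # md/raid1 pair of LVs, or md/raid1 on top of a DRBD device mirroring to the
  # single secondary node.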
  if template_name == "diskless":
2226
    disks = []
2227
  elif template_name == "plain":
2228
    if len(secondary_nodes) != 0:
2229
      raise errors.ProgrammerError("Wrong template configuration")
2230
    sda_dev = objects.Disk(dev_type="lvm", size=disk_sz,
2231
                           logical_id=(vgname, "%s.os" % instance_name),
2232
                           iv_name = "sda")
2233
    sdb_dev = objects.Disk(dev_type="lvm", size=swap_sz,
2234
                           logical_id=(vgname, "%s.swap" % instance_name),
2235
                           iv_name = "sdb")
2236
    disks = [sda_dev, sdb_dev]
2237
  elif template_name == "local_raid1":
2238
    if len(secondary_nodes) != 0:
2239
      raise errors.ProgrammerError("Wrong template configuration")
2240
    sda_dev_m1 = objects.Disk(dev_type="lvm", size=disk_sz,
2241
                              logical_id=(vgname, "%s.os_m1" % instance_name))
2242
    sda_dev_m2 = objects.Disk(dev_type="lvm", size=disk_sz,
2243
                              logical_id=(vgname, "%s.os_m2" % instance_name))
2244
    md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name = "sda",
2245
                              size=disk_sz,
2246
                              children = [sda_dev_m1, sda_dev_m2])
2247
    sdb_dev_m1 = objects.Disk(dev_type="lvm", size=swap_sz,
2248
                              logical_id=(vgname, "%s.swap_m1" %
2249
                                          instance_name))
2250
    sdb_dev_m2 = objects.Disk(dev_type="lvm", size=swap_sz,
2251
                              logical_id=(vgname, "%s.swap_m2" %
2252
                                          instance_name))
2253
    md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name = "sdb",
2254
                              size=swap_sz,
2255
                              children = [sdb_dev_m1, sdb_dev_m2])
2256
    disks = [md_sda_dev, md_sdb_dev]
2257
  elif template_name == "remote_raid1":
2258
    if len(secondary_nodes) != 1:
2259
      raise errors.ProgrammerError("Wrong template configuration")
2260
    remote_node = secondary_nodes[0]
2261
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, vgname,
2262
                                         primary_node, remote_node, disk_sz,
2263
                                         "%s-sda" % instance_name)
2264
    md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name="sda",
2265
                              children = [drbd_sda_dev], size=disk_sz)
2266
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, vgname,
2267
                                         primary_node, remote_node, swap_sz,
2268
                                         "%s-sdb" % instance_name)
2269
    md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name="sdb",
2270
                              children = [drbd_sdb_dev], size=swap_sz)
2271
    disks = [md_sda_dev, md_sdb_dev]
2272
  else:
2273
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2274
  return disks
2275

    
2276

    
2277
def _CreateDisks(cfg, instance):
2278
  """Create all disks for an instance.
2279

2280
  This abstracts away some work from AddInstance.
2281

2282
  Args:
2283
    instance: the instance object
2284

2285
  Returns:
2286
    True or False showing the success of the creation process
2287

2288
  """
2289
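  # Create each device on all secondary nodes first and on the primary last;
  # any failure aborts the whole creation.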
  for device in instance.disks:
2290
    logger.Info("creating volume %s for instance %s" %
2291
              (device.iv_name, instance.name))
2292
    #HARDCODE
2293
    for secondary_node in instance.secondary_nodes:
2294
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, device, False):
2295
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2296
                     (device.iv_name, device, secondary_node))
2297
        return False
2298
    #HARDCODE
2299
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, device):
2300
      logger.Error("failed to create volume %s on primary!" %
2301
                   device.iv_name)
2302
      return False
2303
  return True
2304

    
2305

    
2306
def _RemoveDisks(instance, cfg):
2307
  """Remove all disks for an instance.
2308

2309
  This abstracts away some work from `AddInstance()` and
2310
  `RemoveInstance()`. Note that in case some of the devices couldn't
2311
  be remove, the removal will continue with the other ones (compare
2312
  with `_CreateDisks()`).
2313

2314
  Args:
2315
    instance: the instance object
2316

2317
  Returns:
2318
    True or False showing the success of the removal process
2319

2320
  """
2321
  logger.Info("removing block devices for instance %s" % instance.name)
2322

    
2323
  result = True
2324
  for device in instance.disks:
2325
    for node, disk in device.ComputeNodeTree(instance.primary_node):
2326
      cfg.SetDiskID(disk, node)
2327
      if not rpc.call_blockdev_remove(node, disk):
2328
        logger.Error("could not remove block device %s on node %s,"
2329
                     " continuing anyway" %
2330
                     (device.iv_name, node))
2331
        result = False
2332
  return result
2333

    
2334

    
2335
class LUCreateInstance(LogicalUnit):
2336
  """Create an instance.
2337

2338
  """
2339
  HPATH = "instance-add"
2340
  HTYPE = constants.HTYPE_INSTANCE
2341
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2342
              "disk_template", "swap_size", "mode", "start", "vcpus",
2343
              "wait_for_sync"]
2344

    
2345
  def BuildHooksEnv(self):
2346
    """Build hooks env.
2347

2348
    This runs on master, primary and secondary nodes of the instance.
2349

2350
    """
2351
    env = {
2352
      "INSTANCE_NAME": self.op.instance_name,
2353
      "INSTANCE_PRIMARY": self.op.pnode,
2354
      "INSTANCE_SECONDARIES": " ".join(self.secondaries),
2355
      "DISK_TEMPLATE": self.op.disk_template,
2356
      "MEM_SIZE": self.op.mem_size,
2357
      "DISK_SIZE": self.op.disk_size,
2358
      "SWAP_SIZE": self.op.swap_size,
2359
      "VCPUS": self.op.vcpus,
2360
      "BRIDGE": self.op.bridge,
2361
      "INSTANCE_ADD_MODE": self.op.mode,
2362
      }
2363
    if self.op.mode == constants.INSTANCE_IMPORT:
2364
      env["SRC_NODE"] = self.op.src_node
2365
      env["SRC_PATH"] = self.op.src_path
2366
      env["SRC_IMAGE"] = self.src_image
2367
    if self.inst_ip:
2368
      env["INSTANCE_IP"] = self.inst_ip
2369

    
2370
    nl = ([self.cfg.GetMaster(), self.op.pnode] +
2371
          self.secondaries)
2372
    return env, nl, nl
2373

    
2374

    
2375
  def CheckPrereq(self):
2376
    """Check prerequisites.
2377

2378
    """
2379
    if self.op.mode not in (constants.INSTANCE_CREATE,
2380
                            constants.INSTANCE_IMPORT):
2381
      raise errors.OpPrereqError, ("Invalid instance creation mode '%s'" %
2382
                                   self.op.mode)
2383

    
2384
    if self.op.mode == constants.INSTANCE_IMPORT:
2385
      src_node = getattr(self.op, "src_node", None)
2386
      src_path = getattr(self.op, "src_path", None)
2387
      if src_node is None or src_path is None:
2388
        raise errors.OpPrereqError, ("Importing an instance requires source"
2389
                                     " node and path options")
2390
      src_node_full = self.cfg.ExpandNodeName(src_node)
2391
      if src_node_full is None:
2392
        raise errors.OpPrereqError, ("Unknown source node '%s'" % src_node)
2393
      self.op.src_node = src_node = src_node_full
2394

    
2395
      if not os.path.isabs(src_path):
2396
        raise errors.OpPrereqError, ("The source path must be absolute")
2397

    
2398
      export_info = rpc.call_export_info(src_node, src_path)
2399

    
2400
      if not export_info:
2401
        raise errors.OpPrereqError, ("No export found in dir %s" % src_path)
2402

    
2403
      if not export_info.has_section(constants.INISECT_EXP):
2404
        raise errors.ProgrammerError, ("Corrupted export config")
2405

    
2406
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
2407
      if (int(ei_version) != constants.EXPORT_VERSION):
2408
        raise errors.OpPrereqError, ("Wrong export version %s (wanted %d)" %
2409
                                     (ei_version, constants.EXPORT_VERSION))
2410

    
2411
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2412
        raise errors.OpPrereqError, ("Can't import instance with more than"
2413
                                     " one data disk")
2414

    
2415
      # FIXME: are the old os-es, disk sizes, etc. useful?
2416
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2417
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2418
                                                         'disk0_dump'))
2419
      self.src_image = diskimage
2420
    else: # INSTANCE_CREATE
2421
      if getattr(self.op, "os_type", None) is None:
2422
        raise errors.OpPrereqError, ("No guest OS specified")
2423

    
2424
    # check primary node
2425
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2426
    if pnode is None:
2427
      raise errors.OpPrereqError, ("Primary node '%s' is uknown" %
2428
                                   self.op.pnode)
2429
    self.op.pnode = pnode.name
2430
    self.pnode = pnode
2431
    self.secondaries = []
2432
    # disk template and mirror node verification
2433
    if self.op.disk_template not in constants.DISK_TEMPLATES:
2434
      raise errors.OpPrereqError, ("Invalid disk template name")
2435

    
2436
    if self.op.disk_template == constants.DT_REMOTE_RAID1:
2437
      if getattr(self.op, "snode", None) is None:
2438
        raise errors.OpPrereqError, ("The 'remote_raid1' disk template needs"
2439
                                     " a mirror node")
2440

    
2441
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
2442
      if snode_name is None:
2443
        raise errors.OpPrereqError, ("Unknown secondary node '%s'" %
2444
                                     self.op.snode)
2445
      elif snode_name == pnode.name:
2446
        raise errors.OpPrereqError, ("The secondary node cannot be"
2447
                                     " the primary node.")
2448
      self.secondaries.append(snode_name)
2449

    
2450
    # Check lv size requirements
2451
    nodenames = [pnode.name] + self.secondaries
2452
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2453

    
2454
    # Required free disk space as a function of disk and swap space
2455
    req_size_dict = {
2456
      constants.DT_DISKLESS: 0,
2457
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2458
      constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2459
      # 256 MB are added for drbd metadata, 128MB for each drbd device
2460
      constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2461
    }
2462

    
2463
    if self.op.disk_template not in req_size_dict:
2464
      raise errors.ProgrammerError, ("Disk template '%s' size requirement"
2465
                                     " is unknown" %  self.op.disk_template)
2466

    
2467
    req_size = req_size_dict[self.op.disk_template]
2468

    
2469
    for node in nodenames:
2470
      info = nodeinfo.get(node, None)
2471
      if not info:
2472
        raise errors.OpPrereqError, ("Cannot get current information"
2473
                                     " from node '%s'" % nodeinfo)
2474
      if req_size > info['vg_free']:
2475
        raise errors.OpPrereqError, ("Not enough disk space on target node %s."
2476
                                     " %d MB available, %d MB required" %
2477
                                     (node, info['vg_free'], req_size))
2478

    
2479
    # os verification
2480
    os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2481
    if not isinstance(os_obj, objects.OS):
2482
      raise errors.OpPrereqError, ("OS '%s' not in supported os list for"
2483
                                   " primary node"  % self.op.os_type)
2484

    
2485
    # instance verification
2486
    hostname1 = utils.LookupHostname(self.op.instance_name)
2487
    if not hostname1:
2488
      raise errors.OpPrereqError, ("Instance name '%s' not found in dns" %
2489
                                   self.op.instance_name)
2490

    
2491
    self.op.instance_name = instance_name = hostname1['hostname']
2492
    instance_list = self.cfg.GetInstanceList()
2493
    if instance_name in instance_list:
2494
      raise errors.OpPrereqError, ("Instance '%s' is already in the cluster" %
2495
                                   instance_name)
2496

    
2497
    ip = getattr(self.op, "ip", None)
2498
    if ip is None or ip.lower() == "none":
2499
      inst_ip = None
2500
    elif ip.lower() == "auto":
2501
      inst_ip = hostname1['ip']
2502
    else:
2503
      if not utils.IsValidIP(ip):
2504
        raise errors.OpPrereqError, ("given IP address '%s' doesn't look"
2505
                                     " like a valid IP" % ip)
2506
      inst_ip = ip
2507
    self.inst_ip = inst_ip
2508

    
2509
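    # If the chosen address already answers on the network, some other host is
    # using it, so refuse to create an instance with a conflicting IP.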
    command = ["fping", "-q", hostname1['ip']]
2510
    result = utils.RunCmd(command)
2511
    if not result.failed:
2512
      raise errors.OpPrereqError, ("IP %s of instance %s already in use" %
2513
                                   (hostname1['ip'], instance_name))
2514

    
2515
    # bridge verification
2516
    bridge = getattr(self.op, "bridge", None)
2517
    if bridge is None:
2518
      self.op.bridge = self.cfg.GetDefBridge()
2519
    else:
2520
      self.op.bridge = bridge
2521

    
2522
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
2523
      raise errors.OpPrereqError, ("target bridge '%s' does not exist on"
2524
                                   " destination node '%s'" %
2525
                                   (self.op.bridge, pnode.name))
2526

    
2527
    if self.op.start:
2528
      self.instance_status = 'up'
2529
    else:
2530
      self.instance_status = 'down'
2531

    
2532
  def Exec(self, feedback_fn):
2533
    """Create and add the instance to the cluster.
2534

2535
    """
2536
    instance = self.op.instance_name
2537
    pnode_name = self.pnode.name
2538

    
2539
    nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
2540
    if self.inst_ip is not None:
2541
      nic.ip = self.inst_ip
2542

    
2543
    disks = _GenerateDiskTemplate(self.cfg, self.cfg.GetVGName(),
2544
                                  self.op.disk_template,
2545
                                  instance, pnode_name,
2546
                                  self.secondaries, self.op.disk_size,
2547
                                  self.op.swap_size)
2548

    
2549
    iobj = objects.Instance(name=instance, os=self.op.os_type,
2550
                            primary_node=pnode_name,
2551
                            memory=self.op.mem_size,
2552
                            vcpus=self.op.vcpus,
2553
                            nics=[nic], disks=disks,
2554
                            disk_template=self.op.disk_template,
2555
                            status=self.instance_status,
2556
                            )
2557

    
2558
    feedback_fn("* creating instance disks...")
2559
    if not _CreateDisks(self.cfg, iobj):
2560
      _RemoveDisks(iobj, self.cfg)
2561
      raise errors.OpExecError, ("Device creation failed, reverting...")
2562

    
2563
    feedback_fn("adding instance %s to cluster config" % instance)
2564

    
2565
    self.cfg.AddInstance(iobj)
2566

    
2567
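    # Either wait until the mirrors are fully in sync, or (for remote_raid1
    # when not asked to wait) give DRBD a short grace period and only check
    # that no mirror is outright degraded; degraded disks abort the creation.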
    if self.op.wait_for_sync:
2568
      disk_abort = not _WaitForSync(self.cfg, iobj)
2569
    elif iobj.disk_template == "remote_raid1":
2570
      # make sure the disks are not degraded (still sync-ing is ok)
2571
      time.sleep(15)
2572
      feedback_fn("* checking mirrors status")
2573
      disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
2574
    else:
2575
      disk_abort = False
2576

    
2577
    if disk_abort:
2578
      _RemoveDisks(iobj, self.cfg)
2579
      self.cfg.RemoveInstance(iobj.name)
2580
      raise errors.OpExecError, ("There are some degraded disks for"
2581
                                      " this instance")
2582

    
2583
    feedback_fn("creating os for instance %s on node %s" %
2584
                (instance, pnode_name))
2585

    
2586
    if iobj.disk_template != constants.DT_DISKLESS:
2587
      if self.op.mode == constants.INSTANCE_CREATE:
2588
        feedback_fn("* running the instance OS create scripts...")
2589
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
2590
          raise errors.OpExecError, ("could not add os for instance %s"
2591
                                          " on node %s" %
2592
                                          (instance, pnode_name))
2593

    
2594
      elif self.op.mode == constants.INSTANCE_IMPORT:
2595
        feedback_fn("* running the instance OS import scripts...")
2596
        src_node = self.op.src_node
2597
        src_image = self.src_image
2598
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
2599
                                                src_node, src_image):
2600
          raise errors.OpExecError, ("Could not import os for instance"
2601
                                          " %s on node %s" %
2602
                                          (instance, pnode_name))
2603
      else:
2604
        # also checked in the prereq part
2605
        raise errors.ProgrammerError, ("Unknown OS initialization mode '%s'"
2606
                                       % self.op.mode)
2607

    
2608
    if self.op.start:
2609
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
2610
      feedback_fn("* starting instance...")
2611
      if not rpc.call_instance_start(pnode_name, iobj, None):
2612
        raise errors.OpExecError, ("Could not start instance")
2613

    
2614

    
2615
class LUConnectConsole(NoHooksLU):
2616
  """Connect to an instance's console.
2617

2618
  This is somewhat special in that it returns the command line that
2619
  you need to run on the master node in order to connect to the
2620
  console.
2621

2622
  """
2623
  _OP_REQP = ["instance_name"]
2624

    
2625
  def CheckPrereq(self):
2626
    """Check prerequisites.
2627

2628
    This checks that the instance is in the cluster.
2629

2630
    """
2631
    instance = self.cfg.GetInstanceInfo(
2632
      self.cfg.ExpandInstanceName(self.op.instance_name))
2633
    if instance is None:
2634
      raise errors.OpPrereqError, ("Instance '%s' not known" %
2635
                                   self.op.instance_name)
2636
    self.instance = instance
2637

    
2638
  def Exec(self, feedback_fn):
2639
    """Connect to the console of an instance
2640

2641
    """
2642
    instance = self.instance
2643
    node = instance.primary_node
2644

    
2645
    node_insts = rpc.call_instance_list([node])[node]
2646
    if node_insts is False:
2647
      raise errors.OpExecError, ("Can't connect to node %s." % node)
2648

    
2649
    if instance.name not in node_insts:
2650
      raise errors.OpExecError, ("Instance %s is not running." % instance.name)
2651

    
2652
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
2653

    
2654
    hyper = hypervisor.GetHypervisor()
2655
    console_cmd = hyper.GetShellCommandForConsole(instance.name)
2656
    return node, console_cmd
2657

    
2658

    
2659
class LUAddMDDRBDComponent(LogicalUnit):
2660
  """Adda new mirror member to an instance's disk.
2661

2662
  """
2663
  HPATH = "mirror-add"
2664
  HTYPE = constants.HTYPE_INSTANCE
2665
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]
2666

    
2667
  def BuildHooksEnv(self):
2668
    """Build hooks env.
2669

2670
    This runs on the master, the primary and all the secondaries.
2671

2672
    """
2673
    env = {
2674
      "INSTANCE_NAME": self.op.instance_name,
2675
      "NEW_SECONDARY": self.op.remote_node,
2676
      "DISK_NAME": self.op.disk_name,
2677
      }
2678
    nl = [self.cfg.GetMaster(), self.instance.primary_node,
2679
          self.op.remote_node,] + list(self.instance.secondary_nodes)
2680
    return env, nl, nl
2681

    
2682
  def CheckPrereq(self):
2683
    """Check prerequisites.
2684

2685
    This checks that the instance is in the cluster.
2686

2687
    """
2688
    instance = self.cfg.GetInstanceInfo(
2689
      self.cfg.ExpandInstanceName(self.op.instance_name))
2690
    if instance is None:
2691
      raise errors.OpPrereqError, ("Instance '%s' not known" %
2692
                                   self.op.instance_name)
2693
    self.instance = instance
2694

    
2695
    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
2696
    if remote_node is None:
2697
      raise errors.OpPrereqError, ("Node '%s' not known" % self.op.remote_node)
2698
    self.remote_node = remote_node
2699

    
2700
    if remote_node == instance.primary_node:
2701
      raise errors.OpPrereqError, ("The specified node is the primary node of"
2702
                                   " the instance.")
2703

    
2704
    if instance.disk_template != constants.DT_REMOTE_RAID1:
2705
      raise errors.OpPrereqError, ("Instance's disk layout is not"
2706
                                   " remote_raid1.")
2707
    for disk in instance.disks:
2708
      if disk.iv_name == self.op.disk_name:
2709
        break
2710
    else:
2711
      raise errors.OpPrereqError, ("Can't find this device ('%s') in the"
2712
                                   " instance." % self.op.disk_name)
2713
    if len(disk.children) > 1:
2714
      raise errors.OpPrereqError, ("The device already has two slave"
2715
                                   " devices.\n"
2716
                                   "This would create a 3-disk raid1"
2717
                                   " which we don't allow.")
2718
    self.disk = disk
2719

    
2720
  def Exec(self, feedback_fn):
2721
    """Add the mirror component
2722

2723
    """
2724
    disk = self.disk
2725
    instance = self.instance
2726

    
2727
    remote_node = self.remote_node
2728
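    # Generate the new DRBD branch (data and metadata LVs plus the DRBD device
    # on top) that will be attached as an additional mirror to the existing md
    # device.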
    new_drbd = _GenerateMDDRBDBranch(self.cfg, self.cfg.GetVGName(),
2729
                                     instance.primary_node, remote_node,
2730
                                     disk.size, "%s-%s" %
2731
                                     (instance.name, self.op.disk_name))
2732

    
2733
    logger.Info("adding new mirror component on secondary")
2734
    #HARDCODE
2735
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, new_drbd, False):
2736
      raise errors.OpExecError, ("Failed to create new component on secondary"
2737
                                 " node %s" % remote_node)
2738

    
2739
    logger.Info("adding new mirror component on primary")
2740
    #HARDCODE
2741
    if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node, new_drbd):
2742
      # remove secondary dev
2743
      self.cfg.SetDiskID(new_drbd, remote_node)
2744
      rpc.call_blockdev_remove(remote_node, new_drbd)
2745
      raise errors.OpExecError, ("Failed to create volume on primary")
2746

    
2747
    # the device exists now
2748
    # call the primary node to add the mirror to md
2749
    logger.Info("adding new mirror component to md")
2750
    if not rpc.call_blockdev_addchild(instance.primary_node,
2751
                                           disk, new_drbd):
2752
      logger.Error("Can't add mirror compoment to md!")
2753
      self.cfg.SetDiskID(new_drbd, remote_node)
2754
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
2755
        logger.Error("Can't rollback on secondary")
2756
      self.cfg.SetDiskID(new_drbd, instance.primary_node)
2757
      if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
2758
        logger.Error("Can't rollback on primary")
2759
      raise errors.OpExecError, "Can't add mirror component to md array"
2760

    
2761
    disk.children.append(new_drbd)
2762

    
2763
    self.cfg.AddInstance(instance)
2764

    
2765
    _WaitForSync(self.cfg, instance)
2766

    
2767
    return 0
2768

    
2769

    
2770
class LURemoveMDDRBDComponent(LogicalUnit):
2771
  """Remove a component from a remote_raid1 disk.
2772

2773
  """
2774
  HPATH = "mirror-remove"
2775
  HTYPE = constants.HTYPE_INSTANCE
2776
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]
2777

    
2778
  def BuildHooksEnv(self):
2779
    """Build hooks env.
2780

2781
    This runs on the master, the primary and all the secondaries.
2782

2783
    """
2784
    env = {
2785
      "INSTANCE_NAME": self.op.instance_name,
2786
      "DISK_NAME": self.op.disk_name,
2787
      "DISK_ID": self.op.disk_id,
2788
      "OLD_SECONDARY": self.old_secondary,
2789
      }
2790
    nl = [self.cfg.GetMaster(),
2791
          self.instance.primary_node] + list(self.instance.secondary_nodes)
2792
    return env, nl, nl
2793

    
2794
  def CheckPrereq(self):
2795
    """Check prerequisites.
2796

2797
    This checks that the instance is in the cluster.
2798

2799
    """
2800
    instance = self.cfg.GetInstanceInfo(
2801
      self.cfg.ExpandInstanceName(self.op.instance_name))
2802
    if instance is None:
2803
      raise errors.OpPrereqError, ("Instance '%s' not known" %
2804
                                   self.op.instance_name)
2805
    self.instance = instance
2806

    
2807
    if instance.disk_template != constants.DT_REMOTE_RAID1:
2808
      raise errors.OpPrereqError, ("Instance's disk layout is not"
2809
                                   " remote_raid1.")
2810
    for disk in instance.disks:
2811
      if disk.iv_name == self.op.disk_name:
2812
        break
2813
    else:
2814
      raise errors.OpPrereqError, ("Can't find this device ('%s') in the"
2815
                                   " instance." % self.op.disk_name)
2816
    for child in disk.children:
2817
      if child.dev_type == "drbd" and child.logical_id[2] == self.op.disk_id:
2818
        break
2819
    else:
2820
      raise errors.OpPrereqError, ("Can't find the device with this port.")
2821

    
2822
    if len(disk.children) < 2:
2823
      raise errors.OpPrereqError, ("Cannot remove the last component from"
2824
                                   " a mirror.")
2825
    self.disk = disk
2826
    self.child = child
2827
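    # A DRBD child's logical_id is (node_a, node_b, port); whichever of the
    # two nodes is not the instance's primary is the old secondary.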
    if self.child.logical_id[0] == instance.primary_node:
2828
      oid = 1
2829
    else:
2830
      oid = 0
2831
    self.old_secondary = self.child.logical_id[oid]
2832

    
2833
  def Exec(self, feedback_fn):
2834
    """Remove the mirror component
2835

2836
    """
2837
    instance = self.instance
2838
    disk = self.disk
2839
    child = self.child
2840
    logger.Info("remove mirror component")
2841
    self.cfg.SetDiskID(disk, instance.primary_node)
2842
    if not rpc.call_blockdev_removechild(instance.primary_node,
2843
                                              disk, child):
2844
      raise errors.OpExecError, ("Can't remove child from mirror.")
2845

    
2846
    for node in child.logical_id[:2]:
2847
      self.cfg.SetDiskID(child, node)
2848
      if not rpc.call_blockdev_remove(node, child):
2849
        logger.Error("Warning: failed to remove device from node %s,"
2850
                     " continuing operation." % node)
2851

    
2852
    disk.children.remove(child)
2853
    self.cfg.AddInstance(instance)
2854

    
2855

    
2856
class LUReplaceDisks(LogicalUnit):
2857
  """Replace the disks of an instance.
2858

2859
  """
2860
  HPATH = "mirrors-replace"
2861
  HTYPE = constants.HTYPE_INSTANCE
2862
  _OP_REQP = ["instance_name"]
2863

    
2864
  def BuildHooksEnv(self):
2865
    """Build hooks env.
2866

2867
    This runs on the master, the primary and all the secondaries.
2868

2869
    """
2870
    env = {
2871
      "INSTANCE_NAME": self.op.instance_name,
2872
      "NEW_SECONDARY": self.op.remote_node,
2873
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
2874
      }
2875
    nl = [self.cfg.GetMaster(),
2876
          self.instance.primary_node] + list(self.instance.secondary_nodes)
2877
    return env, nl, nl
2878

    
2879
  def CheckPrereq(self):
2880
    """Check prerequisites.
2881

2882
    This checks that the instance is in the cluster.
2883

2884
    """
2885
    instance = self.cfg.GetInstanceInfo(
2886
      self.cfg.ExpandInstanceName(self.op.instance_name))
2887
    if instance is None:
2888
      raise errors.OpPrereqError, ("Instance '%s' not known" %
2889
                                   self.op.instance_name)
2890
    self.instance = instance
2891

    
2892
    if instance.disk_template != constants.DT_REMOTE_RAID1:
2893
      raise errors.OpPrereqError, ("Instance's disk layout is not"
2894
                                   " remote_raid1.")
2895

    
2896
    if len(instance.secondary_nodes) != 1:
2897
      raise errors.OpPrereqError, ("The instance has a strange layout,"
2898
                                   " expected one secondary but found %d" %
2899
                                   len(instance.secondary_nodes))
2900

    
2901
    remote_node = getattr(self.op, "remote_node", None)
2902
    if remote_node is None:
2903
      remote_node = instance.secondary_nodes[0]
2904
    else:
2905
      remote_node = self.cfg.ExpandNodeName(remote_node)
2906
      if remote_node is None:
2907
        raise errors.OpPrereqError, ("Node '%s' not known" %
2908
                                     self.op.remote_node)
2909
    if remote_node == instance.primary_node:
2910
      raise errors.OpPrereqError, ("The specified node is the primary node of"
2911
                                   " the instance.")
2912
    self.op.remote_node = remote_node
2913

    
2914
  def Exec(self, feedback_fn):
2915
    """Replace the disks of an instance.
2916

2917
    """
2918
    instance = self.instance
2919
    iv_names = {}
2920
    # start of work
2921
    remote_node = self.op.remote_node
2922
    cfg = self.cfg
2923
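    # Phase one: for every disk, build a fresh DRBD branch towards the new
    # secondary and attach it to the md mirror, remembering the old child so
    # it can be detached and removed once the new branch has synced.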
    for dev in instance.disks:
2924
      size = dev.size
2925
      new_drbd = _GenerateMDDRBDBranch(cfg, self.cfg.GetVGName(),
2926
                                       instance.primary_node, remote_node, size,
2927
                                       "%s-%s" % (instance.name, dev.iv_name))
2928
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
2929
      logger.Info("adding new mirror component on secondary for %s" %
2930
                  dev.iv_name)
2931
      #HARDCODE
2932
      if not _CreateBlockDevOnSecondary(cfg, remote_node, new_drbd, False):
2933
        raise errors.OpExecError, ("Failed to create new component on"
2934
                                   " secondary node %s\n"
2935
                                   "Full abort, cleanup manually!" %
2936
                                   remote_node)
2937

    
2938
      logger.Info("adding new mirror component on primary")
2939
      #HARDCODE
2940
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, new_drbd):
2941
        # remove secondary dev
2942
        cfg.SetDiskID(new_drbd, remote_node)
2943
        rpc.call_blockdev_remove(remote_node, new_drbd)
2944
        raise errors.OpExecError("Failed to create volume on primary!\n"
2945
                                 "Full abort, cleanup manually!!")
2946

    
2947
      # the device exists now
2948
      # call the primary node to add the mirror to md
2949
      logger.Info("adding new mirror component to md")
2950
      if not rpc.call_blockdev_addchild(instance.primary_node, dev,
2951
                                             new_drbd):
2952
        logger.Error("Can't add mirror compoment to md!")
2953
        cfg.SetDiskID(new_drbd, remote_node)
2954
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
2955
          logger.Error("Can't rollback on secondary")
2956
        cfg.SetDiskID(new_drbd, instance.primary_node)
2957
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
2958
          logger.Error("Can't rollback on primary")
2959
        raise errors.OpExecError, ("Full abort, cleanup manually!!")
2960

    
2961
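      # register the new child on the md device and update the instance in
      # the configuration before moving on to the next disk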
      dev.children.append(new_drbd)
      cfg.AddInstance(instance)

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    _WaitForSync(cfg, instance, unlock=True)

    # so check manually all the devices
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError, ("MD device %s is degraded!" % name)
      cfg.SetDiskID(new_drbd, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
      if is_degr:
        raise errors.OpExecError, ("New drbd device %s is degraded!" % name)

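    # the new mirrors are in sync, so detach the old drbd children from the
    # md devices and remove their volumes from both nodes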
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      logger.Info("remove mirror %s component" % name)
      cfg.SetDiskID(dev, instance.primary_node)
      if not rpc.call_blockdev_removechild(instance.primary_node,
                                                dev, child):
        logger.Error("Can't remove child from mirror, aborting"
                     " *this device cleanup*.\nYou need to cleanup manually!!")
        continue

      for node in child.logical_id[:2]:
        logger.Info("remove child device on %s" % node)
        cfg.SetDiskID(child, node)
        if not rpc.call_blockdev_remove(node, child):
          logger.Error("Warning: failed to remove device from node %s,"
                       " continuing operation." % node)

      dev.children.remove(child)

      cfg.AddInstance(instance)


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError, "Invalid argument type 'instances'"
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError, ("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]
    return


  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type == "drbd":
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""

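    # remote_state is what the hypervisor on the primary node actually
    # reports about the instance, while config_state is what the cluster
    # configuration says it should be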
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                                instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        }

      result[instance.name] = idict

    return result


class LUQueryNodeData(NoHooksLU):
  """Logical unit for querying node data.

  """
  _OP_REQP = ["nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional node list against the existing names.

    """
    if not isinstance(self.op.nodes, list):
      raise errors.OpPrereqError, "Invalid argument type 'nodes'"
    if self.op.nodes:
      self.wanted_nodes = []
      names = self.op.nodes
      for name in names:
        node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(name))
        if node is None:
          raise errors.OpPrereqError, ("No such node name '%s'" % name)
        self.wanted_nodes.append(node)
    else:
      self.wanted_nodes = [self.cfg.GetNodeInfo(name) for name
                           in self.cfg.GetNodeList()]
    return

  def Exec(self, feedback_fn):
    """Compute and return the list of nodes.

    """

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]
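    # each result entry is a tuple: (name, primary_ip, secondary_ip,
    # [instances with this node as primary], [instances with it as secondary])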
    result = []
    for node in self.wanted_nodes:
      result.append((node.name, node.primary_ip, node.secondary_ip,
                     [inst.name for inst in ilist
                      if inst.primary_node == node.name],
                     [inst.name for inst in ilist
                      if node.name in inst.secondary_nodes],
                     ))
    return result


class LUSetInstanceParms(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      }
    if self.mem:
      env["MEM_SIZE"] = self.mem
    if self.vcpus:
      env["VCPUS"] = self.vcpus
    if self.do_ip:
      env["INSTANCE_IP"] = self.ip
    if self.bridge:
      env["BRIDGE"] = self.bridge

    nl = [self.cfg.GetMaster(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)

    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.bridge = getattr(self.op, "bridge", None)
    if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
      raise errors.OpPrereqError, ("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError, ("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError, ("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
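      # the literal string "none" (any case) clears the IP address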
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError, ("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("No such instance name '%s'" %
                                   self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance
    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.
    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))

    self.cfg.AddInstance(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    nodes = getattr(self.op, "nodes", None)
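    # an empty or missing node list means "query all nodes"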
    if not nodes:
      self.op.nodes = self.cfg.GetNodeList()
    else:
      expnodes = [self.cfg.ExpandNodeName(node) for node in nodes]
      if expnodes.count(None) > 0:
        raise errors.OpPrereqError, ("At least one of the given nodes %s"
                                     " is unknown" % self.op.nodes)
      self.op.nodes = expnodes

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.op.nodes)


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    nl = [self.cfg.GetMaster(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance name is a valid one.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not found" %
                                   self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError, ("Destination node '%s' is unknown." %
                                   self.op.target_node)
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    # shutdown the instance, unless requested not to do so
    if self.op.shutdown:
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
      self.processor.ChainOpCode(op, feedback_fn)

    vgname = self.cfg.GetVGName()

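    # snap_disks will hold the Disk objects describing the LVM snapshots
    # created below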
    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type="lvm", size=disk.size,
                                      logical_id=(vgname, new_dev_name),
                                      physical_id=(vgname, new_dev_name),
                                      iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown:
        op = opcodes.OpStartupInstance(instance_name=instance.name,
                                       force=False)
        self.processor.ChainOpCode(op, feedback_fn)

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                           instance):
        logger.Error("could not export block device %s from node"
                     " %s to node %s" %
                     (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from"
                     " node %s" % (dev.logical_id[1], src_node))

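    # as with the per-disk copies above, a failure to finalize the export on
    # the destination node is only logged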
    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal; if we
    # proceeded anyway, the backup just created would be removed, because
    # OpQueryExports substitutes an empty list with the full cluster node
    # list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.processor.ChainOpCode(op, feedback_fn)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))