#!/usr/bin/python
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import socket
import time
import tempfile
import re
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import ssconf

class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements (REQ_CLUSTER,
      REQ_MASTER); note that all commands require root permissions

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_CLUSTER = True
  REQ_MASTER = True

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.processor = processor
    self.op = op
    self.cfg = cfg
    self.sstore = sstore
    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError, ("Required parameter '%s' missing" %
                                     attr_name)
    if self.REQ_CLUSTER:
      if not cfg.IsCluster():
        raise errors.OpPrereqError, ("Cluster not initialized yet,"
                                     " use 'gnt-cluster init' first.")
      if self.REQ_MASTER:
        master = sstore.GetMasterNode()
        if master != socket.gethostname():
          raise errors.OpPrereqError, ("Commands must be run on the master"
                                       " node %s" % master)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not be prefixed with 'GANETI_', as this
    prefix will be added by the hooks runner. Also note that additional
    keys will be added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    As for the node lists, the master should not be included in them,
    as it will be added by the hooks runner in case this LU requires a
    cluster to run on (otherwise we don't have a node list). If the
    hook should run on no nodes, return an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError


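# ---------------------------------------------------------------------------
# Illustrative sketch (editor's addition, not part of the original Ganeti
# code): the minimal shape of a LogicalUnit subclass following the rules in
# the LogicalUnit docstring above.  The opcode field 'node_name' and the hook
# path "example-noop" are hypothetical and only show where each piece goes.
class _ExampleNoopLU(LogicalUnit):
  """Example LU that validates a single opcode field and does no real work."""
  HPATH = "example-noop"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    # Keys are given without the GANETI_ prefix; the hooks runner adds it.
    env = {"NODE_NAME": self.op.node_name}
    # (env, pre-hook node list, post-hook node list); the master is added
    # automatically by the hooks runner.
    return env, [], [self.op.node_name]

  def CheckPrereq(self):
    # Canonicalise opcode parameters and raise OpPrereqError on problems,
    # following the pattern used by LURemoveNode further down.
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError, ("Node '%s' not known" % self.op.node_name)
    self.op.node_name = node.name

  def Exec(self, feedback_fn):
    feedback_fn("nothing to do for node %s" % self.op.node_name)

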
class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Build hooks env.

    This is a no-op, since we don't run hooks.

    """
    return


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded nodes.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if nodes is not None and not isinstance(nodes, list):
    raise errors.OpPrereqError, "Invalid argument type 'nodes'"

  if nodes:
    wanted_nodes = []

    for name in nodes:
      node = lu.cfg.GetNodeInfo(lu.cfg.ExpandNodeName(name))
      if node is None:
        raise errors.OpPrereqError, ("No such node name '%s'" % name)
      wanted_nodes.append(node)

    return wanted_nodes
  else:
    return [lu.cfg.GetNodeInfo(name) for name in lu.cfg.GetNodeList()]


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields
    selected: Fields selected by the user

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError, ("Unknown output fields selected: %s"
                                 % ",".join(frozenset(selected).
                                            difference(all_fields)))


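# Illustrative sketch (editor's addition, not part of the original code): how
# a query LU is expected to call _CheckOutputFields from its CheckPrereq, as
# LUQueryNodes does further down.  The field names below are only examples.
def _ExampleCheckOutputFieldsUsage(op):
  """Validate user-requested output fields for a hypothetical query LU."""
  static_fields = ["name", "pip"]      # computable from the configuration
  dynamic_fields = ["mfree", "dfree"]  # need a live RPC call to the nodes
  # Raises errors.OpPrereqError listing any unknown names in op.output_fields.
  _CheckOutputFields(static=static_fields,
                     dynamic=dynamic_fields,
                     selected=op.output_fields)

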
def _UpdateEtcHosts(fullnode, ip):
  """Ensure a node has a correct entry in /etc/hosts.

  Args:
    fullnode - Fully qualified domain name of host. (str)
    ip       - IPv4 address of host (str)

  """
  node = fullnode.split(".", 1)[0]

  f = open('/etc/hosts', 'r+')

  inthere = False

  save_lines = []
  add_lines = []
  removed = False

  while True:
    rawline = f.readline()

    if not rawline:
      # End of file
      break

    line = rawline.split('\n')[0]

    # Strip off comments
    line = line.split('#')[0]

    if not line:
      # Entire line was comment, skip
      save_lines.append(rawline)
      continue

    fields = line.split()

    haveall = True
    havesome = False
    for spec in [ ip, fullnode, node ]:
      if spec not in fields:
        haveall = False
      if spec in fields:
        havesome = True

    if haveall:
      inthere = True
      save_lines.append(rawline)
      continue

    if havesome and not haveall:
      # Line (old, or manual?) which is missing some.  Remove.
      removed = True
      continue

    save_lines.append(rawline)

  if not inthere:
    add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))

  if removed:
    if add_lines:
      save_lines = save_lines + add_lines

    # We removed a line, write a new file and replace old.
    fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
    newfile = os.fdopen(fd, 'w')
    newfile.write(''.join(save_lines))
    newfile.close()
    os.rename(tmpname, '/etc/hosts')

  elif add_lines:
    # Simply appending a new line will do the trick.
    f.seek(0, 2)
    for add in add_lines:
      f.write(add)

  f.close()


def _UpdateKnownHosts(fullnode, ip, pubkey):
  """Ensure a node has a correct known_hosts entry.

  Args:
    fullnode - Fully qualified domain name of host. (str)
    ip       - IPv4 address of host (str)
    pubkey   - the public key of the cluster

  """
  if os.path.exists('/etc/ssh/ssh_known_hosts'):
    f = open('/etc/ssh/ssh_known_hosts', 'r+')
  else:
    f = open('/etc/ssh/ssh_known_hosts', 'w+')

  inthere = False

  save_lines = []
  add_lines = []
  removed = False

  while True:
    rawline = f.readline()
    logger.Debug('read %s' % (repr(rawline),))

    if not rawline:
      # End of file
      break

    line = rawline.split('\n')[0]

    parts = line.split(' ')
    fields = parts[0].split(',')
    key = parts[2]

    haveall = True
    havesome = False
    for spec in [ ip, fullnode ]:
      if spec not in fields:
        haveall = False
      if spec in fields:
        havesome = True

    logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
    if haveall and key == pubkey:
      inthere = True
      save_lines.append(rawline)
      logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
      continue

    if havesome and (not haveall or key != pubkey):
      removed = True
      logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
      continue

    save_lines.append(rawline)

  if not inthere:
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))

  if removed:
    save_lines = save_lines + add_lines

    # Write a new file and replace old.
    fd, tmpname = tempfile.mkstemp('tmp', 'ssh_known_hosts_', '/etc/ssh')
    newfile = os.fdopen(fd, 'w')
    newfile.write(''.join(save_lines))
    newfile.close()
    logger.Debug("Wrote new known_hosts.")
    os.rename(tmpname, '/etc/ssh/ssh_known_hosts')

  elif add_lines:
    # Simply appending a new line will do the trick.
    f.seek(0, 2)
    for add in add_lines:
      f.write(add)

  f.close()


def _HasValidVG(vglist, vgname):
  """Checks if the volume group list is valid.

  A non-None return value means there's an error, and the return value
  is the error message.

  """
  vgsize = vglist.get(vgname, None)
  if vgsize is None:
    return "volume group '%s' missing" % vgname
  elif vgsize < 20480:
    return ("volume group '%s' too small (20480MiB required, %dMiB found)" %
            (vgname, vgsize))
  return None


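# Illustrative sketch (editor's addition, not part of the original code):
# _HasValidVG expects a mapping of volume group names to their sizes in MiB,
# such as the one returned by utils.ListVolumeGroups() (used exactly this way
# in LUInitCluster.CheckPrereq below), and returns an error string or None.
def _ExampleHasValidVGUsage(vg_name):
  """Raise OpPrereqError if the local volume group is missing or too small."""
  vgstatus = _HasValidVG(utils.ListVolumeGroups(), vg_name)
  if vgstatus:
    raise errors.OpPrereqError, ("Error: %s" % vgstatus)

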
def _InitSSHSetup(node):
  """Setup the SSH configuration for the cluster.

  This generates a dsa keypair for root, adds the pub key to the
  permitted hosts and adds the hostkey to its own known hosts.

  Args:
    node: the name of this host as a fqdn

  """
  utils.RemoveFile('/root/.ssh/known_hosts')

  if os.path.exists('/root/.ssh/id_dsa'):
    utils.CreateBackup('/root/.ssh/id_dsa')
  if os.path.exists('/root/.ssh/id_dsa.pub'):
    utils.CreateBackup('/root/.ssh/id_dsa.pub')

  utils.RemoveFile('/root/.ssh/id_dsa')
  utils.RemoveFile('/root/.ssh/id_dsa.pub')

  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
                         "-f", "/root/.ssh/id_dsa",
                         "-q", "-N", ""])
  if result.failed:
    raise errors.OpExecError, ("could not generate ssh keypair, error %s" %
                               result.output)

  f = open('/root/.ssh/id_dsa.pub', 'r')
  try:
    utils.AddAuthorizedKey('/root/.ssh/authorized_keys', f.read(8192))
  finally:
    f.close()


def _InitGanetiServerSetup(ss):
  """Setup the necessary configuration for the initial node daemon.

  This creates the nodepass file containing the shared password for
  the cluster and also generates the SSL certificate.

  """
  # Create pseudo random password
  randpass = sha.new(os.urandom(64)).hexdigest()
  # and write it into sstore
  ss.SetKey(ss.SS_NODED_PASS, randpass)

  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
                         "-days", str(365*5), "-nodes", "-x509",
                         "-keyout", constants.SSL_CERT_FILE,
                         "-out", constants.SSL_CERT_FILE, "-batch"])
  if result.failed:
    raise errors.OpExecError, ("could not generate server ssl cert, command"
                               " %s had exitcode %s and error message %s" %
                               (result.cmd, result.exit_code, result.output))

  os.chmod(constants.SSL_CERT_FILE, 0400)

  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])

  if result.failed:
    raise errors.OpExecError, ("could not start the node daemon, command %s"
                               " had exitcode %s and error %s" %
                               (result.cmd, result.exit_code, result.output))


class LUInitCluster(LogicalUnit):
  """Initialise the cluster.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
              "def_bridge", "master_netdev"]
  REQ_CLUSTER = False

  def BuildHooksEnv(self):
    """Build hooks env.

    Notes: Since we don't require a cluster, we must manually add
    ourselves in the post-run node list.

    """
    env = {"CLUSTER": self.op.cluster_name,
           "MASTER": self.hostname['hostname_full']}
    return env, [], [self.hostname['hostname_full']]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    if config.ConfigWriter.IsCluster():
      raise errors.OpPrereqError, ("Cluster is already initialised")

    hostname_local = socket.gethostname()
    self.hostname = hostname = utils.LookupHostname(hostname_local)
    if not hostname:
      raise errors.OpPrereqError, ("Cannot resolve my own hostname ('%s')" %
                                   hostname_local)

    self.clustername = clustername = utils.LookupHostname(self.op.cluster_name)
    if not clustername:
      raise errors.OpPrereqError, ("Cannot resolve given cluster name ('%s')"
                                   % self.op.cluster_name)

    result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", hostname['ip']])
    if result.failed:
      raise errors.OpPrereqError, ("Inconsistency: this host's name resolves"
                                   " to %s,\nbut this ip address does not"
                                   " belong to this host."
                                   " Aborting." % hostname['ip'])

    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip and not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError, ("Invalid secondary ip given")
    if secondary_ip and secondary_ip != hostname['ip']:
      result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", secondary_ip])
      if result.failed:
        raise errors.OpPrereqError, ("You gave %s as secondary IP,\n"
                                     "but it does not belong to this host." %
                                     secondary_ip)
    self.secondary_ip = secondary_ip

    # checks presence of the volume group given
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)

    if vgstatus:
      raise errors.OpPrereqError, ("Error: %s" % vgstatus)

    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
                    self.op.mac_prefix):
      raise errors.OpPrereqError, ("Invalid mac prefix given '%s'" %
                                   self.op.mac_prefix)

    if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
      raise errors.OpPrereqError, ("Invalid hypervisor type given '%s'" %
                                   self.op.hypervisor_type)

    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
    if result.failed:
      raise errors.OpPrereqError, ("Invalid master netdev given (%s): '%s'" %
                                   (self.op.master_netdev, result.output))

  def Exec(self, feedback_fn):
    """Initialize the cluster.

    """
    clustername = self.clustername
    hostname = self.hostname

    # set up the simple store
    ss = ssconf.SimpleStore()
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
    ss.SetKey(ss.SS_MASTER_NODE, hostname['hostname_full'])
    ss.SetKey(ss.SS_MASTER_IP, clustername['ip'])
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername['hostname'])

    # set up the inter-node password and certificate
    _InitGanetiServerSetup(ss)

    # start the master ip
    rpc.call_node_start_master(hostname['hostname_full'])

    # set up ssh config and /etc/hosts
    f = open('/etc/ssh/ssh_host_rsa_key.pub', 'r')
    try:
      sshline = f.read()
    finally:
      f.close()
    sshkey = sshline.split(" ")[1]

    _UpdateEtcHosts(hostname['hostname_full'],
                    hostname['ip'],
                    )

    _UpdateKnownHosts(hostname['hostname_full'],
                      hostname['ip'],
                      sshkey,
                      )

    _InitSSHSetup(hostname['hostname'])

    # init of cluster config file
    cfgw = config.ConfigWriter()
    cfgw.InitConfig(hostname['hostname'], hostname['ip'], self.secondary_ip,
                    sshkey, self.op.mac_prefix,
                    self.op.vg_name, self.op.def_bridge)


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) > 0 and nodelist != [master]:
      raise errors.OpPrereqError, ("There are still %d node(s) in "
                                   "this cluster." % (len(nodelist) - 1))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    utils.CreateBackup('/root/.ssh/id_dsa')
    utils.CreateBackup('/root/.ssh/id_dsa.pub')
    rpc.call_node_leave_cluster(self.sstore.GetMasterNode())


class LUVerifyCluster(NoHooksLU):
  """Verifies the cluster status.

  """
  _OP_REQP = []

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn(" - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    instancelist = self.cfg.GetInstanceList()
    if not instance in instancelist:
      feedback_fn("  - ERROR: instance %s not in instance list %s" %
                      (instance, instancelist))
      bad = True

    instanceconfig = self.cfg.GetInstanceInfo(instance)
    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if not instanceconfig.status == 'down':
      if not instance in node_instance[node_current]:
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return not bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    self.cfg.VerifyConfig()

    master = self.sstore.GetMasterNode()
    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    node_volume = {}
    node_instance = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if type(volumeinfo) != dict:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      result = self._VerifyInstance(instance, node_volume, node_instance,
                                    feedback_fn)
      bad = bad or result

      inst_config = self.cfg.GetInstanceInfo(instance)

      inst_config.MapLVsByNode(node_vol_should)

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    return int(bad)


def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disks to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      logger.ToStderr("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError, ("Can't contact node %s for mirror data,"
                                   " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        logger.ToStderr("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      perc_done, est_time, is_degraded = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        logger.ToStdout("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    if unlock:
      utils.Unlock('cmd')
    try:
      time.sleep(min(60, max_time))
    finally:
      if unlock:
        utils.Lock('cmd')

  if done:
    logger.ToStdout("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


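# Illustrative sketch (editor's addition, not part of the original code): a
# caller typically treats a False return from _WaitForSync (i.e. some mirror
# was still degraded when polling stopped) as a fatal error.
def _ExampleWaitForSyncUsage(cfgw, instance):
  """Wait for an instance's disks to sync and abort if they stay degraded."""
  if not _WaitForSync(cfgw, instance):
    raise errors.OpExecError, ("Disks of instance %s are degraded" %
                               instance.name)

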
def _CheckDiskConsistency(cfgw, dev, node, on_primary):
  """Check that mirrors are not degraded.

  """
  cfgw.SetDiskID(dev, node)

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Can't get any data from node %s" % node)
      result = False
    else:
      result = result and (not rstats[5])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    return

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError, "Can't gather the list of OSes"
    return node_data


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would not allow itself to run.

    """
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return {"NODE_NAME": self.op.node_name}, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      logger.Error("Error: Node '%s' is unknown." % self.op.node_name)
      return 1

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError, ("Node is the master node,"
                                   " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError, ("Instance %s still running on the node,"
                                     " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError, ("Instance %s has node as a secondary,"
                                     " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    rpc.call_node_leave_cluster(node.name)

    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["dtotal", "dfree",
                                     "mtotal", "mnode", "mfree"])

    _CheckOutputFields(static=["name", "pinst", "sinst", "pip", "sip"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = utils.NiceSort(self.cfg.GetNodeList())
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict.fromkeys(nodenames, 0)
    node_to_secondary = dict.fromkeys(nodenames, 0)

    if "pinst" in self.op.output_fields or "sinst" in self.op.output_fields:
      instancelist = self.cfg.GetInstanceList()

      for instance in instancelist:
        instanceinfo = self.cfg.GetInstanceInfo(instance)
        node_to_primary[instanceinfo.primary_node] += 1
        for secnode in instanceinfo.secondary_nodes:
          node_to_secondary[secnode] += 1

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst":
          val = node_to_primary[node.name]
        elif field == "sinst":
          val = node_to_secondary[node.name]
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, "?")
        else:
          raise errors.ParameterError, field
        val = str(val)
        node_output.append(val)
      output.append(node_output)

    return output


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = utils.NiceSort([node.name for node in self.nodes])
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError, field
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.LookupHostname(node_name)
    if not dns_data:
      raise errors.OpPrereqError, ("Node %s is not resolvable" % node_name)

    node = dns_data['hostname']
    primary_ip = self.op.primary_ip = dns_data['ip']
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError, ("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip
    node_list = cfg.GetNodeList()
    if node in node_list:
      raise errors.OpPrereqError, ("Node %s is already in the configuration"
                                   % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)
      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError, ("New node ip address(es) conflict with"
                                     " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError, ("The master has no private ip but the"
                                     " new node has one")
      else:
        raise errors.OpPrereqError, ("The master has a private ip but the"
                                     " new node doesn't have one")

    # checks reachability
    command = ["fping", "-q", primary_ip]
    result = utils.RunCmd(command)
    if result.failed:
      raise errors.OpPrereqError, ("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      command = ["fping", "-S%s" % myself.secondary_ip, "-q", secondary_ip]
      result = utils.RunCmd(command)
      if result.failed:
        raise errors.OpPrereqError, ("Node secondary ip not reachable by ping")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # set up inter-node password and certificate and restarts the node daemon
    gntpass = self.sstore.GetNodeDaemonPassword()
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
      raise errors.OpExecError, ("ganeti password corruption detected")
    f = open(constants.SSL_CERT_FILE)
    try:
      gntpem = f.read(8192)
    finally:
      f.close()
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
    # so we use this to detect an invalid certificate; as long as the
    # cert doesn't contain this, the here-document will be correctly
    # parsed by the shell sequence below
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
      raise errors.OpExecError, ("invalid PEM encoding in the SSL certificate")
    if not gntpem.endswith("\n"):
      raise errors.OpExecError, ("PEM must end with newline")
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)

    # remove first the root's known_hosts file
    utils.RemoveFile("/root/.ssh/known_hosts")
    # and then connect with ssh to set password and start ganeti-noded
    # note that all the below variables are sanitized at this point,
    # either by being constants or by the checks above
    ss = self.sstore
    mycommand = ("umask 077 && "
                 "echo '%s' > '%s' && "
                 "cat > '%s' << '!EOF.' && \n"
                 "%s!EOF.\n%s restart" %
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
                  constants.SSL_CERT_FILE, gntpem,
                  constants.NODE_INITD_SCRIPT))

    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
    if result.failed:
      raise errors.OpExecError, ("Remote command on node %s, error: %s,"
                                 " output: %s" %
                                 (node, result.fail_reason, result.output))

    # check connectivity
    time.sleep(4)

    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError, ("Version mismatch master version %s,"
                                   " node version %s" %
                                   (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError, ("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    keyarray = []
    keyfiles = ["/etc/ssh/ssh_host_dsa_key", "/etc/ssh/ssh_host_dsa_key.pub",
                "/etc/ssh/ssh_host_rsa_key", "/etc/ssh/ssh_host_rsa_key.pub",
                "/root/.ssh/id_dsa", "/root/.ssh/id_dsa.pub"]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError, ("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    _UpdateEtcHosts(new_node.name, new_node.primary_ip)
    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
                      self.cfg.GetHostKey())

    if new_node.secondary_ip != new_node.primary_ip:
      result = ssh.SSHCall(node, "root",
                           "fping -S 127.0.0.1 -q %s" % new_node.secondary_ip)
      if result.failed:
        raise errors.OpExecError, ("Node claims it doesn't have the"
                                   " secondary ip you gave (%s).\n"
                                   "Please fix and re-run this command." %
                                   new_node.secondary_ip)

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList() + [node]
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in ("/etc/hosts", "/etc/ssh/ssh_known_hosts"):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = ss.GetFileList()
    for fname in to_copy:
      if not ssh.CopyFileToNode(node, fname):
        logger.Error("could not copy file %s to node %s" % (fname, node))

    logger.Info("adding node %s to cluster.conf" % node)
    self.cfg.AddNode(new_node)


class LUMasterFailover(LogicalUnit):
  """Failover the master node to the current node.

  This is a special LU in that it must run on a non-master node.

  """
  HPATH = "master-failover"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_MASTER = False
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the new master only in the pre phase, and on all
    the nodes in the post phase.

    """
    env = {
      "NEW_MASTER": self.new_master,
      "OLD_MASTER": self.old_master,
      }
    return env, [self.new_master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we are not already the master.

    """
    self.new_master = socket.gethostname()

    self.old_master = self.sstore.GetMasterNode()

    if self.old_master == self.new_master:
      raise errors.OpPrereqError, ("This command must be run on the node"
                                   " where you want the new master to be.\n"
                                   "%s is already the master" %
                                   self.old_master)

  def Exec(self, feedback_fn):
    """Failover the master node.

    This command, when run on a non-master node, will cause the current
    master to cease being master, and the non-master to become new
    master.

    """
    #TODO: do not rely on gethostname returning the FQDN
    logger.Info("setting master to %s, old master: %s" %
                (self.new_master, self.old_master))

    if not rpc.call_node_stop_master(self.old_master):
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)

    ss = self.sstore
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
      logger.Error("could not distribute the new simple store master file"
                   " to the other nodes, please check.")

    if not rpc.call_node_start_master(self.new_master):
      logger.Error("could not start the master role on the new master"
                   " %s, please check" % self.new_master)
      feedback_fn("Error in activating the master IP on the new master,\n"
                  "please fix manually.")



class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      "instances": [(instance.name, instance.primary_node)
                    for instance in instances],
      "nodes": self.cfg.GetNodeList(),
      }

    return result


class LUClusterCopyFile(NoHooksLU):
  """Copy file to cluster.

  """
  _OP_REQP = ["nodes", "filename"]

  def CheckPrereq(self):
    """Check prerequisites.

    It should check that the named file exists and that the given list
    of nodes is valid.

    """
    if not os.path.exists(self.op.filename):
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)

    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Copy a file from master to some nodes.

    Args:
      opts - class with options as members
      args - list containing a single element, the file name
    Opts used:
      nodes - list containing the name of target nodes; if empty, all nodes

    """
    filename = self.op.filename

    myname = socket.gethostname()

    for node in self.nodes:
      if node == myname:
        continue
      if not ssh.CopyFileToNode(node, filename):
        logger.Error("Copy of file %s to node %s failed" % (filename, node))


class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()


class LURunClusterCommand(NoHooksLU):
1560
  """Run a command on some nodes.
1561

1562
  """
1563
  _OP_REQP = ["command", "nodes"]
1564

    
1565
  def CheckPrereq(self):
1566
    """Check prerequisites.
1567

1568
    It checks that the given list of nodes is valid.
1569

1570
    """
1571
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1572

    
1573
  def Exec(self, feedback_fn):
1574
    """Run a command on some nodes.
1575

1576
    """
1577
    data = []
1578
    for node in self.nodes:
1579
      result = utils.RunCmd(["ssh", node.name, self.op.command])
1580
      data.append((node.name, result.cmd, result.output, result.exit_code))
1581

    
1582
    return data


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError, ("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    a tuple of (disks_ok, device_info); disks_ok is False if the operation
    failed, and device_info is a list of
    (host, instance_visible_name, node_visible_name) tuples with the
    mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  for inst_disk in instance.disks:
    master_result = None
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      is_primary = node == instance.primary_node
      result = rpc.call_blockdev_assemble(node, node_disk, is_primary)
      if not result:
        logger.Error("could not prepare block device %s on node %s (is_pri"
                     "mary=%s)" % (inst_disk.iv_name, node, is_primary))
        if is_primary or not ignore_secondaries:
          disks_ok = False
      if is_primary:
        master_result = result
    device_info.append((instance.primary_node, inst_disk.iv_name,
                        master_result))

  return disks_ok, device_info
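
# Illustrative example (not part of the original code): for a plain-disk
# instance whose primary node is "node1.example.com", a successful
#   _AssembleInstanceDisks(instance, cfg)
# call would return something like
#   (True, [("node1.example.com", "sda", <blockdev_assemble result>),
#           ("node1.example.com", "sdb", <blockdev_assemble result>)])
# where the node name is hypothetical and the third element is whatever the
# primary node's call_blockdev_assemble RPC returned for that disk.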


def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  If the disks cannot be assembled, they are shut down again and an
  OpExecError is raised.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError, ("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    if not type(ins_l) is list:
      raise errors.OpExecError, ("Can't contact node '%s'" %
                                 instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError, ("Instance is running, can't shutdown"
                                 " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)


def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are ignored.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result
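
# Example usage (illustrative, not part of the original code): during
# failover the disks are shut down on the old primary even if that node
# is unreachable, via
#   _ShutdownInstanceDisks(instance, cfg, ignore_primary=True)
# as done in LUFailoverInstance.Exec further below.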


class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      "INSTANCE_PRIMARY": self.instance.primary_node,
      "INSTANCE_SECONDARIES": " ".join(self.instance.secondary_nodes),
      "FORCE": self.op.force,
      }
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(instance.primary_node, brlist):
      raise errors.OpPrereqError, ("one or more target bridges %s do not"
                                   " exist on destination node '%s'" %
                                   (brlist, instance.primary_node))

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
    if not nodeinfo:
      raise errors.OpExecError, ("Could not contact node %s for info" %
                                 (node_current))

    freememory = nodeinfo[node_current]['memory_free']
    memory = instance.memory
    if memory > freememory:
      raise errors.OpExecError, ("Not enough memory to start instance"
                                 " %s on node %s"
                                 " needed %s MiB, available %s MiB" %
                                 (instance.name, node_current, memory,
                                  freememory))

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError, ("Could not start instance")

    self.cfg.MarkInstanceUp(instance.name)


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      "INSTANCE_PRIMARY": self.instance.primary_node,
      "INSTANCE_SECONDARIES": " ".join(self.instance.secondary_nodes),
      }
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    if not rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    self.cfg.MarkInstanceDown(instance.name)
    _ShutdownInstanceDisks(instance, self.cfg)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      "INSTANCE_PRIMARY": self.instance.primary_node,
      "INSTANCE_SECONDARIES": " ".join(self.instance.secondary_nodes),
      }
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)
    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError, ("Instance '%s' has no disks" %
                                   self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError, ("Instance '%s' is marked to be up" %
                                   self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError, ("Instance '%s' is running on node %s" %
                                   (self.op.instance_name,
                                    instance.primary_node))
    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError, ("Could not install OS for instance %s "
                                   "on node %s" %
                                   (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      "INSTANCE_PRIMARY": self.instance.primary_node,
      "INSTANCE_SECONDARIES": " ".join(self.instance.secondary_nodes),
      }
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      raise errors.OpExecError, ("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logger.Info("removing block devices for instance %s" % instance.name)

    _RemoveDisks(instance, self.cfg)

    logger.Info("removing instance %s from cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
                               "admin_state", "admin_ram",
                               "disk_template", "ip", "mac", "bridge"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    instance_names = utils.NiceSort(self.cfg.GetInstanceList())
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
                     in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_all_instances_info(nodes)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = []
    for instance in instance_list:
      iout = []
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = ",".join(instance.secondary_nodes) or "-"
        elif field == "admin_state":
          if instance.status == "down":
            val = "no"
          else:
            val = "yes"
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = "(node down)"
          else:
            if live_data.get(instance.name):
              val = "running"
            else:
              val = "stopped"
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = "(node down)"
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        else:
          raise errors.ParameterError, field
        val = str(val)
        iout.append(val)
      output.append(iout)

    return output
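
    # Illustrative example (not in the original code): with
    # output_fields = ["name", "pnode", "admin_state"] the returned list
    # could look like
    #   [["inst1.example.com", "node1.example.com", "yes"],
    #    ["inst2.example.com", "node2.example.com", "no"]]
    # every value is stringified; the instance and node names are
    # hypothetical.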


class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      "INSTANCE_PRIMARY": self.instance.primary_node,
      "INSTANCE_SECONDARIES": " ".join(self.instance.secondary_nodes),
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)

    # check memory requirements on the secondary node
    target_node = instance.secondary_nodes[0]
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
    info = nodeinfo.get(target_node, None)
    if not info:
      raise errors.OpPrereqError, ("Cannot get current information"
                                   " from node '%s'" % target_node)
    if instance.memory > info['memory_free']:
      raise errors.OpPrereqError, ("Not enough memory on target node %s."
                                   " %d MB available, %d MB required" %
                                   (target_node, info['memory_free'],
                                    instance.memory))

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(instance.primary_node, brlist):
      raise errors.OpPrereqError, ("one or more target bridges %s do not"
                                   " exist on destination node '%s'" %
                                   (brlist, instance.primary_node))

    self.instance = instance

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for remote_raid1, these are md over drbd
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        if not self.op.ignore_consistency:
          raise errors.OpExecError, ("Disk %s is degraded on target node,"
                                     " aborting failover." % dev.iv_name)

    feedback_fn("* checking target node resource availability")
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())

    if not nodeinfo:
      raise errors.OpExecError, ("Could not contact target node %s." %
                                 target_node)

    free_memory = int(nodeinfo[target_node]['memory_free'])
    memory = instance.memory
    if memory > free_memory:
      raise errors.OpExecError, ("Not enough memory to create instance %s on"
                                 " node %s. needed %s MiB, available %s MiB" %
                                 (instance.name, target_node, memory,
                                  free_memory))

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not rpc.call_instance_shutdown(source_node, instance):
      logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                   " anyway. Please make sure node %s is down" %
                   (instance.name, source_node, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError, ("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.AddInstance(instance)

    feedback_fn("* activating the instance's disks on target node")
    logger.Info("Starting instance %s on node %s" %
                (instance.name, target_node))

    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                             ignore_secondaries=True)
    if not disks_ok:
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError, ("Can't activate the instance's disks")

    feedback_fn("* starting the instance on the target node")
    if not rpc.call_instance_start(target_node, instance, None):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance %s on node %s." %
                               (instance.name, target_node))


def _CreateBlockDevOnPrimary(cfg, node, device):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(cfg, node, child):
        return False

  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size, True)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _CreateBlockDevOnSecondary(cfg, node, device, force):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(cfg, node, child, force):
        return False

  if not force:
    return True
  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size, False)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True
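
# Note (illustrative, not part of the original code): the 'force' flag is
# sticky on the way down the tree -- as soon as one device reports
# CreateOnSecondary(), that device and everything below it are physically
# created on the secondary node; devices above that point are only
# recursed into and not created there.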


def _GenerateMDDRBDBranch(cfg, vgname, primary, secondary, size, base):
  """Generate a drbd device complete with its children.

  """
  port = cfg.AllocatePort()
  base = "%s_%s" % (base, port)
  dev_data = objects.Disk(dev_type="lvm", size=size,
                          logical_id=(vgname, "%s.data" % base))
  dev_meta = objects.Disk(dev_type="lvm", size=128,
                          logical_id=(vgname, "%s.meta" % base))
  drbd_dev = objects.Disk(dev_type="drbd", size=size,
                          logical_id = (primary, secondary, port),
                          children = [dev_data, dev_meta])
  return drbd_dev
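
# Illustrative example (not part of the original code): a call such as
#   _GenerateMDDRBDBranch(cfg, "xenvg", "node1", "node2", 10240, "inst1-sda")
# returns one drbd Disk with logical_id ("node1", "node2", <allocated port>)
# and two lvm children, roughly
#   ("xenvg", "inst1-sda_<port>.data")  # 10240 MB data volume
#   ("xenvg", "inst1-sda_<port>.meta")  # 128 MB drbd metadata volume
# the volume group, node names and sizes in this sketch are hypothetical;
# the port comes from cfg.AllocatePort().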


def _GenerateDiskTemplate(cfg, vgname, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  if template_name == "diskless":
    disks = []
  elif template_name == "plain":
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")
    sda_dev = objects.Disk(dev_type="lvm", size=disk_sz,
                           logical_id=(vgname, "%s.os" % instance_name),
                           iv_name = "sda")
    sdb_dev = objects.Disk(dev_type="lvm", size=swap_sz,
                           logical_id=(vgname, "%s.swap" % instance_name),
                           iv_name = "sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == "local_raid1":
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")
    sda_dev_m1 = objects.Disk(dev_type="lvm", size=disk_sz,
                              logical_id=(vgname, "%s.os_m1" % instance_name))
    sda_dev_m2 = objects.Disk(dev_type="lvm", size=disk_sz,
                              logical_id=(vgname, "%s.os_m2" % instance_name))
    md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name = "sda",
                              size=disk_sz,
                              children = [sda_dev_m1, sda_dev_m2])
    sdb_dev_m1 = objects.Disk(dev_type="lvm", size=swap_sz,
                              logical_id=(vgname, "%s.swap_m1" %
                                          instance_name))
    sdb_dev_m2 = objects.Disk(dev_type="lvm", size=swap_sz,
                              logical_id=(vgname, "%s.swap_m2" %
                                          instance_name))
    md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name = "sdb",
                              size=swap_sz,
                              children = [sdb_dev_m1, sdb_dev_m2])
    disks = [md_sda_dev, md_sdb_dev]
  elif template_name == "remote_raid1":
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, vgname,
                                         primary_node, remote_node, disk_sz,
                                         "%s-sda" % instance_name)
    md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name="sda",
                              children = [drbd_sda_dev], size=disk_sz)
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, vgname,
                                         primary_node, remote_node, swap_sz,
                                         "%s-sdb" % instance_name)
    md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name="sdb",
                              children = [drbd_sdb_dev], size=swap_sz)
    disks = [md_sda_dev, md_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks
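
# Illustrative example (not part of the original code): for the "plain"
# template a call like
#   _GenerateDiskTemplate(cfg, "xenvg", "plain", "inst1.example.com",
#                         "node1.example.com", [], 10240, 4096)
# returns two lvm Disks: "sda" backed by ("xenvg", "inst1.example.com.os")
# and "sdb" backed by ("xenvg", "inst1.example.com.swap").  For
# "remote_raid1" each of sda/sdb is instead an md_raid1 device with a single
# drbd child (see _GenerateMDDRBDBranch above).  All names and sizes in this
# sketch are hypothetical.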


def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
              (device.iv_name, instance.name))
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, device, False):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, device):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False
  return True


def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False
  return result


class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
              "disk_template", "swap_size", "mode", "start", "vcpus",
              "wait_for_sync"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      "INSTANCE_PRIMARY": self.op.pnode,
      "INSTANCE_SECONDARIES": " ".join(self.secondaries),
      "DISK_TEMPLATE": self.op.disk_template,
      "MEM_SIZE": self.op.mem_size,
      "DISK_SIZE": self.op.disk_size,
      "SWAP_SIZE": self.op.swap_size,
      "VCPUS": self.op.vcpus,
      "BRIDGE": self.op.bridge,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["SRC_NODE"] = self.op.src_node
      env["SRC_PATH"] = self.op.src_path
      env["SRC_IMAGE"] = self.src_image
    if self.inst_ip:
      env["INSTANCE_IP"] = self.inst_ip

    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError, ("Invalid instance creation mode '%s'" %
                                   self.op.mode)

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)
      if src_node is None or src_path is None:
        raise errors.OpPrereqError, ("Importing an instance requires source"
                                     " node and path options")
      src_node_full = self.cfg.ExpandNodeName(src_node)
      if src_node_full is None:
        raise errors.OpPrereqError, ("Unknown source node '%s'" % src_node)
      self.op.src_node = src_node = src_node_full

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError, ("The source path must be absolute")

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError, ("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError, ("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError, ("Wrong export version %s (wanted %d)" %
                                     (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError, ("Can't import instance with more than"
                                     " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage
    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError, ("No guest OS specified")

    # check primary node
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
    if pnode is None:
      raise errors.OpPrereqError, ("Primary node '%s' is unknown" %
                                   self.op.pnode)
    self.op.pnode = pnode.name
    self.pnode = pnode
    self.secondaries = []
    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError, ("Invalid disk template name")

    if self.op.disk_template == constants.DT_REMOTE_RAID1:
      if getattr(self.op, "snode", None) is None:
        raise errors.OpPrereqError, ("The 'remote_raid1' disk template needs"
                                     " a mirror node")

      snode_name = self.cfg.ExpandNodeName(self.op.snode)
      if snode_name is None:
        raise errors.OpPrereqError, ("Unknown secondary node '%s'" %
                                     self.op.snode)
      elif snode_name == pnode.name:
        raise errors.OpPrereqError, ("The secondary node cannot be"
                                     " the primary node.")
      self.secondaries.append(snode_name)

    # Check lv size requirements
    nodenames = [pnode.name] + self.secondaries
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())

    # Required free disk space as a function of disk and swap space
    req_size_dict = {
      constants.DT_DISKLESS: 0,
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
      constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
      # 256 MB are added for drbd metadata, 128 MB for each drbd device
      constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
    }
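    # Illustrative sketch (not part of the original code): with, say,
    # disk_size=10240 and swap_size=4096 (both in MB), the required free
    # space per node would be 0 for diskless, 14336 for plain, 28672 for
    # local_raid1 and 14336 + 256 = 14592 for remote_raid1; the sizes
    # chosen here are hypothetical.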

    if self.op.disk_template not in req_size_dict:
      raise errors.ProgrammerError, ("Disk template '%s' size requirement"
                                     " is unknown" %  self.op.disk_template)

    req_size = req_size_dict[self.op.disk_template]

    for node in nodenames:
      info = nodeinfo.get(node, None)
      if not info:
        raise errors.OpPrereqError, ("Cannot get current information"
                                     " from node '%s'" % node)
      if req_size > info['vg_free']:
        raise errors.OpPrereqError, ("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
    if not isinstance(os_obj, objects.OS):
      raise errors.OpPrereqError, ("OS '%s' not in supported os list for"
                                   " primary node"  % self.op.os_type)

    # instance verification
    hostname1 = utils.LookupHostname(self.op.instance_name)
    if not hostname1:
      raise errors.OpPrereqError, ("Instance name '%s' not found in DNS" %
                                   self.op.instance_name)

    self.op.instance_name = instance_name = hostname1['hostname']
    instance_list = self.cfg.GetInstanceList()
    if instance_name in instance_list:
      raise errors.OpPrereqError, ("Instance '%s' is already in the cluster" %
                                   instance_name)

    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1['ip']
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError, ("given IP address '%s' doesn't look"
                                     " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = inst_ip

    command = ["fping", "-q", hostname1['ip']]
    result = utils.RunCmd(command)
    if not result.failed:
      raise errors.OpPrereqError, ("IP %s of instance %s already in use" %
                                   (hostname1['ip'], instance_name))

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError, ("target bridge '%s' does not exist on"
                                   " destination node '%s'" %
                                   (self.op.bridge, pnode.name))

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    disks = _GenerateDiskTemplate(self.cfg, self.cfg.GetVGName(),
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError, ("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj)
    elif iobj.disk_template == "remote_raid1":
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      raise errors.OpExecError, ("There are some degraded disks for"
                                      " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError, ("could not add os for instance %s"
                                          " on node %s" %
                                          (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                                src_node, src_image):
          raise errors.OpExecError, ("Could not import os for instance"
                                          " %s on node %s" %
                                          (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError, ("Unknown OS initialization mode '%s'"
                                       % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError, ("Could not start instance")


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Connect to the console of an instance.

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError, ("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError, ("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance.name)
    return node, console_cmd
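
    # Illustrative example (not part of the original code): the returned
    # pair might look like
    #   ("node1.example.com", "xm console inst1.example.com")
    # the exact command string comes from the hypervisor abstraction and
    # both values shown here are hypothetical.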


class LUAddMDDRBDComponent(LogicalUnit):
  """Add a new mirror member to an instance's disk.

  """
  HPATH = "mirror-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      "NEW_SECONDARY": self.op.remote_node,
      "DISK_NAME": self.op.disk_name,
      }
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.remote_node,] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)
    self.instance = instance

    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
    if remote_node is None:
      raise errors.OpPrereqError, ("Node '%s' not known" % self.op.remote_node)
    self.remote_node = remote_node

    if remote_node == instance.primary_node:
      raise errors.OpPrereqError, ("The specified node is the primary node of"
                                   " the instance.")

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError, ("Instance's disk layout is not"
                                   " remote_raid1.")
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError, ("Can't find this device ('%s') in the"
                                   " instance." % self.op.disk_name)
    if len(disk.children) > 1:
      raise errors.OpPrereqError, ("The device already has two slave"
                                   " devices.\n"
                                   "This would create a 3-disk raid1"
                                   " which we don't allow.")
    self.disk = disk

  def Exec(self, feedback_fn):
    """Add the mirror component.

    """
    disk = self.disk
    instance = self.instance

    remote_node = self.remote_node
    new_drbd = _GenerateMDDRBDBranch(self.cfg, self.cfg.GetVGName(),
                                     instance.primary_node, remote_node,
                                     disk.size, "%s-%s" %
                                     (instance.name, self.op.disk_name))

    logger.Info("adding new mirror component on secondary")
    #HARDCODE
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, new_drbd, False):
      raise errors.OpExecError, ("Failed to create new component on secondary"
                                 " node %s" % remote_node)

    logger.Info("adding new mirror component on primary")
    #HARDCODE
    if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node, new_drbd):
      # remove secondary dev
      self.cfg.SetDiskID(new_drbd, remote_node)
      rpc.call_blockdev_remove(remote_node, new_drbd)
      raise errors.OpExecError, ("Failed to create volume on primary")

    # the device exists now
    # call the primary node to add the mirror to md
    logger.Info("adding new mirror component to md")
    if not rpc.call_blockdev_addchild(instance.primary_node,
                                           disk, new_drbd):
      logger.Error("Can't add mirror component to md!")
      self.cfg.SetDiskID(new_drbd, remote_node)
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
        logger.Error("Can't rollback on secondary")
      self.cfg.SetDiskID(new_drbd, instance.primary_node)
      if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
        logger.Error("Can't rollback on primary")
      raise errors.OpExecError, "Can't add mirror component to md array"

    disk.children.append(new_drbd)

    self.cfg.AddInstance(instance)

    _WaitForSync(self.cfg, instance)

    return 0


class LURemoveMDDRBDComponent(LogicalUnit):
  """Remove a component from a remote_raid1 disk.

  """
  HPATH = "mirror-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      "DISK_NAME": self.op.disk_name,
      "DISK_ID": self.op.disk_id,
      "OLD_SECONDARY": self.old_secondary,
      }
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)
    self.instance = instance

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError, ("Instance's disk layout is not"
                                   " remote_raid1.")
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError, ("Can't find this device ('%s') in the"
                                   " instance." % self.op.disk_name)
    for child in disk.children:
      if child.dev_type == "drbd" and child.logical_id[2] == self.op.disk_id:
        break
    else:
      raise errors.OpPrereqError, ("Can't find the device with this port.")

    if len(disk.children) < 2:
      raise errors.OpPrereqError, ("Cannot remove the last component from"
                                   " a mirror.")
    self.disk = disk
    self.child = child
    if self.child.logical_id[0] == instance.primary_node:
      oid = 1
    else:
      oid = 0
    self.old_secondary = self.child.logical_id[oid]

  def Exec(self, feedback_fn):
    """Remove the mirror component.

    """
    instance = self.instance
    disk = self.disk
    child = self.child
    logger.Info("remove mirror component")
    self.cfg.SetDiskID(disk, instance.primary_node)
    if not rpc.call_blockdev_removechild(instance.primary_node,
                                              disk, child):
      raise errors.OpExecError, ("Can't remove child from mirror.")

    for node in child.logical_id[:2]:
      self.cfg.SetDiskID(child, node)
      if not rpc.call_blockdev_remove(node, child):
        logger.Error("Warning: failed to remove device from node %s,"
                     " continuing operation." % node)

    disk.children.remove(child)
    self.cfg.AddInstance(instance)
2894

    
2895

    
2896
class LUReplaceDisks(LogicalUnit):
2897
  """Replace the disks of an instance.
2898

2899
  """
2900
  HPATH = "mirrors-replace"
2901
  HTYPE = constants.HTYPE_INSTANCE
2902
  _OP_REQP = ["instance_name"]
2903

    
2904
  def BuildHooksEnv(self):
2905
    """Build hooks env.
2906

2907
    This runs on the master, the primary and all the secondaries.
2908

2909
    """
2910
    env = {
2911
      "INSTANCE_NAME": self.op.instance_name,
2912
      "NEW_SECONDARY": self.op.remote_node,
2913
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
2914
      }
2915
    nl = [self.sstore.GetMasterNode(),
2916
          self.instance.primary_node] + list(self.instance.secondary_nodes)
2917
    return env, nl, nl
2918

    
2919
  def CheckPrereq(self):
2920
    """Check prerequisites.
2921

2922
    This checks that the instance is in the cluster.
2923

2924
    """
2925
    instance = self.cfg.GetInstanceInfo(
2926
      self.cfg.ExpandInstanceName(self.op.instance_name))
2927
    if instance is None:
2928
      raise errors.OpPrereqError, ("Instance '%s' not known" %
2929
                                   self.op.instance_name)
2930
    self.instance = instance
2931

    
2932
    if instance.disk_template != constants.DT_REMOTE_RAID1:
2933
      raise errors.OpPrereqError, ("Instance's disk layout is not"
2934
                                   " remote_raid1.")
2935

    
2936
    if len(instance.secondary_nodes) != 1:
2937
      raise errors.OpPrereqError, ("The instance has a strange layout,"
2938
                                   " expected one secondary but found %d" %
2939
                                   len(instance.secondary_nodes))
2940

    
2941
    remote_node = getattr(self.op, "remote_node", None)
2942
    if remote_node is None:
2943
      remote_node = instance.secondary_nodes[0]
2944
    else:
2945
      remote_node = self.cfg.ExpandNodeName(remote_node)
2946
      if remote_node is None:
2947
        raise errors.OpPrereqError, ("Node '%s' not known" %
2948
                                     self.op.remote_node)
2949
    if remote_node == instance.primary_node:
2950
      raise errors.OpPrereqError, ("The specified node is the primary node of"
2951
                                   " the instance.")
2952
    self.op.remote_node = remote_node
2953

    
2954
  def Exec(self, feedback_fn):
2955
    """Replace the disks of an instance.
2956

2957
    """
    instance = self.instance
    iv_names = {}
    # start of work
    remote_node = self.op.remote_node
    cfg = self.cfg
    vgname = cfg.GetVGName()
    for dev in instance.disks:
      size = dev.size
      new_drbd = _GenerateMDDRBDBranch(cfg, vgname, instance.primary_node,
                                       remote_node, size,
                                       "%s-%s" % (instance.name, dev.iv_name))
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
      logger.Info("adding new mirror component on secondary for %s" %
                  dev.iv_name)
      #HARDCODE
      if not _CreateBlockDevOnSecondary(cfg, remote_node, new_drbd, False):
        raise errors.OpExecError, ("Failed to create new component on"
                                   " secondary node %s\n"
                                   "Full abort, cleanup manually!" %
                                   remote_node)

      logger.Info("adding new mirror component on primary")
      #HARDCODE
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, new_drbd):
        # remove secondary dev
        cfg.SetDiskID(new_drbd, remote_node)
        rpc.call_blockdev_remove(remote_node, new_drbd)
        raise errors.OpExecError("Failed to create volume on primary!\n"
                                 "Full abort, cleanup manually!!")

      # the device exists now
      # call the primary node to add the mirror to md
      logger.Info("adding new mirror component to md")
      if not rpc.call_blockdev_addchild(instance.primary_node, dev,
                                        new_drbd):
        logger.Error("Can't add mirror component to md!")
        cfg.SetDiskID(new_drbd, remote_node)
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
          logger.Error("Can't rollback on secondary")
        cfg.SetDiskID(new_drbd, instance.primary_node)
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
          logger.Error("Can't rollback on primary")
        raise errors.OpExecError, ("Full abort, cleanup manually!!")

      dev.children.append(new_drbd)
      cfg.AddInstance(instance)

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    _WaitForSync(cfg, instance, unlock=True)

    # so check manually all the devices
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError, ("MD device %s is degraded!" % name)
      cfg.SetDiskID(new_drbd, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
      if is_degr:
        raise errors.OpExecError, ("New drbd device %s is degraded!" % name)

    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      logger.Info("remove mirror %s component" % name)
      cfg.SetDiskID(dev, instance.primary_node)
      if not rpc.call_blockdev_removechild(instance.primary_node,
                                                dev, child):
        logger.Error("Can't remove child from mirror, aborting"
                     " *this device cleanup*.\nYou need to cleanup manually!!")
        continue

      for node in child.logical_id[:2]:
        logger.Info("remove child device on %s" % node)
        cfg.SetDiskID(child, node)
        if not rpc.call_blockdev_remove(node, child):
          logger.Error("Warning: failed to remove device from node %s,"
                       " continuing operation." % node)

      dev.children.remove(child)

      cfg.AddInstance(instance)


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError, "Invalid argument type 'instances'"
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError, ("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]
    return


  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type == "drbd":
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data
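
  # Illustrative shape of the structure returned above (values are
  # hypothetical); "children" nests recursively, so the status of the
  # drbd/lvm leaves appears under their parent device:
  #
  #   {"iv_name": "sda", "dev_type": ..., "logical_id": ...,
  #    "physical_id": ..., "pstatus": ..., "sstatus": None,
  #    "children": [{...}, {...}]}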

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                                instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        }

      result[instance.name] = idict

    return result


class LUQueryNodeData(NoHooksLU):
  """Logical unit for querying node data.

  """
  _OP_REQP = ["nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional node list against the existing names.

    """
    self.wanted_nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Compute and return the list of nodes.

    """
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]
    result = []
    for node in self.wanted_nodes:
      result.append((node.name, node.primary_ip, node.secondary_ip,
                     [inst.name for inst in ilist
                      if inst.primary_node == node.name],
                     [inst.name for inst in ilist
                      if node.name in inst.secondary_nodes],
                     ))
    return result
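
  # Each entry in the returned list is a tuple of the form (values
  # below are hypothetical):
  #   ("node1.example.com", "192.0.2.10", "198.51.100.10",
  #    ["inst1.example.com"],    # instances with this node as primary
  #    ["inst2.example.com"])    # instances with this node as secondary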


class LUSetInstanceParms(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      }
    if self.mem:
      env["MEM_SIZE"] = self.mem
    if self.vcpus:
      env["VCPUS"] = self.vcpus
    if self.do_ip:
      env["INSTANCE_IP"] = self.ip
    if self.bridge:
      env["BRIDGE"] = self.bridge

    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)

    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.bridge = getattr(self.op, "bridge", None)
    if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
      raise errors.OpPrereqError, ("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError, ("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError, ("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError, ("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("No such instance name '%s'" %
                                   self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance
    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.
    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus",  self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))

    self.cfg.AddInstance(instance)

    return result
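
  # The result is a list of (parameter, new value) pairs describing the
  # changes applied, e.g. (hypothetical values):
  #   [("mem", 512), ("bridge", "xen-br0")]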


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list([node.name for node in self.nodes])
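
  # Illustrative return value (node and instance names are made up):
  #   {"node1.example.com": ["inst1.example.com", "inst2.example.com"],
  #    "node2.example.com": []}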


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance name is a valid one.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not found" %
                                   self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError, ("Destination node '%s' is unknown." %
                                   self.op.target_node)
    self.op.target_node = self.dst_node.name
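
  # Illustrative usage only: the export is normally requested through an
  # opcode from opcodes.py; the opcode class name below is an assumption
  # for this sketch, not taken from this file.
  #
  #   op = opcodes.OpExportInstance(instance_name="inst1.example.com",
  #                                 target_node="node2.example.com",
  #                                 shutdown=True)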

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    # shutdown the instance, unless requested not to do so
    if self.op.shutdown:
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
      self.processor.ChainOpCode(op, feedback_fn)

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type="lvm", size=disk.size,
                                      logical_id=(vgname, new_dev_name),
                                      physical_id=(vgname, new_dev_name),
                                      iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown:
        op = opcodes.OpStartupInstance(instance_name=instance.name,
                                       force=False)
        self.processor.ChainOpCode(op, feedback_fn)

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                           instance):
        logger.Error("could not export block device %s from node"
                     " %s to node %s" %
                     (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from"
                     " node %s" % (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.processor.ChainOpCode(op, feedback_fn)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))