root / lib / cmdlib.py @ 59322403


1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import socket
30
import time
31
import tempfile
32
import re
33
import platform
34

    
35
from ganeti import rpc
36
from ganeti import ssh
37
from ganeti import logger
38
from ganeti import utils
39
from ganeti import errors
40
from ganeti import hypervisor
41
from ganeti import config
42
from ganeti import constants
43
from ganeti import objects
44
from ganeti import opcodes
45
from ganeti import ssconf
46

    
47
class LogicalUnit(object):
48
  """Logical Unit base class..
49

50
  Subclasses must follow these rules:
51
    - implement CheckPrereq which also fills in the opcode instance
52
      with all the fields (even if as None)
53
    - implement Exec
54
    - implement BuildHooksEnv
55
    - redefine HPATH and HTYPE
56
    - optionally redefine their run requirements (REQ_CLUSTER,
57
      REQ_MASTER); note that all commands require root permissions;
      an illustrative subclass sketch follows this class definition
58

59
  """
60
  HPATH = None
61
  HTYPE = None
62
  _OP_REQP = []
63
  REQ_CLUSTER = True
64
  REQ_MASTER = True
65

    
66
  def __init__(self, processor, op, cfg, sstore):
67
    """Constructor for LogicalUnit.
68

69
    This needs to be overridden in derived classes in order to check op
70
    validity.
71

72
    """
73
    self.processor = processor
74
    self.op = op
75
    self.cfg = cfg
76
    self.sstore = sstore
77
    for attr_name in self._OP_REQP:
78
      attr_val = getattr(op, attr_name, None)
79
      if attr_val is None:
80
        raise errors.OpPrereqError, ("Required parameter '%s' missing" %
81
                                     attr_name)
82
    if self.REQ_CLUSTER:
83
      if not cfg.IsCluster():
84
        raise errors.OpPrereqError, ("Cluster not initialized yet,"
85
                                     " use 'gnt-cluster init' first.")
86
      if self.REQ_MASTER:
87
        master = sstore.GetMasterNode()
88
        if master != socket.gethostname():
89
          raise errors.OpPrereqError, ("Commands must be run on the master"
90
                                       " node %s" % master)
91

    
92
  def CheckPrereq(self):
93
    """Check prerequisites for this LU.
94

95
    This method should check that the prerequisites for the execution
96
    of this LU are fulfilled. It can do internode communication, but
97
    it should be idempotent - no cluster or system changes are
98
    allowed.
99

100
    The method should raise errors.OpPrereqError in case something is
101
    not fulfilled. Its return value is ignored.
102

103
    This method should also update all the parameters of the opcode to
104
    their canonical form; e.g. a short node name must be fully
105
    expanded after this method has successfully completed (so that
106
    hooks, logging, etc. work correctly).
107

108
    """
109
    raise NotImplementedError
110

    
111
  def Exec(self, feedback_fn):
112
    """Execute the LU.
113

114
    This method should implement the actual work. It should raise
115
    errors.OpExecError for failures that are somewhat dealt with in
116
    code, or expected.
117

118
    """
119
    raise NotImplementedError
120

    
121
  def BuildHooksEnv(self):
122
    """Build hooks environment for this LU.
123

124
    This method should return a three-element tuple consisting of: a dict
125
    containing the environment that will be used for running the
126
    specific hook for this LU, a list of node names on which the hook
127
    should run before the execution, and a list of node names on which
128
    the hook should run after the execution.
129

130
    The keys of the dict must not have 'GANETI_' prefixed as this will
131
    be handled in the hooks runner. Also note additional keys will be
132
    added by the hooks runner. If the LU doesn't define any
133
    environment, an empty dict (and not None) should be returned.
134

135
    As for the node lists, the master should not be included in
136
    them, as it will be added by the hooks runner in case this LU
137
    requires a cluster to run on (otherwise we don't have a node
138
    list). If there are no nodes, an empty list should be returned (and not
139
    None).
140

141
    Note that if the HPATH for a LU class is None, this function will
142
    not be called.
143

144
    """
145
    raise NotImplementedError
146

    
147
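# --- Illustrative sketch only, not part of the original module ---
# A minimal LogicalUnit subclass following the rules listed in the class
# docstring above (the opcode field 'message' and the hook path are
# hypothetical, chosen just for the example).
class LUExampleEcho(LogicalUnit):
  """Example-only LU that echoes back a message from its opcode."""
  HPATH = "example-echo"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["message"]

  def BuildHooksEnv(self):
    # keys are not GANETI_-prefixed and the master is not listed in the
    # node lists; both are handled by the hooks runner
    env = {"MESSAGE": self.op.message}
    return env, [], []

  def CheckPrereq(self):
    # presence of 'message' was already enforced via _OP_REQP by
    # LogicalUnit.__init__; here we only canonicalise it
    self.op.message = str(self.op.message).strip()

  def Exec(self, feedback_fn):
    feedback_fn("echo: %s" % self.op.message)
    return self.op.message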

    
148
class NoHooksLU(LogicalUnit):
149
  """Simple LU which runs no hooks.
150

151
  This LU is intended as a parent for other LogicalUnits which will
152
  run no hooks, in order to reduce duplicate code.
153

154
  """
155
  HPATH = None
156
  HTYPE = None
157

    
158
  def BuildHooksEnv(self):
159
    """Build hooks env.
160

161
    This is a no-op, since we don't run hooks.
162

163
    """
164
    return
165

    
166

    
167
def _GetWantedNodes(lu, nodes):
168
  """Returns list of checked and expanded nodes.
169

170
  Args:
171
    nodes: List of nodes (strings) or None for all
172

173
  """
174
  if nodes is not None and not isinstance(nodes, list):
175
    raise errors.OpPrereqError, "Invalid argument type 'nodes'"
176

    
177
  if nodes:
178
    wanted_nodes = []
179

    
180
    for name in nodes:
181
      node = lu.cfg.GetNodeInfo(lu.cfg.ExpandNodeName(name))
182
      if node is None:
183
        raise errors.OpPrereqError, ("No such node name '%s'" % name)
184
      wanted_nodes.append(node)
185

    
186
    return wanted_nodes
187
  else:
188
    return [lu.cfg.GetNodeInfo(name) for name in lu.cfg.GetNodeList()]
189

    
190

    
191
def _CheckOutputFields(static, dynamic, selected):
192
  """Checks whether all selected fields are valid.
193

194
  Args:
195
    static: Static fields
196
    dynamic: Dynamic fields
    selected: Fields selected by the user
197

198
  """
199
  static_fields = frozenset(static)
200
  dynamic_fields = frozenset(dynamic)
201

    
202
  all_fields = static_fields | dynamic_fields
203

    
204
  if not all_fields.issuperset(selected):
205
    raise errors.OpPrereqError, ("Unknown output fields selected: %s"
206
                                 % ",".join(frozenset(selected).
207
                                            difference(all_fields)))
208

    
209
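# Illustrative sketch only (not part of the original module): how a typical
# query LU combines the two helpers above in its CheckPrereq, mirroring the
# query LUs further down; the field names used here are hypothetical.
def _ExampleQueryPrereq(lu):
  """Example-only: expand the node list and validate the output fields."""
  lu.nodes = _GetWantedNodes(lu, lu.op.nodes)
  _CheckOutputFields(static=["name"],
                     dynamic=["mfree"],
                     selected=lu.op.output_fields)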

    
210
def _UpdateEtcHosts(fullnode, ip):
211
  """Ensure a node has a correct entry in /etc/hosts.
212

213
  Args:
214
    fullnode - Fully qualified domain name of host. (str)
215
    ip       - IPv4 address of host (str)
216

217
  """
218
  node = fullnode.split(".", 1)[0]
219

    
220
  f = open('/etc/hosts', 'r+')
221

    
222
  inthere = False
223

    
224
  save_lines = []
225
  add_lines = []
226
  removed = False
227

    
228
  while True:
229
    rawline = f.readline()
230

    
231
    if not rawline:
232
      # End of file
233
      break
234

    
235
    line = rawline.split('\n')[0]
236

    
237
    # Strip off comments
238
    line = line.split('#')[0]
239

    
240
    if not line:
241
      # Entire line was comment, skip
242
      save_lines.append(rawline)
243
      continue
244

    
245
    fields = line.split()
246

    
247
    haveall = True
248
    havesome = False
249
    for spec in [ ip, fullnode, node ]:
250
      if spec not in fields:
251
        haveall = False
252
      if spec in fields:
253
        havesome = True
254

    
255
    if haveall:
256
      inthere = True
257
      save_lines.append(rawline)
258
      continue
259

    
260
    if havesome and not haveall:
261
      # Line (old, or manual?) which is missing some.  Remove.
262
      removed = True
263
      continue
264

    
265
    save_lines.append(rawline)
266

    
267
  if not inthere:
268
    add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))
269

    
270
  if removed:
271
    if add_lines:
272
      save_lines = save_lines + add_lines
273

    
274
    # We removed a line, write a new file and replace old.
275
    fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
276
    newfile = os.fdopen(fd, 'w')
277
    newfile.write(''.join(save_lines))
278
    newfile.close()
279
    os.rename(tmpname, '/etc/hosts')
280

    
281
  elif add_lines:
282
    # Simply appending a new line will do the trick.
283
    f.seek(0, 2)
284
    for add in add_lines:
285
      f.write(add)
286

    
287
  f.close()
288

    
289
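# Illustrative effect of _UpdateEtcHosts (example values): after calling
# _UpdateEtcHosts("node1.example.com", "192.0.2.10"), /etc/hosts contains
#   192.0.2.10<TAB>node1.example.com node1
# and any stale line that mentions only some of ip/fqdn/short name is dropped.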

    
290
def _UpdateKnownHosts(fullnode, ip, pubkey):
291
  """Ensure a node has a correct known_hosts entry.
292

293
  Args:
294
    fullnode - Fully qualified domain name of host. (str)
295
    ip       - IPv4 address of host (str)
296
    pubkey   - the public key of the cluster
297

298
  """
299
  if os.path.exists('/etc/ssh/ssh_known_hosts'):
300
    f = open('/etc/ssh/ssh_known_hosts', 'r+')
301
  else:
302
    f = open('/etc/ssh/ssh_known_hosts', 'w+')
303

    
304
  inthere = False
305

    
306
  save_lines = []
307
  add_lines = []
308
  removed = False
309

    
310
  while True:
311
    rawline = f.readline()
312
    logger.Debug('read %s' % (repr(rawline),))
313

    
314
    if not rawline:
315
      # End of file
316
      break
317

    
318
    line = rawline.split('\n')[0]
319

    
320
    parts = line.split(' ')
321
    fields = parts[0].split(',')
322
    key = parts[2]
323

    
324
    haveall = True
325
    havesome = False
326
    for spec in [ ip, fullnode ]:
327
      if spec not in fields:
328
        haveall = False
329
      if spec in fields:
330
        havesome = True
331

    
332
    logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
333
    if haveall and key == pubkey:
334
      inthere = True
335
      save_lines.append(rawline)
336
      logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
337
      continue
338

    
339
    if havesome and (not haveall or key != pubkey):
340
      removed = True
341
      logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
342
      continue
343

    
344
    save_lines.append(rawline)
345

    
346
  if not inthere:
347
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
348
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
349

    
350
  if removed:
351
    save_lines = save_lines + add_lines
352

    
353
    # Write a new file and replace old.
354
    fd, tmpname = tempfile.mkstemp('tmp', 'ssh_known_hosts_', '/etc/ssh')
355
    newfile = os.fdopen(fd, 'w')
356
    newfile.write(''.join(save_lines))
357
    newfile.close()
358
    logger.Debug("Wrote new known_hosts.")
359
    os.rename(tmpname, '/etc/ssh/ssh_known_hosts')
360

    
361
  elif add_lines:
362
    # Simply appending a new line will do the trick.
363
    f.seek(0, 2)
364
    for add in add_lines:
365
      f.write(add)
366

    
367
  f.close()
368

    
369
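# Illustrative effect of _UpdateKnownHosts (example values): a missing or
# outdated entry is replaced by a line of the form
#   node1.example.com,192.0.2.10 ssh-rsa <cluster public key>
# while lines carrying the right names but a different key are discarded.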

    
370
def _HasValidVG(vglist, vgname):
371
  """Checks if the volume group list is valid.
372

373
  A non-None return value means there's an error, and the return value
374
  is the error message.
375

376
  """
377
  vgsize = vglist.get(vgname, None)
378
  if vgsize is None:
379
    return "volume group '%s' missing" % vgname
380
  elif vgsize < 20480:
381
    return ("volume group '%s' too small (20480MiB required, %dMib found)" %
382
            (vgname, vgsize))
383
  return None
384

    
385
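# Illustrative sketch only (not part of the original module): how the return
# value of _HasValidVG is interpreted, using hypothetical VG names and sizes
# (sizes in MiB, as returned by utils.ListVolumeGroups).
def _ExampleVGCheck():
  """Example-only: _HasValidVG returns None on success, a message on error."""
  assert _HasValidVG({"xenvg": 40960}, "xenvg") is None       # big enough
  assert _HasValidVG({"xenvg": 10240}, "xenvg") is not None   # below 20480MiB
  assert _HasValidVG({"other": 40960}, "xenvg") is not None   # VG missing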

    
386
def _InitSSHSetup(node):
387
  """Setup the SSH configuration for the cluster.
388

389

390
  This generates a dsa keypair for root, adds the pub key to the
391
  permitted hosts and adds the hostkey to its own known hosts.
392

393
  Args:
394
    node: the name of this host as a fqdn
395

396
  """
397
  utils.RemoveFile('/root/.ssh/known_hosts')
398

    
399
  if os.path.exists('/root/.ssh/id_dsa'):
400
    utils.CreateBackup('/root/.ssh/id_dsa')
401
  if os.path.exists('/root/.ssh/id_dsa.pub'):
402
    utils.CreateBackup('/root/.ssh/id_dsa.pub')
403

    
404
  utils.RemoveFile('/root/.ssh/id_dsa')
405
  utils.RemoveFile('/root/.ssh/id_dsa.pub')
406

    
407
  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
408
                         "-f", "/root/.ssh/id_dsa",
409
                         "-q", "-N", ""])
410
  if result.failed:
411
    raise errors.OpExecError, ("could not generate ssh keypair, error %s" %
412
                               result.output)
413

    
414
  f = open('/root/.ssh/id_dsa.pub', 'r')
415
  try:
416
    utils.AddAuthorizedKey('/root/.ssh/authorized_keys', f.read(8192))
417
  finally:
418
    f.close()
419

    
420

    
421
def _InitGanetiServerSetup(ss):
422
  """Setup the necessary configuration for the initial node daemon.
423

424
  This creates the nodepass file containing the shared password for
425
  the cluster and also generates the SSL certificate.
426

427
  """
428
  # Create pseudo random password
429
  randpass = sha.new(os.urandom(64)).hexdigest()
430
  # and write it into sstore
431
  ss.SetKey(ss.SS_NODED_PASS, randpass)
432

    
433
  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
434
                         "-days", str(365*5), "-nodes", "-x509",
435
                         "-keyout", constants.SSL_CERT_FILE,
436
                         "-out", constants.SSL_CERT_FILE, "-batch"])
437
  if result.failed:
438
    raise errors.OpExecError, ("could not generate server ssl cert, command"
439
                               " %s had exitcode %s and error message %s" %
440
                               (result.cmd, result.exit_code, result.output))
441

    
442
  os.chmod(constants.SSL_CERT_FILE, 0400)
443

    
444
  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
445

    
446
  if result.failed:
447
    raise errors.OpExecError, ("could not start the node daemon, command %s"
448
                               " had exitcode %s and error %s" %
449
                               (result.cmd, result.exit_code, result.output))
450

    
451

    
452
class LUInitCluster(LogicalUnit):
453
  """Initialise the cluster.
454

455
  """
456
  HPATH = "cluster-init"
457
  HTYPE = constants.HTYPE_CLUSTER
458
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
459
              "def_bridge", "master_netdev"]
460
  REQ_CLUSTER = False
461

    
462
  def BuildHooksEnv(self):
463
    """Build hooks env.
464

465
    Notes: Since we don't require a cluster, we must manually add
466
    ourselves in the post-run node list.
467

468
    """
469
    env = {"CLUSTER": self.op.cluster_name,
470
           "MASTER": self.hostname['hostname_full']}
471
    return env, [], [self.hostname['hostname_full']]
472

    
473
  def CheckPrereq(self):
474
    """Verify that the passed name is a valid one.
475

476
    """
477
    if config.ConfigWriter.IsCluster():
478
      raise errors.OpPrereqError, ("Cluster is already initialised")
479

    
480
    hostname_local = socket.gethostname()
481
    self.hostname = hostname = utils.LookupHostname(hostname_local)
482
    if not hostname:
483
      raise errors.OpPrereqError, ("Cannot resolve my own hostname ('%s')" %
484
                                   hostname_local)
485

    
486
    self.clustername = clustername = utils.LookupHostname(self.op.cluster_name)
487
    if not clustername:
488
      raise errors.OpPrereqError, ("Cannot resolve given cluster name ('%s')"
489
                                   % self.op.cluster_name)
490

    
491
    result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", hostname['ip']])
492
    if result.failed:
493
      raise errors.OpPrereqError, ("Inconsistency: this host's name resolves"
494
                                   " to %s,\nbut this ip address does not"
495
                                   " belong to this host."
496
                                   " Aborting." % hostname['ip'])
497

    
498
    secondary_ip = getattr(self.op, "secondary_ip", None)
499
    if secondary_ip and not utils.IsValidIP(secondary_ip):
500
      raise errors.OpPrereqError, ("Invalid secondary ip given")
501
    if secondary_ip and secondary_ip != hostname['ip']:
502
      result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", secondary_ip])
503
      if result.failed:
504
        raise errors.OpPrereqError, ("You gave %s as secondary IP,\n"
505
                                     "but it does not belong to this host." %
506
                                     secondary_ip)
507
    self.secondary_ip = secondary_ip
508

    
509
    # checks presence of the volume group given
510
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
511

    
512
    if vgstatus:
513
      raise errors.OpPrereqError, ("Error: %s" % vgstatus)
514

    
515
    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
516
                    self.op.mac_prefix):
517
      raise errors.OpPrereqError, ("Invalid mac prefix given '%s'" %
518
                                   self.op.mac_prefix)
519

    
520
    if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
521
      raise errors.OpPrereqError, ("Invalid hypervisor type given '%s'" %
522
                                   self.op.hypervisor_type)
523

    
524
    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
525
    if result.failed:
526
      raise errors.OpPrereqError, ("Invalid master netdev given (%s): '%s'" %
527
                                   (self.op.master_netdev, result.output))
528

    
529
  def Exec(self, feedback_fn):
530
    """Initialize the cluster.
531

532
    """
533
    clustername = self.clustername
534
    hostname = self.hostname
535

    
536
    # set up the simple store
537
    ss = ssconf.SimpleStore()
538
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
539
    ss.SetKey(ss.SS_MASTER_NODE, hostname['hostname_full'])
540
    ss.SetKey(ss.SS_MASTER_IP, clustername['ip'])
541
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
542
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername['hostname'])
543

    
544
    # set up the inter-node password and certificate
545
    _InitGanetiServerSetup(ss)
546

    
547
    # start the master ip
548
    rpc.call_node_start_master(hostname['hostname_full'])
549

    
550
    # set up ssh config and /etc/hosts
551
    f = open('/etc/ssh/ssh_host_rsa_key.pub', 'r')
552
    try:
553
      sshline = f.read()
554
    finally:
555
      f.close()
556
    sshkey = sshline.split(" ")[1]
557

    
558
    _UpdateEtcHosts(hostname['hostname_full'],
559
                    hostname['ip'],
560
                    )
561

    
562
    _UpdateKnownHosts(hostname['hostname_full'],
563
                      hostname['ip'],
564
                      sshkey,
565
                      )
566

    
567
    _InitSSHSetup(hostname['hostname'])
568

    
569
    # init of cluster config file
570
    cfgw = config.ConfigWriter()
571
    cfgw.InitConfig(hostname['hostname'], hostname['ip'], self.secondary_ip,
572
                    sshkey, self.op.mac_prefix,
573
                    self.op.vg_name, self.op.def_bridge)
574

    
575
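# Illustrative driver-side sketch (the opcode class name and field values are
# assumptions; the real entry point is the opcode processor, not this module):
#
#   op = opcodes.OpInitCluster(cluster_name="cluster.example.com",
#                              hypervisor_type="xen-3.0", vg_name="xenvg",
#                              mac_prefix="aa:00:00", def_bridge="xen-br0",
#                              master_netdev="eth0")
#
# The processor would then build LUInitCluster(proc, op, cfg, sstore), call
# CheckPrereq() and finally Exec(feedback_fn).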

    
576
class LUDestroyCluster(NoHooksLU):
577
  """Logical unit for destroying the cluster.
578

579
  """
580
  _OP_REQP = []
581

    
582
  def CheckPrereq(self):
583
    """Check prerequisites.
584

585
    This checks whether the cluster is empty.
586

587
    Any errors are signalled by raising errors.OpPrereqError.
588

589
    """
590
    master = self.sstore.GetMasterNode()
591

    
592
    nodelist = self.cfg.GetNodeList()
593
    if len(nodelist) > 0 and nodelist != [master]:
594
      raise errors.OpPrereqError, ("There are still %d node(s) in "
595
                                   "this cluster." % (len(nodelist) - 1))
596

    
597
  def Exec(self, feedback_fn):
598
    """Destroys the cluster.
599

600
    """
601
    utils.CreateBackup('/root/.ssh/id_dsa')
602
    utils.CreateBackup('/root/.ssh/id_dsa.pub')
603
    rpc.call_node_leave_cluster(self.sstore.GetMasterNode())
604

    
605

    
606
class LUVerifyCluster(NoHooksLU):
607
  """Verifies the cluster status.
608

609
  """
610
  _OP_REQP = []
611

    
612
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
613
                  remote_version, feedback_fn):
614
    """Run multiple tests against a node.
615

616
    Test list:
617
      - compares ganeti version
618
      - checks vg existence and size > 20G
619
      - checks config file checksum
620
      - checks ssh to other nodes
621

622
    Args:
623
      node: name of the node to check
624
      file_list: required list of files
625
      local_cksum: dictionary of local files and their checksums
626

627
    """
628
    # compares ganeti version
629
    local_version = constants.PROTOCOL_VERSION
630
    if not remote_version:
631
      feedback_fn(" - ERROR: connection to %s failed" % (node))
632
      return True
633

    
634
    if local_version != remote_version:
635
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
636
                      (local_version, node, remote_version))
637
      return True
638

    
639
    # checks vg existence and size > 20G
640

    
641
    bad = False
642
    if not vglist:
643
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
644
                      (node,))
645
      bad = True
646
    else:
647
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
648
      if vgstatus:
649
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
650
        bad = True
651

    
652
    # checks config file checksum
653
    # checks ssh to any
654

    
655
    if 'filelist' not in node_result:
656
      bad = True
657
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
658
    else:
659
      remote_cksum = node_result['filelist']
660
      for file_name in file_list:
661
        if file_name not in remote_cksum:
662
          bad = True
663
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
664
        elif remote_cksum[file_name] != local_cksum[file_name]:
665
          bad = True
666
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
667

    
668
    if 'nodelist' not in node_result:
669
      bad = True
670
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
671
    else:
672
      if node_result['nodelist']:
673
        bad = True
674
        for node in node_result['nodelist']:
675
          feedback_fn("  - ERROR: communication with node '%s': %s" %
676
                          (node, node_result['nodelist'][node]))
677
    hyp_result = node_result.get('hypervisor', None)
678
    if hyp_result is not None:
679
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
680
    return bad
681

    
682
  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
683
    """Verify an instance.
684

685
    This function checks to see if the required block devices are
686
    available on the instance's node.
687

688
    """
689
    bad = False
690

    
691
    instancelist = self.cfg.GetInstanceList()
692
    if not instance in instancelist:
693
      feedback_fn("  - ERROR: instance %s not in instance list %s" %
694
                      (instance, instancelist))
695
      bad = True
696

    
697
    instanceconfig = self.cfg.GetInstanceInfo(instance)
698
    node_current = instanceconfig.primary_node
699

    
700
    node_vol_should = {}
701
    instanceconfig.MapLVsByNode(node_vol_should)
702

    
703
    for node in node_vol_should:
704
      for volume in node_vol_should[node]:
705
        if node not in node_vol_is or volume not in node_vol_is[node]:
706
          feedback_fn("  - ERROR: volume %s missing on node %s" %
707
                          (volume, node))
708
          bad = True
709

    
710
    if not instanceconfig.status == 'down':
711
      if not instance in node_instance[node_current]:
712
        feedback_fn("  - ERROR: instance %s not running on node %s" %
713
                        (instance, node_current))
714
        bad = True
715

    
716
    for node in node_instance:
717
      if (not node == node_current):
718
        if instance in node_instance[node]:
719
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
720
                          (instance, node))
721
          bad = True
722

    
723
    return not bad
724

    
725
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
726
    """Verify if there are any unknown volumes in the cluster.
727

728
    The .os, .swap and backup volumes are ignored. All other volumes are
729
    reported as unknown.
730

731
    """
732
    bad = False
733

    
734
    for node in node_vol_is:
735
      for volume in node_vol_is[node]:
736
        if node not in node_vol_should or volume not in node_vol_should[node]:
737
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
738
                      (volume, node))
739
          bad = True
740
    return bad
741

    
742
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
743
    """Verify the list of running instances.
744

745
    This checks what instances are running but unknown to the cluster.
746

747
    """
748
    bad = False
749
    for node in node_instance:
750
      for runninginstance in node_instance[node]:
751
        if runninginstance not in instancelist:
752
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
753
                          (runninginstance, node))
754
          bad = True
755
    return bad
756

    
757
  def CheckPrereq(self):
758
    """Check prerequisites.
759

760
    This has no prerequisites.
761

762
    """
763
    pass
764

    
765
  def Exec(self, feedback_fn):
766
    """Verify integrity of cluster, performing various test on nodes.
767

768
    """
769
    bad = False
770
    feedback_fn("* Verifying global settings")
771
    self.cfg.VerifyConfig()
772

    
773
    master = self.sstore.GetMasterNode()
774
    vg_name = self.cfg.GetVGName()
775
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
776
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
777
    node_volume = {}
778
    node_instance = {}
779

    
780
    # FIXME: verify OS list
781
    # do local checksums
782
    file_names = list(self.sstore.GetFileList())
783
    file_names.append(constants.SSL_CERT_FILE)
784
    file_names.append(constants.CLUSTER_CONF_FILE)
785
    local_checksums = utils.FingerprintFiles(file_names)
786

    
787
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
788
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
789
    all_instanceinfo = rpc.call_instance_list(nodelist)
790
    all_vglist = rpc.call_vg_list(nodelist)
791
    node_verify_param = {
792
      'filelist': file_names,
793
      'nodelist': nodelist,
794
      'hypervisor': None,
795
      }
796
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
797
    all_rversion = rpc.call_version(nodelist)
798

    
799
    for node in nodelist:
800
      feedback_fn("* Verifying node %s" % node)
801
      result = self._VerifyNode(node, file_names, local_checksums,
802
                                all_vglist[node], all_nvinfo[node],
803
                                all_rversion[node], feedback_fn)
804
      bad = bad or result
805

    
806
      # node_volume
807
      volumeinfo = all_volumeinfo[node]
808

    
809
      if type(volumeinfo) != dict:
810
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
811
        bad = True
812
        continue
813

    
814
      node_volume[node] = volumeinfo
815

    
816
      # node_instance
817
      nodeinstance = all_instanceinfo[node]
818
      if type(nodeinstance) != list:
819
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
820
        bad = True
821
        continue
822

    
823
      node_instance[node] = nodeinstance
824

    
825
    node_vol_should = {}
826

    
827
    for instance in instancelist:
828
      feedback_fn("* Verifying instance %s" % instance)
829
      result =  self._VerifyInstance(instance, node_volume, node_instance,
830
                                     feedback_fn)
831
      bad = bad or result
832

    
833
      inst_config = self.cfg.GetInstanceInfo(instance)
834

    
835
      inst_config.MapLVsByNode(node_vol_should)
836

    
837
    feedback_fn("* Verifying orphan volumes")
838
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
839
                                       feedback_fn)
840
    bad = bad or result
841

    
842
    feedback_fn("* Verifying remaining instances")
843
    result = self._VerifyOrphanInstances(instancelist, node_instance,
844
                                         feedback_fn)
845
    bad = bad or result
846

    
847
    return int(bad)
848

    
849

    
850
def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
851
  """Sleep and poll for an instance's disk to sync.
852

853
  """
854
  if not instance.disks:
855
    return True
856

    
857
  if not oneshot:
858
    logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)
859

    
860
  node = instance.primary_node
861

    
862
  for dev in instance.disks:
863
    cfgw.SetDiskID(dev, node)
864

    
865
  retries = 0
866
  while True:
867
    max_time = 0
868
    done = True
869
    cumul_degraded = False
870
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
871
    if not rstats:
872
      logger.ToStderr("Can't get any data from node %s" % node)
873
      retries += 1
874
      if retries >= 10:
875
        raise errors.RemoteError, ("Can't contact node %s for mirror data,"
876
                                   " aborting." % node)
877
      time.sleep(6)
878
      continue
879
    retries = 0
880
    for i in range(len(rstats)):
881
      mstat = rstats[i]
882
      if mstat is None:
883
        logger.ToStderr("Can't compute data for node %s/%s" %
884
                        (node, instance.disks[i].iv_name))
885
        continue
886
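      # mstat, as returned by call_blockdev_getmirrorstatus, unpacks into
      # (sync percentage, estimated time, degraded flag); the percentage is
      # None once this device is fully synced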
      perc_done, est_time, is_degraded = mstat
887
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
888
      if perc_done is not None:
889
        done = False
890
        if est_time is not None:
891
          rem_time = "%d estimated seconds remaining" % est_time
892
          max_time = est_time
893
        else:
894
          rem_time = "no time estimate"
895
        logger.ToStdout("- device %s: %5.2f%% done, %s" %
896
                        (instance.disks[i].iv_name, perc_done, rem_time))
897
    if done or oneshot:
898
      break
899

    
900
    if unlock:
901
      utils.Unlock('cmd')
902
    try:
903
      time.sleep(min(60, max_time))
904
    finally:
905
      if unlock:
906
        utils.Lock('cmd')
907

    
908
  if done:
909
    logger.ToStdout("Instance %s's disks are in sync." % instance.name)
910
  return not cumul_degraded
911

    
912

    
913
def _CheckDiskConsistency(cfgw, dev, node, on_primary):
914
  """Check that mirrors are not degraded.
915

916
  """
917
  cfgw.SetDiskID(dev, node)
918

    
919
  result = True
920
  if on_primary or dev.AssembleOnSecondary():
921
    rstats = rpc.call_blockdev_find(node, dev)
922
    if not rstats:
923
      logger.ToStderr("Can't get any data from node %s" % node)
924
      result = False
925
    else:
926
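      # the last field of the blockdev_find status (rstats[5]) is the
      # degraded flag, so a degraded mirror makes the check fail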
      result = result and (not rstats[5])
927
  if dev.children:
928
    for child in dev.children:
929
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
930

    
931
  return result
932

    
933

    
934
class LUDiagnoseOS(NoHooksLU):
935
  """Logical unit for OS diagnose/query.
936

937
  """
938
  _OP_REQP = []
939

    
940
  def CheckPrereq(self):
941
    """Check prerequisites.
942

943
    This always succeeds, since this is a pure query LU.
944

945
    """
946
    return
947

    
948
  def Exec(self, feedback_fn):
949
    """Compute the list of OSes.
950

951
    """
952
    node_list = self.cfg.GetNodeList()
953
    node_data = rpc.call_os_diagnose(node_list)
954
    if node_data == False:
955
      raise errors.OpExecError, "Can't gather the list of OSes"
956
    return node_data
957

    
958

    
959
class LURemoveNode(LogicalUnit):
960
  """Logical unit for removing a node.
961

962
  """
963
  HPATH = "node-remove"
964
  HTYPE = constants.HTYPE_NODE
965
  _OP_REQP = ["node_name"]
966

    
967
  def BuildHooksEnv(self):
968
    """Build hooks env.
969

970
    This doesn't run on the target node in the pre phase as a failed
971
    node would not allow itself to run.
972

973
    """
974
    all_nodes = self.cfg.GetNodeList()
975
    all_nodes.remove(self.op.node_name)
976
    return {"NODE_NAME": self.op.node_name}, all_nodes, all_nodes
977

    
978
  def CheckPrereq(self):
979
    """Check prerequisites.
980

981
    This checks:
982
     - the node exists in the configuration
983
     - it does not have primary or secondary instances
984
     - it's not the master
985

986
    Any errors are signalled by raising errors.OpPrereqError.
987

988
    """
989
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
990
    if node is None:
991
      logger.Error("Error: Node '%s' is unknown." % self.op.node_name)
992
      return 1
993

    
994
    instance_list = self.cfg.GetInstanceList()
995

    
996
    masternode = self.sstore.GetMasterNode()
997
    if node.name == masternode:
998
      raise errors.OpPrereqError, ("Node is the master node,"
999
                                   " you need to failover first.")
1000

    
1001
    for instance_name in instance_list:
1002
      instance = self.cfg.GetInstanceInfo(instance_name)
1003
      if node.name == instance.primary_node:
1004
        raise errors.OpPrereqError, ("Instance %s still running on the node,"
1005
                                     " please remove first." % instance_name)
1006
      if node.name in instance.secondary_nodes:
1007
        raise errors.OpPrereqError, ("Instance %s has node as a secondary,"
1008
                                     " please remove first." % instance_name)
1009
    self.op.node_name = node.name
1010
    self.node = node
1011

    
1012
  def Exec(self, feedback_fn):
1013
    """Removes the node from the cluster.
1014

1015
    """
1016
    node = self.node
1017
    logger.Info("stopping the node daemon and removing configs from node %s" %
1018
                node.name)
1019

    
1020
    rpc.call_node_leave_cluster(node.name)
1021

    
1022
    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1023

    
1024
    logger.Info("Removing node %s from config" % node.name)
1025

    
1026
    self.cfg.RemoveNode(node.name)
1027

    
1028

    
1029
class LUQueryNodes(NoHooksLU):
1030
  """Logical unit for querying nodes.
1031

1032
  """
1033
  _OP_REQP = ["output_fields"]
1034

    
1035
  def CheckPrereq(self):
1036
    """Check prerequisites.
1037

1038
    This checks that the fields required are valid output fields.
1039

1040
    """
1041
    self.dynamic_fields = frozenset(["dtotal", "dfree",
1042
                                     "mtotal", "mnode", "mfree"])
1043

    
1044
    _CheckOutputFields(static=["name", "pinst", "sinst", "pip", "sip"],
1045
                       dynamic=self.dynamic_fields,
1046
                       selected=self.op.output_fields)
1047

    
1048

    
1049
  def Exec(self, feedback_fn):
1050
    """Computes the list of nodes and their attributes.
1051

1052
    """
1053
    nodenames = utils.NiceSort(self.cfg.GetNodeList())
1054
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1055

    
1056

    
1057
    # begin data gathering
1058

    
1059
    if self.dynamic_fields.intersection(self.op.output_fields):
1060
      live_data = {}
1061
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1062
      for name in nodenames:
1063
        nodeinfo = node_data.get(name, None)
1064
        if nodeinfo:
1065
          live_data[name] = {
1066
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1067
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1068
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1069
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1070
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1071
            }
1072
        else:
1073
          live_data[name] = {}
1074
    else:
1075
      live_data = dict.fromkeys(nodenames, {})
1076

    
1077
    node_to_primary = dict.fromkeys(nodenames, 0)
1078
    node_to_secondary = dict.fromkeys(nodenames, 0)
1079

    
1080
    if "pinst" in self.op.output_fields or "sinst" in self.op.output_fields:
1081
      instancelist = self.cfg.GetInstanceList()
1082

    
1083
      for instance in instancelist:
1084
        instanceinfo = self.cfg.GetInstanceInfo(instance)
1085
        node_to_primary[instanceinfo.primary_node] += 1
1086
        for secnode in instanceinfo.secondary_nodes:
1087
          node_to_secondary[secnode] += 1
1088

    
1089
    # end data gathering
1090

    
1091
    output = []
1092
    for node in nodelist:
1093
      node_output = []
1094
      for field in self.op.output_fields:
1095
        if field == "name":
1096
          val = node.name
1097
        elif field == "pinst":
1098
          val = node_to_primary[node.name]
1099
        elif field == "sinst":
1100
          val = node_to_secondary[node.name]
1101
        elif field == "pip":
1102
          val = node.primary_ip
1103
        elif field == "sip":
1104
          val = node.secondary_ip
1105
        elif field in self.dynamic_fields:
1106
          val = live_data[node.name].get(field, "?")
1107
        else:
1108
          raise errors.ParameterError, field
1109
        val = str(val)
1110
        node_output.append(val)
1111
      output.append(node_output)
1112

    
1113
    return output
1114

    
1115
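# Illustrative output of LUQueryNodes.Exec for output_fields
# ["name", "pinst", "mfree"] (values hypothetical); note that every cell is
# stringified before being returned:
#   [["node1.example.com", "2", "1024"],
#    ["node2.example.com", "0", "2048"]]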

    
1116
class LUQueryNodeVolumes(NoHooksLU):
1117
  """Logical unit for getting volumes on node(s).
1118

1119
  """
1120
  _OP_REQP = ["nodes", "output_fields"]
1121

    
1122
  def CheckPrereq(self):
1123
    """Check prerequisites.
1124

1125
    This checks that the fields required are valid output fields.
1126

1127
    """
1128
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1129

    
1130
    _CheckOutputFields(static=["node"],
1131
                       dynamic=["phys", "vg", "name", "size", "instance"],
1132
                       selected=self.op.output_fields)
1133

    
1134

    
1135
  def Exec(self, feedback_fn):
1136
    """Computes the list of nodes and their attributes.
1137

1138
    """
1139
    nodenames = utils.NiceSort([node.name for node in self.nodes])
1140
    volumes = rpc.call_node_volumes(nodenames)
1141

    
1142
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1143
             in self.cfg.GetInstanceList()]
1144

    
1145
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1146

    
1147
    output = []
1148
    for node in nodenames:
1149
      node_vols = volumes[node][:]
1150
      node_vols.sort(key=lambda vol: vol['dev'])
1151

    
1152
      for vol in node_vols:
1153
        node_output = []
1154
        for field in self.op.output_fields:
1155
          if field == "node":
1156
            val = node
1157
          elif field == "phys":
1158
            val = vol['dev']
1159
          elif field == "vg":
1160
            val = vol['vg']
1161
          elif field == "name":
1162
            val = vol['name']
1163
          elif field == "size":
1164
            val = int(float(vol['size']))
1165
          elif field == "instance":
1166
            for inst in ilist:
1167
              if node not in lv_by_node[inst]:
1168
                continue
1169
              if vol['name'] in lv_by_node[inst][node]:
1170
                val = inst.name
1171
                break
1172
            else:
1173
              val = '-'
1174
          else:
1175
            raise errors.ParameterError, field
1176
          node_output.append(str(val))
1177

    
1178
        output.append(node_output)
1179

    
1180
    return output
1181

    
1182

    
1183
class LUAddNode(LogicalUnit):
1184
  """Logical unit for adding node to the cluster.
1185

1186
  """
1187
  HPATH = "node-add"
1188
  HTYPE = constants.HTYPE_NODE
1189
  _OP_REQP = ["node_name"]
1190

    
1191
  def BuildHooksEnv(self):
1192
    """Build hooks env.
1193

1194
    This will run on all nodes before, and on all nodes + the new node after.
1195

1196
    """
1197
    env = {
1198
      "NODE_NAME": self.op.node_name,
1199
      "NODE_PIP": self.op.primary_ip,
1200
      "NODE_SIP": self.op.secondary_ip,
1201
      }
1202
    nodes_0 = self.cfg.GetNodeList()
1203
    nodes_1 = nodes_0 + [self.op.node_name, ]
1204
    return env, nodes_0, nodes_1
1205

    
1206
  def CheckPrereq(self):
1207
    """Check prerequisites.
1208

1209
    This checks:
1210
     - the new node is not already in the config
1211
     - it is resolvable
1212
     - its parameters (single/dual homed) matches the cluster
1213

1214
    Any errors are signalled by raising errors.OpPrereqError.
1215

1216
    """
1217
    node_name = self.op.node_name
1218
    cfg = self.cfg
1219

    
1220
    dns_data = utils.LookupHostname(node_name)
1221
    if not dns_data:
1222
      raise errors.OpPrereqError, ("Node %s is not resolvable" % node_name)
1223

    
1224
    node = dns_data['hostname']
1225
    primary_ip = self.op.primary_ip = dns_data['ip']
1226
    secondary_ip = getattr(self.op, "secondary_ip", None)
1227
    if secondary_ip is None:
1228
      secondary_ip = primary_ip
1229
    if not utils.IsValidIP(secondary_ip):
1230
      raise errors.OpPrereqError, ("Invalid secondary IP given")
1231
    self.op.secondary_ip = secondary_ip
1232
    node_list = cfg.GetNodeList()
1233
    if node in node_list:
1234
      raise errors.OpPrereqError, ("Node %s is already in the configuration"
1235
                                   % node)
1236

    
1237
    for existing_node_name in node_list:
1238
      existing_node = cfg.GetNodeInfo(existing_node_name)
1239
      if (existing_node.primary_ip == primary_ip or
1240
          existing_node.secondary_ip == primary_ip or
1241
          existing_node.primary_ip == secondary_ip or
1242
          existing_node.secondary_ip == secondary_ip):
1243
        raise errors.OpPrereqError, ("New node ip address(es) conflict with"
1244
                                     " existing node %s" % existing_node.name)
1245

    
1246
    # check that the type of the node (single versus dual homed) is the
1247
    # same as for the master
1248
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1249
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1250
    newbie_singlehomed = secondary_ip == primary_ip
1251
    if master_singlehomed != newbie_singlehomed:
1252
      if master_singlehomed:
1253
        raise errors.OpPrereqError, ("The master has no private ip but the"
1254
                                     " new node has one")
1255
      else:
1256
        raise errors.OpPrereqError, ("The master has a private ip but the"
1257
                                    " new node doesn't have one")
1258

    
1259
    # checks reachability
1260
    command = ["fping", "-q", primary_ip]
1261
    result = utils.RunCmd(command)
1262
    if result.failed:
1263
      raise errors.OpPrereqError, ("Node not reachable by ping")
1264

    
1265
    if not newbie_singlehomed:
1266
      # check reachability from my secondary ip to newbie's secondary ip
1267
      command = ["fping", "-S%s" % myself.secondary_ip, "-q", secondary_ip]
1268
      result = utils.RunCmd(command)
1269
      if result.failed:
1270
        raise errors.OpPrereqError, ("Node secondary ip not reachable by ping")
1271

    
1272
    self.new_node = objects.Node(name=node,
1273
                                 primary_ip=primary_ip,
1274
                                 secondary_ip=secondary_ip)
1275

    
1276
  def Exec(self, feedback_fn):
1277
    """Adds the new node to the cluster.
1278

1279
    """
1280
    new_node = self.new_node
1281
    node = new_node.name
1282

    
1283
    # set up inter-node password and certificate and restarts the node daemon
1284
    gntpass = self.sstore.GetNodeDaemonPassword()
1285
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1286
      raise errors.OpExecError, ("ganeti password corruption detected")
1287
    f = open(constants.SSL_CERT_FILE)
1288
    try:
1289
      gntpem = f.read(8192)
1290
    finally:
1291
      f.close()
1292
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1293
    # so we use this to detect an invalid certificate; as long as the
1294
    # cert doesn't contain this, the here-document will be correctly
1295
    # parsed by the shell sequence below
1296
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
1297
      raise errors.OpExecError, ("invalid PEM encoding in the SSL certificate")
1298
    if not gntpem.endswith("\n"):
1299
      raise errors.OpExecError, ("PEM must end with newline")
1300
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1301

    
1302
    # remove first the root's known_hosts file
1303
    utils.RemoveFile("/root/.ssh/known_hosts")
1304
    # and then connect with ssh to set password and start ganeti-noded
1305
    # note that all the below variables are sanitized at this point,
1306
    # either by being constants or by the checks above
1307
    ss = self.sstore
1308
    mycommand = ("umask 077 && "
1309
                 "echo '%s' > '%s' && "
1310
                 "cat > '%s' << '!EOF.' && \n"
1311
                 "%s!EOF.\n%s restart" %
1312
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1313
                  constants.SSL_CERT_FILE, gntpem,
1314
                  constants.NODE_INITD_SCRIPT))
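    # Illustrative expansion of mycommand (placeholder values): on the new
    # node the shell runs, roughly,
    #   umask 077 && echo '<gntpass>' > '<noded password file>' && \
    #   cat > '<SSL cert file>' << '!EOF.' && <PEM data>!EOF.
    #   <node init.d script> restart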
1315

    
1316
    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1317
    if result.failed:
1318
      raise errors.OpExecError, ("Remote command on node %s, error: %s,"
1319
                                 " output: %s" %
1320
                                 (node, result.fail_reason, result.output))
1321

    
1322
    # check connectivity
1323
    time.sleep(4)
1324

    
1325
    result = rpc.call_version([node])[node]
1326
    if result:
1327
      if constants.PROTOCOL_VERSION == result:
1328
        logger.Info("communication to node %s fine, sw version %s match" %
1329
                    (node, result))
1330
      else:
1331
        raise errors.OpExecError, ("Version mismatch master version %s,"
1332
                                   " node version %s" %
1333
                                   (constants.PROTOCOL_VERSION, result))
1334
    else:
1335
      raise errors.OpExecError, ("Cannot get version from the new node")
1336

    
1337
    # setup ssh on node
1338
    logger.Info("copy ssh key to node %s" % node)
1339
    keyarray = []
1340
    keyfiles = ["/etc/ssh/ssh_host_dsa_key", "/etc/ssh/ssh_host_dsa_key.pub",
1341
                "/etc/ssh/ssh_host_rsa_key", "/etc/ssh/ssh_host_rsa_key.pub",
1342
                "/root/.ssh/id_dsa", "/root/.ssh/id_dsa.pub"]
1343

    
1344
    for i in keyfiles:
1345
      f = open(i, 'r')
1346
      try:
1347
        keyarray.append(f.read())
1348
      finally:
1349
        f.close()
1350

    
1351
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1352
                               keyarray[3], keyarray[4], keyarray[5])
1353

    
1354
    if not result:
1355
      raise errors.OpExecError, ("Cannot transfer ssh keys to the new node")
1356

    
1357
    # Add node to our /etc/hosts, and add key to known_hosts
1358
    _UpdateEtcHosts(new_node.name, new_node.primary_ip)
1359
    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1360
                      self.cfg.GetHostKey())
1361

    
1362
    if new_node.secondary_ip != new_node.primary_ip:
1363
      result = ssh.SSHCall(node, "root",
1364
                           "fping -S 127.0.0.1 -q %s" % new_node.secondary_ip)
1365
      if result.failed:
1366
        raise errors.OpExecError, ("Node claims it doesn't have the"
1367
                                   " secondary ip you gave (%s).\n"
1368
                                   "Please fix and re-run this command." %
1369
                                   new_node.secondary_ip)
1370

    
1371
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1372
    # including the node just added
1373
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1374
    dist_nodes = self.cfg.GetNodeList() + [node]
1375
    if myself.name in dist_nodes:
1376
      dist_nodes.remove(myself.name)
1377

    
1378
    logger.Debug("Copying hosts and known_hosts to all nodes")
1379
    for fname in ("/etc/hosts", "/etc/ssh/ssh_known_hosts"):
1380
      result = rpc.call_upload_file(dist_nodes, fname)
1381
      for to_node in dist_nodes:
1382
        if not result[to_node]:
1383
          logger.Error("copy of file %s to node %s failed" %
1384
                       (fname, to_node))
1385

    
1386
    to_copy = ss.GetFileList()
1387
    for fname in to_copy:
1388
      if not ssh.CopyFileToNode(node, fname):
1389
        logger.Error("could not copy file %s to node %s" % (fname, node))
1390

    
1391
    logger.Info("adding node %s to cluster.conf" % node)
1392
    self.cfg.AddNode(new_node)
1393

    
1394

    
1395
class LUMasterFailover(LogicalUnit):
1396
  """Failover the master node to the current node.
1397

1398
  This is a special LU in that it must run on a non-master node.
1399

1400
  """
1401
  HPATH = "master-failover"
1402
  HTYPE = constants.HTYPE_CLUSTER
1403
  REQ_MASTER = False
1404
  _OP_REQP = []
1405

    
1406
  def BuildHooksEnv(self):
1407
    """Build hooks env.
1408

1409
    This will run on the new master only in the pre phase, and on all
1410
    the nodes in the post phase.
1411

1412
    """
1413
    env = {
1414
      "NEW_MASTER": self.new_master,
1415
      "OLD_MASTER": self.old_master,
1416
      }
1417
    return env, [self.new_master], self.cfg.GetNodeList()
1418

    
1419
  def CheckPrereq(self):
1420
    """Check prerequisites.
1421

1422
    This checks that we are not already the master.
1423

1424
    """
1425
    self.new_master = socket.gethostname()
1426

    
1427
    self.old_master = self.sstore.GetMasterNode()
1428

    
1429
    if self.old_master == self.new_master:
1430
      raise errors.OpPrereqError, ("This commands must be run on the node"
1431
                                   " where you want the new master to be.\n"
1432
                                   "%s is already the master" %
1433
                                   self.old_master)
1434

    
1435
  def Exec(self, feedback_fn):
1436
    """Failover the master node.
1437

1438
    This command, when run on a non-master node, will cause the current
1439
    master to cease being master, and the non-master to become new
1440
    master.
1441

1442
    """
1443
    #TODO: do not rely on gethostname returning the FQDN
1444
    logger.Info("setting master to %s, old master: %s" %
1445
                (self.new_master, self.old_master))
1446

    
1447
    if not rpc.call_node_stop_master(self.old_master):
1448
      logger.Error("could disable the master role on the old master"
1449
                   " %s, please disable manually" % self.old_master)
1450

    
1451
    ss = self.sstore
1452
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1453
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
1454
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
1455
      logger.Error("could not distribute the new simple store master file"
1456
                   " to the other nodes, please check.")
1457

    
1458
    if not rpc.call_node_start_master(self.new_master):
1459
      logger.Error("could not start the master role on the new master"
1460
                   " %s, please check" % self.new_master)
1461
      feedback_fn("Error in activating the master IP on the new master,\n"
1462
                  "please fix manually.")
1463

    
1464

    
1465

    
1466
class LUQueryClusterInfo(NoHooksLU):
1467
  """Query cluster configuration.
1468

1469
  """
1470
  _OP_REQP = []
1471
  REQ_MASTER = False
1472

    
1473
  def CheckPrereq(self):
1474
    """No prerequsites needed for this LU.
1475

1476
    """
1477
    pass
1478

    
1479
  def Exec(self, feedback_fn):
1480
    """Return cluster config.
1481

1482
    """
1483
    result = {
1484
      "name": self.sstore.GetClusterName(),
1485
      "software_version": constants.RELEASE_VERSION,
1486
      "protocol_version": constants.PROTOCOL_VERSION,
1487
      "config_version": constants.CONFIG_VERSION,
1488
      "os_api_version": constants.OS_API_VERSION,
1489
      "export_version": constants.EXPORT_VERSION,
1490
      "master": self.sstore.GetMasterNode(),
1491
      "architecture": (platform.architecture()[0], platform.machine()),
1492
      }
1493

    
1494
    return result
1495

    
1496

    
1497
class LUClusterCopyFile(NoHooksLU):
1498
  """Copy file to cluster.
1499

1500
  """
1501
  _OP_REQP = ["nodes", "filename"]
1502

    
1503
  def CheckPrereq(self):
1504
    """Check prerequisites.
1505

1506
    It should check that the named file exists and that the given list
1507
    of nodes is valid.
1508

1509
    """
1510
    if not os.path.exists(self.op.filename):
1511
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1512

    
1513
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1514

    
1515
  def Exec(self, feedback_fn):
1516
    """Copy a file from master to some nodes.
1517

1518
    Args:
1519
      self.op.filename - the name of the file to copy (str)
1520
      self.nodes - list of target nodes, as computed by CheckPrereq from
1521
        the opcode's 'nodes' field (an empty list there means all cluster
1522
        nodes); the master node itself is skipped
1523

1524
    """
1525
    filename = self.op.filename
1526

    
1527
    myname = socket.gethostname()
1528

    
1529
    for node in self.nodes:
1530
      if node == myname:
1531
        continue
1532
      if not ssh.CopyFileToNode(node, filename):
1533
        logger.Error("Copy of file %s to node %s failed" % (filename, node))
1534

    
1535

    
1536
class LUDumpClusterConfig(NoHooksLU):
1537
  """Return a text-representation of the cluster-config.
1538

1539
  """
1540
  _OP_REQP = []
1541

    
1542
  def CheckPrereq(self):
1543
    """No prerequisites.
1544

1545
    """
1546
    pass
1547

    
1548
  def Exec(self, feedback_fn):
1549
    """Dump a representation of the cluster config to the standard output.
1550

1551
    """
1552
    return self.cfg.DumpConfig()
1553

    
1554

    
1555
class LURunClusterCommand(NoHooksLU):
1556
  """Run a command on some nodes.
1557

1558
  """
1559
  _OP_REQP = ["command", "nodes"]
1560

    
1561
  def CheckPrereq(self):
1562
    """Check prerequisites.
1563

1564
    It checks that the given list of nodes is valid.
1565

1566
    """
1567
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1568

    
1569
  def Exec(self, feedback_fn):
1570
    """Run a command on some nodes.
1571

1572
    """
1573
    data = []
1574
    for node in self.nodes:
1575
      result = utils.RunCmd(["ssh", node.name, self.op.command])
1576
      data.append((node.name, result.cmd, result.output, result.exit_code))
1577

    
1578
    return data
1579

    
1580

    
1581
class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)
    self.instance = instance


  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError, ("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    a (disks_ok, device_info) tuple: disks_ok is false if the operation
    failed, and device_info is a list of (host, instance_visible_name,
    node_visible_name) tuples with the mapping from node devices to
    instance devices
  """
  device_info = []
  disks_ok = True
  for inst_disk in instance.disks:
    master_result = None
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      is_primary = node == instance.primary_node
      result = rpc.call_blockdev_assemble(node, node_disk, is_primary)
      if not result:
        logger.Error("could not prepare block device %s on node %s (is_pri"
                     "mary=%s)" % (inst_disk.iv_name, node, is_primary))
        if is_primary or not ignore_secondaries:
          disks_ok = False
      if is_primary:
        master_result = result
    device_info.append((instance.primary_node, inst_disk.iv_name,
                        master_result))

  return disks_ok, device_info


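# Illustrative sketch (not used by any LU in this module): shows how the
# (disks_ok, device_info) result of _AssembleInstanceDisks is typically
# consumed.  The feedback_fn argument is assumed to behave like the
# feedback functions passed to LU.Exec.
def _ExampleShowDiskMapping(instance, cfg, feedback_fn):
  """Sketch: assemble an instance's disks and report the device mapping."""
  disks_ok, device_info = _AssembleInstanceDisks(instance, cfg)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    raise errors.OpExecError, ("Cannot activate block devices")
  for node, iv_name, dev_path in device_info:
    feedback_fn("%s: disk %s assembled as %s" % (node, iv_name, dev_path))
  return device_info

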
def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  On failure the already-assembled disks are shut down again; if force
  is false, a hint about retrying with '--force' is logged for errors
  that may concern only a secondary node.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError, ("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    if not type(ins_l) is list:
      raise errors.OpExecError, ("Can't contact node '%s'" %
                                 instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError, ("Instance is running, can't shutdown"
                                 " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)


def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are ignored
  (they are still logged but do not cause a failure result).

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result


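# Illustrative sketch (not used by any LU in this module): during a
# failover the disks are shut down with ignore_primary=True, so that an
# unreachable (possibly dead) primary node does not make the whole
# shutdown fail, while errors on the other nodes still count.
def _ExampleForceDiskShutdown(instance, cfg):
  """Sketch: shut down an instance's disks, tolerating a dead primary."""
  if not _ShutdownInstanceDisks(instance, cfg, ignore_primary=True):
    raise errors.OpExecError, ("Can't shut down the instance's disks.")

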
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      "INSTANCE_PRIMARY": self.instance.primary_node,
      "INSTANCE_SECONDARIES": " ".join(self.instance.secondary_nodes),
      "FORCE": self.op.force,
      }
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)

    # check bridges existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(instance.primary_node, brlist):
      raise errors.OpPrereqError, ("one or more target bridges %s do not"
                                   " exist on destination node '%s'" %
                                   (brlist, instance.primary_node))

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
    if not nodeinfo:
      raise errors.OpExecError, ("Could not contact node %s for info" %
                                 (node_current))

    freememory = nodeinfo[node_current]['memory_free']
    memory = instance.memory
    if memory > freememory:
      raise errors.OpExecError, ("Not enough memory to start instance"
                                 " %s on node %s"
                                 " needed %s MiB, available %s MiB" %
                                 (instance.name, node_current, memory,
                                  freememory))

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError, ("Could not start instance")

    self.cfg.MarkInstanceUp(instance.name)


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      "INSTANCE_PRIMARY": self.instance.primary_node,
      "INSTANCE_SECONDARIES": " ".join(self.instance.secondary_nodes),
      }
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    if not rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    self.cfg.MarkInstanceDown(instance.name)
    _ShutdownInstanceDisks(instance, self.cfg)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      "INSTANCE_PRIMARY": self.instance.primary_node,
      "INSTANCE_SECONDARIES": " ".join(self.instance.secondary_nodes),
      }
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)
    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError, ("Instance '%s' has no disks" %
                                   self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError, ("Instance '%s' is marked to be up" %
                                   self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError, ("Instance '%s' is running on the node %s" %
                                   (self.op.instance_name,
                                    instance.primary_node))
    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError, ("Could not install OS for instance %s "
                                   "on node %s" %
                                   (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      "INSTANCE_PRIMARY": self.instance.primary_node,
      "INSTANCE_SECONDARIES": " ".join(self.instance.secondary_nodes),
      }
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      raise errors.OpExecError, ("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logger.Info("removing block devices for instance %s" % instance.name)

    _RemoveDisks(instance, self.cfg)

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
                               "admin_state", "admin_ram",
                               "disk_template", "ip", "mac", "bridge"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    instance_names = utils.NiceSort(self.cfg.GetInstanceList())
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
                     in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_all_instances_info(nodes)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = []
    for instance in instance_list:
      iout = []
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = ",".join(instance.secondary_nodes) or "-"
        elif field == "admin_state":
          if instance.status == "down":
            val = "no"
          else:
            val = "yes"
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = "(node down)"
          else:
            if live_data.get(instance.name):
              val = "running"
            else:
              val = "stopped"
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = "(node down)"
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        else:
          raise errors.ParameterError, field
        val = str(val)
        iout.append(val)
      output.append(iout)

    return output


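# Example of the data returned by LUQueryInstances.Exec (illustrative
# values only): with output_fields ["name", "pnode", "oper_state"], each
# entry of the result is a list of strings, e.g.
#   [["instance1.example.com", "node1.example.com", "running"],
#    ["instance2.example.com", "node2.example.com", "stopped"]]

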
class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      "INSTANCE_PRIMARY": self.instance.primary_node,
      "INSTANCE_SECONDARIES": " ".join(self.instance.secondary_nodes),
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)

    # check memory requirements on the secondary node
    target_node = instance.secondary_nodes[0]
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
    info = nodeinfo.get(target_node, None)
    if not info:
      raise errors.OpPrereqError, ("Cannot get current information"
                                   " from node '%s'" % target_node)
    if instance.memory > info['memory_free']:
      raise errors.OpPrereqError, ("Not enough memory on target node %s."
                                   " %d MB available, %d MB required" %
                                   (target_node, info['memory_free'],
                                    instance.memory))

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(instance.primary_node, brlist):
      raise errors.OpPrereqError, ("one or more target bridges %s do not"
                                   " exist on destination node '%s'" %
                                   (brlist, instance.primary_node))

    self.instance = instance

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for remote_raid1, these are md over drbd
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        if not self.op.ignore_consistency:
          raise errors.OpExecError, ("Disk %s is degraded on target node,"
                                     " aborting failover." % dev.iv_name)

    feedback_fn("* checking target node resource availability")
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())

    if not nodeinfo:
      raise errors.OpExecError, ("Could not contact target node %s." %
                                 target_node)

    free_memory = int(nodeinfo[target_node]['memory_free'])
    memory = instance.memory
    if memory > free_memory:
      raise errors.OpExecError, ("Not enough memory to create instance %s on"
                                 " node %s. needed %s MiB, available %s MiB" %
                                 (instance.name, target_node, memory,
                                  free_memory))

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not rpc.call_instance_shutdown(source_node, instance):
      logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                   " anyway. Please make sure node %s is down" %
                   (instance.name, source_node, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError, ("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.AddInstance(instance)

    feedback_fn("* activating the instance's disks on target node")
    logger.Info("Starting instance %s on node %s" %
                (instance.name, target_node))

    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                             ignore_secondaries=True)
    if not disks_ok:
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError, ("Can't activate the instance's disks")

    feedback_fn("* starting the instance on the target node")
    if not rpc.call_instance_start(target_node, instance, None):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance %s on node %s." %
                               (instance.name, target_node))


def _CreateBlockDevOnPrimary(cfg, node, device):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(cfg, node, child):
        return False

  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size, True)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _CreateBlockDevOnSecondary(cfg, node, device, force):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(cfg, node, child, force):
        return False

  if not force:
    return True
  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size, False)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


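# Illustrative sketch (not used by any LU in this module): creating one
# block device tree on all nodes of an instance, first on the secondaries
# (where only device types that require it are physically created) and
# then, unconditionally, on the primary; this mirrors what _CreateDisks
# below does for every disk of an instance.
def _ExampleCreateSingleDevice(cfg, instance, device):
  """Sketch: create one device tree on the secondaries, then the primary."""
  for secondary_node in instance.secondary_nodes:
    if not _CreateBlockDevOnSecondary(cfg, secondary_node, device, False):
      return False
  return _CreateBlockDevOnPrimary(cfg, instance.primary_node, device)

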
def _GenerateMDDRBDBranch(cfg, vgname, primary, secondary, size, base):
  """Generate a drbd device complete with its children.

  """
  port = cfg.AllocatePort()
  base = "%s_%s" % (base, port)
  dev_data = objects.Disk(dev_type="lvm", size=size,
                          logical_id=(vgname, "%s.data" % base))
  dev_meta = objects.Disk(dev_type="lvm", size=128,
                          logical_id=(vgname, "%s.meta" % base))
  drbd_dev = objects.Disk(dev_type="drbd", size=size,
                          logical_id = (primary, secondary, port),
                          children = [dev_data, dev_meta])
  return drbd_dev


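# The result of _GenerateMDDRBDBranch is a three-node device tree; a
# hypothetical call such as
#   _GenerateMDDRBDBranch(cfg, "xenvg", "node1", "node2", 1024, "inst-sda")
# returns (sketch only; the actual port comes from cfg.AllocatePort()):
#   Disk(dev_type="drbd", size=1024, logical_id=("node1", "node2", port))
#     children[0]: Disk(dev_type="lvm", size=1024,
#                       logical_id=("xenvg", "inst-sda_<port>.data"))
#     children[1]: Disk(dev_type="lvm", size=128,
#                       logical_id=("xenvg", "inst-sda_<port>.meta"))

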
def _GenerateDiskTemplate(cfg, vgname, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  if template_name == "diskless":
    disks = []
  elif template_name == "plain":
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")
    sda_dev = objects.Disk(dev_type="lvm", size=disk_sz,
                           logical_id=(vgname, "%s.os" % instance_name),
                           iv_name = "sda")
    sdb_dev = objects.Disk(dev_type="lvm", size=swap_sz,
                           logical_id=(vgname, "%s.swap" % instance_name),
                           iv_name = "sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == "local_raid1":
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")
    sda_dev_m1 = objects.Disk(dev_type="lvm", size=disk_sz,
                              logical_id=(vgname, "%s.os_m1" % instance_name))
    sda_dev_m2 = objects.Disk(dev_type="lvm", size=disk_sz,
                              logical_id=(vgname, "%s.os_m2" % instance_name))
    md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name = "sda",
                              size=disk_sz,
                              children = [sda_dev_m1, sda_dev_m2])
    sdb_dev_m1 = objects.Disk(dev_type="lvm", size=swap_sz,
                              logical_id=(vgname, "%s.swap_m1" %
                                          instance_name))
    sdb_dev_m2 = objects.Disk(dev_type="lvm", size=swap_sz,
                              logical_id=(vgname, "%s.swap_m2" %
                                          instance_name))
    md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name = "sdb",
                              size=swap_sz,
                              children = [sdb_dev_m1, sdb_dev_m2])
    disks = [md_sda_dev, md_sdb_dev]
  elif template_name == "remote_raid1":
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, vgname,
                                         primary_node, remote_node, disk_sz,
                                         "%s-sda" % instance_name)
    md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name="sda",
                              children = [drbd_sda_dev], size=disk_sz)
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, vgname,
                                         primary_node, remote_node, swap_sz,
                                         "%s-sdb" % instance_name)
    md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name="sdb",
                              children = [drbd_sdb_dev], size=swap_sz)
    disks = [md_sda_dev, md_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks


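# Illustrative sketch (not used by any LU in this module): generating the
# disk layout for the 'plain' template; the instance name, node name and
# sizes are made-up example values.
def _ExamplePlainDiskLayout(cfg):
  """Sketch: a 10 GiB 'sda' plus 4 GiB swap 'sdb' plain LVM layout."""
  return _GenerateDiskTemplate(cfg, cfg.GetVGName(), "plain",
                               "instance1.example.com", "node1.example.com",
                               [], 10240, 4096)

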
def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
              (device.iv_name, instance.name))
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, device, False):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, device):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False
  return True


def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False
  return result


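# Illustrative sketch (not used by any LU in this module): the
# create-then-rollback pattern used by LUCreateInstance below; if any
# device fails to create, the partially created ones are removed again
# via _RemoveDisks.
def _ExampleCreateDisksWithRollback(cfg, instance):
  """Sketch: create all disks of an instance, reverting on failure."""
  if not _CreateDisks(cfg, instance):
    _RemoveDisks(instance, cfg)
    raise errors.OpExecError, ("Device creation failed, reverting...")

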
class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
              "disk_template", "swap_size", "mode", "start", "vcpus",
              "wait_for_sync"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      "INSTANCE_PRIMARY": self.op.pnode,
      "INSTANCE_SECONDARIES": " ".join(self.secondaries),
      "DISK_TEMPLATE": self.op.disk_template,
      "MEM_SIZE": self.op.mem_size,
      "DISK_SIZE": self.op.disk_size,
      "SWAP_SIZE": self.op.swap_size,
      "VCPUS": self.op.vcpus,
      "BRIDGE": self.op.bridge,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["SRC_NODE"] = self.op.src_node
      env["SRC_PATH"] = self.op.src_path
      env["SRC_IMAGE"] = self.src_image
    if self.inst_ip:
      env["INSTANCE_IP"] = self.inst_ip

    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl


  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError, ("Invalid instance creation mode '%s'" %
                                   self.op.mode)

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)
      if src_node is None or src_path is None:
        raise errors.OpPrereqError, ("Importing an instance requires source"
                                     " node and path options")
      src_node_full = self.cfg.ExpandNodeName(src_node)
      if src_node_full is None:
        raise errors.OpPrereqError, ("Unknown source node '%s'" % src_node)
      self.op.src_node = src_node = src_node_full

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError, ("The source path must be absolute")

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError, ("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError, ("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError, ("Wrong export version %s (wanted %d)" %
                                     (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError, ("Can't import instance with more than"
                                     " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage
    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError, ("No guest OS specified")

    # check primary node
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
    if pnode is None:
      raise errors.OpPrereqError, ("Primary node '%s' is unknown" %
                                   self.op.pnode)
    self.op.pnode = pnode.name
    self.pnode = pnode
    self.secondaries = []
    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError, ("Invalid disk template name")

    if self.op.disk_template == constants.DT_REMOTE_RAID1:
      if getattr(self.op, "snode", None) is None:
        raise errors.OpPrereqError, ("The 'remote_raid1' disk template needs"
                                     " a mirror node")

      snode_name = self.cfg.ExpandNodeName(self.op.snode)
      if snode_name is None:
        raise errors.OpPrereqError, ("Unknown secondary node '%s'" %
                                     self.op.snode)
      elif snode_name == pnode.name:
        raise errors.OpPrereqError, ("The secondary node cannot be"
                                     " the primary node.")
      self.secondaries.append(snode_name)

    # Check lv size requirements
    nodenames = [pnode.name] + self.secondaries
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())

    # Required free disk space as a function of disk and swap space
    req_size_dict = {
      constants.DT_DISKLESS: 0,
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
      constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
      # 256 MB are added for drbd metadata, 128MB for each drbd device
      constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
    }

    if self.op.disk_template not in req_size_dict:
      raise errors.ProgrammerError, ("Disk template '%s' size requirement"
                                     " is unknown" % self.op.disk_template)

    req_size = req_size_dict[self.op.disk_template]

    for node in nodenames:
      info = nodeinfo.get(node, None)
      if not info:
        raise errors.OpPrereqError, ("Cannot get current information"
                                     " from node '%s'" % node)
      if req_size > info['vg_free']:
        raise errors.OpPrereqError, ("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
    if not isinstance(os_obj, objects.OS):
      raise errors.OpPrereqError, ("OS '%s' not in supported os list for"
                                   " primary node" % self.op.os_type)

    # instance verification
    hostname1 = utils.LookupHostname(self.op.instance_name)
    if not hostname1:
      raise errors.OpPrereqError, ("Instance name '%s' not found in dns" %
                                   self.op.instance_name)

    self.op.instance_name = instance_name = hostname1['hostname']
    instance_list = self.cfg.GetInstanceList()
    if instance_name in instance_list:
      raise errors.OpPrereqError, ("Instance '%s' is already in the cluster" %
                                   instance_name)

    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1['ip']
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError, ("given IP address '%s' doesn't look"
                                     " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = inst_ip

    command = ["fping", "-q", hostname1['ip']]
    result = utils.RunCmd(command)
    if not result.failed:
      raise errors.OpPrereqError, ("IP %s of instance %s already in use" %
                                   (hostname1['ip'], instance_name))

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError, ("target bridge '%s' does not exist on"
                                   " destination node '%s'" %
                                   (self.op.bridge, pnode.name))

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    disks = _GenerateDiskTemplate(self.cfg, self.cfg.GetVGName(),
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError, ("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj)
    elif iobj.disk_template == "remote_raid1":
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      raise errors.OpExecError, ("There are some degraded disks for"
                                 " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError, ("could not add os for instance %s"
                                     " on node %s" %
                                     (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                           src_node, src_image):
          raise errors.OpExecError, ("Could not import os for instance"
                                     " %s on node %s" %
                                     (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError, ("Unknown OS initialization mode '%s'"
                                       % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError, ("Could not start instance")


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Connect to the console of an instance.

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError, ("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError, ("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance.name)
    return node, console_cmd


class LUAddMDDRBDComponent(LogicalUnit):
  """Add a new mirror member to an instance's disk.

  """
  HPATH = "mirror-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      "NEW_SECONDARY": self.op.remote_node,
      "DISK_NAME": self.op.disk_name,
      }
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.remote_node,] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)
    self.instance = instance

    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
    if remote_node is None:
      raise errors.OpPrereqError, ("Node '%s' not known" % self.op.remote_node)
    self.remote_node = remote_node

    if remote_node == instance.primary_node:
      raise errors.OpPrereqError, ("The specified node is the primary node of"
                                   " the instance.")

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError, ("Instance's disk layout is not"
                                   " remote_raid1.")
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError, ("Can't find this device ('%s') in the"
                                   " instance." % self.op.disk_name)
    if len(disk.children) > 1:
      raise errors.OpPrereqError, ("The device already has two slave"
                                   " devices.\n"
                                   "This would create a 3-disk raid1"
                                   " which we don't allow.")
    self.disk = disk

  def Exec(self, feedback_fn):
    """Add the mirror component.

    """
    disk = self.disk
    instance = self.instance

    remote_node = self.remote_node
    new_drbd = _GenerateMDDRBDBranch(self.cfg, self.cfg.GetVGName(),
                                     instance.primary_node, remote_node,
                                     disk.size, "%s-%s" %
                                     (instance.name, self.op.disk_name))

    logger.Info("adding new mirror component on secondary")
    #HARDCODE
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, new_drbd, False):
      raise errors.OpExecError, ("Failed to create new component on secondary"
                                 " node %s" % remote_node)

    logger.Info("adding new mirror component on primary")
    #HARDCODE
    if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node, new_drbd):
      # remove secondary dev
      self.cfg.SetDiskID(new_drbd, remote_node)
      rpc.call_blockdev_remove(remote_node, new_drbd)
      raise errors.OpExecError, ("Failed to create volume on primary")

    # the device exists now
    # call the primary node to add the mirror to md
    logger.Info("adding new mirror component to md")
    if not rpc.call_blockdev_addchild(instance.primary_node,
                                      disk, new_drbd):
      logger.Error("Can't add mirror component to md!")
      self.cfg.SetDiskID(new_drbd, remote_node)
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
        logger.Error("Can't rollback on secondary")
      self.cfg.SetDiskID(new_drbd, instance.primary_node)
      if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
        logger.Error("Can't rollback on primary")
      raise errors.OpExecError, "Can't add mirror component to md array"

    disk.children.append(new_drbd)

    self.cfg.AddInstance(instance)

    _WaitForSync(self.cfg, instance)

    return 0


class LURemoveMDDRBDComponent(LogicalUnit):
  """Remove a component from a remote_raid1 disk.

  """
  HPATH = "mirror-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      "DISK_NAME": self.op.disk_name,
      "DISK_ID": self.op.disk_id,
      "OLD_SECONDARY": self.old_secondary,
      }
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)
    self.instance = instance

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError, ("Instance's disk layout is not"
                                   " remote_raid1.")
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError, ("Can't find this device ('%s') in the"
                                   " instance." % self.op.disk_name)
    for child in disk.children:
      if child.dev_type == "drbd" and child.logical_id[2] == self.op.disk_id:
        break
    else:
      raise errors.OpPrereqError, ("Can't find the device with this port.")

    if len(disk.children) < 2:
      raise errors.OpPrereqError, ("Cannot remove the last component from"
                                   " a mirror.")
    self.disk = disk
    self.child = child
    if self.child.logical_id[0] == instance.primary_node:
      oid = 1
    else:
      oid = 0
    self.old_secondary = self.child.logical_id[oid]

  def Exec(self, feedback_fn):
    """Remove the mirror component.

    """
    instance = self.instance
    disk = self.disk
    child = self.child
    logger.Info("remove mirror component")
    self.cfg.SetDiskID(disk, instance.primary_node)
    if not rpc.call_blockdev_removechild(instance.primary_node,
                                         disk, child):
      raise errors.OpExecError, ("Can't remove child from mirror.")

    for node in child.logical_id[:2]:
      self.cfg.SetDiskID(child, node)
      if not rpc.call_blockdev_remove(node, child):
        logger.Error("Warning: failed to remove device from node %s,"
                     " continuing operation." % node)

    disk.children.remove(child)
    self.cfg.AddInstance(instance)


class LUReplaceDisks(LogicalUnit):
2893
  """Replace the disks of an instance.
2894

2895
  """
2896
  HPATH = "mirrors-replace"
2897
  HTYPE = constants.HTYPE_INSTANCE
2898
  _OP_REQP = ["instance_name"]
2899

    
2900
  def BuildHooksEnv(self):
2901
    """Build hooks env.
2902

2903
    This runs on the master, the primary and all the secondaries.
2904

2905
    """
2906
    env = {
2907
      "INSTANCE_NAME": self.op.instance_name,
2908
      "NEW_SECONDARY": self.op.remote_node,
2909
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
2910
      }
2911
    nl = [self.sstore.GetMasterNode(),
2912
          self.instance.primary_node] + list(self.instance.secondary_nodes)
2913
    return env, nl, nl
2914

    
2915
  def CheckPrereq(self):
2916
    """Check prerequisites.
2917

2918
    This checks that the instance is in the cluster.
2919

2920
    """
2921
    instance = self.cfg.GetInstanceInfo(
2922
      self.cfg.ExpandInstanceName(self.op.instance_name))
2923
    if instance is None:
2924
      raise errors.OpPrereqError, ("Instance '%s' not known" %
2925
                                   self.op.instance_name)
2926
    self.instance = instance
2927

    
2928
    if instance.disk_template != constants.DT_REMOTE_RAID1:
2929
      raise errors.OpPrereqError, ("Instance's disk layout is not"
2930
                                   " remote_raid1.")
2931

    
2932
    if len(instance.secondary_nodes) != 1:
2933
      raise errors.OpPrereqError, ("The instance has a strange layout,"
2934
                                   " expected one secondary but found %d" %
2935
                                   len(instance.secondary_nodes))
2936

    
2937
    remote_node = getattr(self.op, "remote_node", None)
2938
    if remote_node is None:
2939
      remote_node = instance.secondary_nodes[0]
2940
    else:
2941
      remote_node = self.cfg.ExpandNodeName(remote_node)
2942
      if remote_node is None:
2943
        raise errors.OpPrereqError, ("Node '%s' not known" %
2944
                                     self.op.remote_node)
2945
    if remote_node == instance.primary_node:
2946
      raise errors.OpPrereqError, ("The specified node is the primary node of"
2947
                                   " the instance.")
2948
    self.op.remote_node = remote_node
2949

    
2950
  def Exec(self, feedback_fn):
2951
    """Replace the disks of an instance.
2952

2953
    """
2954
    instance = self.instance
2955
    iv_names = {}
2956
    # start of work
2957
    remote_node = self.op.remote_node
2958
    cfg = self.cfg
2959
    vgname = cfg.GetVGName()
2960
    for dev in instance.disks:
      size = dev.size
      new_drbd = _GenerateMDDRBDBranch(cfg, vgname, instance.primary_node,
                                       remote_node, size,
                                       "%s-%s" % (instance.name, dev.iv_name))
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
      logger.Info("adding new mirror component on secondary for %s" %
                  dev.iv_name)
      #HARDCODE
      if not _CreateBlockDevOnSecondary(cfg, remote_node, new_drbd, False):
        raise errors.OpExecError, ("Failed to create new component on"
                                   " secondary node %s\n"
                                   "Full abort, cleanup manually!" %
                                   remote_node)

      logger.Info("adding new mirror component on primary")
      #HARDCODE
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, new_drbd):
        # remove secondary dev
        cfg.SetDiskID(new_drbd, remote_node)
        rpc.call_blockdev_remove(remote_node, new_drbd)
        raise errors.OpExecError("Failed to create volume on primary!\n"
                                 "Full abort, cleanup manually!!")

      # the device exists now
      # call the primary node to add the mirror to md
      logger.Info("adding new mirror component to md")
      if not rpc.call_blockdev_addchild(instance.primary_node, dev,
                                        new_drbd):
        logger.Error("Can't add mirror component to md!")
        cfg.SetDiskID(new_drbd, remote_node)
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
          logger.Error("Can't rollback on secondary")
        cfg.SetDiskID(new_drbd, instance.primary_node)
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
          logger.Error("Can't rollback on primary")
        raise errors.OpExecError, ("Full abort, cleanup manually!!")

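      # record the new component in the instance's disk tree and write the
      # modified instance back to the configuration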
      dev.children.append(new_drbd)
      cfg.AddInstance(instance)

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    _WaitForSync(cfg, instance, unlock=True)

    # so check manually all the devices
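    # (call_blockdev_find returns the device status; field [5] is the
    # degraded flag checked below)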
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError, ("MD device %s is degraded!" % name)
      cfg.SetDiskID(new_drbd, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
      if is_degr:
        raise errors.OpExecError, ("New drbd device %s is degraded!" % name)

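    # the new components are in sync; detach the old mirror component from
    # each MD device and remove its block devices from both of its nodes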
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      logger.Info("remove mirror %s component" % name)
      cfg.SetDiskID(dev, instance.primary_node)
      if not rpc.call_blockdev_removechild(instance.primary_node,
                                                dev, child):
        logger.Error("Can't remove child from mirror, aborting"
                     " *this device cleanup*.\nYou need to cleanup manually!!")
        continue

      for node in child.logical_id[:2]:
        logger.Info("remove child device on %s" % node)
        cfg.SetDiskID(child, node)
        if not rpc.call_blockdev_remove(node, child):
          logger.Error("Warning: failed to remove device from node %s,"
                       " continuing operation." % node)

      dev.children.remove(child)

      cfg.AddInstance(instance)


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError, "Invalid argument type 'instances'"
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError, ("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]
    return


  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type == "drbd":
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

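    # recurse into any child devices (e.g. the LVM volumes backing an
    # MD/DRBD device) so callers get the status of the full device tree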
    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
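      # the configuration only knows the intended state; ask the primary
      # node whether the instance is actually running right now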
      remote_info = rpc.call_instance_info(instance.primary_node,
                                                instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        }

      result[instance.name] = idict

    return result


class LUQueryNodeData(NoHooksLU):
  """Logical unit for querying node data.

  """
  _OP_REQP = ["nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional node list against the existing names.

    """
    self.wanted_nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Compute and return the list of nodes.

    """
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]
    result = []
    for node in self.wanted_nodes:
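      # each entry is (name, primary_ip, secondary_ip,
      # [instances with this node as primary],
      # [instances with this node as a secondary])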
      result.append((node.name, node.primary_ip, node.secondary_ip,
                     [inst.name for inst in ilist
                      if inst.primary_node == node.name],
                     [inst.name for inst in ilist
                      if node.name in inst.secondary_nodes],
                     ))
    return result


class LUSetInstanceParms(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      }
    if self.mem:
      env["MEM_SIZE"] = self.mem
    if self.vcpus:
      env["VCPUS"] = self.vcpus
    if self.do_ip:
      env["INSTANCE_IP"] = self.ip
    if self.bridge:
      env["BRIDGE"] = self.bridge

    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)

    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.bridge = getattr(self.op, "bridge", None)
    if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
      raise errors.OpPrereqError, ("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError, ("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError, ("Invalid vcpus number: %s" % str(err))
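    # do_ip records whether an IP change was requested at all; the special
    # value "none" clears the instance's IP address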
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError, ("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("No such instance name '%s'" %
                                   self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance
    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.
    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))

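    # write the modified instance back to the configuration; as noted in the
    # docstring, the new values only take effect at the next restart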
    self.cfg.AddInstance(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list([node.name for node in self.nodes])


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance name is a valid one.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not found" %
                                   self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError, ("Destination node '%s' is unknown." %
                                   self.op.target_node)
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    # shutdown the instance, unless requested not to do so
    if self.op.shutdown:
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
      self.processor.ChainOpCode(op, feedback_fn)

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
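      # only the first disk ("sda") is snapshotted and exported here; the
      # snapshot is taken while the instance is shut down (if requested
      # above) and the instance is restarted in the finally clause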
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type="lvm", size=disk.size,
                                      logical_id=(vgname, new_dev_name),
                                      physical_id=(vgname, new_dev_name),
                                      iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown:
        op = opcodes.OpStartupInstance(instance_name=instance.name,
                                       force=False)
        self.processor.ChainOpCode(op, feedback_fn)

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                           instance):
        logger.Error("could not export block device %s from node"
                     " %s to node %s" %
                     (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from"
                     " node %s" % (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

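    # finally, remove any older exports of this instance that still exist on
    # the other nodes, so that only the export just created is kept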
    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal; if we
    # proceeded, the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.processor.ChainOpCode(op, feedback_fn)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))