1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import socket
30
import time
31
import tempfile
32
import re
33
import platform
34

    
35
from ganeti import rpc
36
from ganeti import ssh
37
from ganeti import logger
38
from ganeti import utils
39
from ganeti import errors
40
from ganeti import hypervisor
41
from ganeti import config
42
from ganeti import constants
43
from ganeti import objects
44
from ganeti import opcodes
45
from ganeti import ssconf
46

    
47
class LogicalUnit(object):
48
  """Logical Unit base class..
49

50
  Subclasses must follow these rules:
51
    - implement CheckPrereq which also fills in the opcode instance
52
      with all the fields (even if as None)
53
    - implement Exec
54
    - implement BuildHooksEnv
55
    - redefine HPATH and HTYPE
56
    - optionally redefine their run requirements (REQ_CLUSTER,
57
      REQ_MASTER); note that all commands require root permissions
58

59
  """
60
  HPATH = None
61
  HTYPE = None
62
  _OP_REQP = []
63
  REQ_CLUSTER = True
64
  REQ_MASTER = True
65

    
66
  def __init__(self, processor, op, cfg, sstore):
67
    """Constructor for LogicalUnit.
68

69
    This needs to be overridden in derived classes in order to check op
70
    validity.
71

72
    """
73
    self.processor = processor
74
    self.op = op
75
    self.cfg = cfg
76
    self.sstore = sstore
77
    for attr_name in self._OP_REQP:
78
      attr_val = getattr(op, attr_name, None)
79
      if attr_val is None:
80
        raise errors.OpPrereqError, ("Required parameter '%s' missing" %
81
                                     attr_name)
82
    if self.REQ_CLUSTER:
83
      if not cfg.IsCluster():
84
        raise errors.OpPrereqError, ("Cluster not initialized yet,"
85
                                     " use 'gnt-cluster init' first.")
86
      if self.REQ_MASTER:
87
        master = sstore.GetMasterNode()
88
        if master != socket.gethostname():
89
          raise errors.OpPrereqError, ("Commands must be run on the master"
90
                                       " node %s" % master)
91

    
92
  def CheckPrereq(self):
93
    """Check prerequisites for this LU.
94

95
    This method should check that the prerequisites for the execution
96
    of this LU are fulfilled. It can do internode communication, but
97
    it should be idempotent - no cluster or system changes are
98
    allowed.
99

100
    The method should raise errors.OpPrereqError in case something is
101
    not fulfilled. Its return value is ignored.
102

103
    This method should also update all the parameters of the opcode to
104
    their canonical form; e.g. a short node name must be fully
105
    expanded after this method has successfully completed (so that
106
    hooks, logging, etc. work correctly).
107

108
    """
109
    raise NotImplementedError
110

    
111
  def Exec(self, feedback_fn):
112
    """Execute the LU.
113

114
    This method should implement the actual work. It should raise
115
    errors.OpExecError for failures that are somewhat dealt with in
116
    code, or expected.
117

118
    """
119
    raise NotImplementedError
120

    
121
  def BuildHooksEnv(self):
122
    """Build hooks environment for this LU.
123

124
    This method should return a three-element tuple consisting of: a dict
125
    containing the environment that will be used for running the
126
    specific hook for this LU, a list of node names on which the hook
127
    should run before the execution, and a list of node names on which
128
    the hook should run after the execution.
129

130
    The keys of the dict must not have 'GANETI_' prefixed as this will
131
    be handled in the hooks runner. Also note additional keys will be
132
    added by the hooks runner. If the LU doesn't define any
133
    environment, an empty dict (and not None) should be returned.
134

135
    As for the node lists, the master should not be included in
136
    them, as it will be added by the hooks runner in case this LU
137
    requires a cluster to run on (otherwise we don't have a node
138
    list). If there are no nodes to return, use an empty list (and
139
    not None).
140

141
    Note that if the HPATH for a LU class is None, this function will
142
    not be called.
143

144
    """
145
    raise NotImplementedError
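

# Illustrative sketch (not part of the original module): a minimal
# LogicalUnit honouring the contract documented above.  The opcode field
# "message" and the hook path "example-noop" are hypothetical.
class _ExampleNoopLU(LogicalUnit):
  HPATH = "example-noop"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["message"]

  def BuildHooksEnv(self):
    # no GANETI_ prefix and no master node here; the hooks runner adds both
    return {"MESSAGE": self.op.message}, [], []

  def CheckPrereq(self):
    if not self.op.message:
      raise errors.OpPrereqError, ("Parameter 'message' must not be empty")

  def Exec(self, feedback_fn):
    feedback_fn(self.op.message)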
146

    
147

    
148
class NoHooksLU(LogicalUnit):
149
  """Simple LU which runs no hooks.
150

151
  This LU is intended as a parent for other LogicalUnits which will
152
  run no hooks, in order to reduce duplicate code.
153

154
  """
155
  HPATH = None
156
  HTYPE = None
157

    
158
  def BuildHooksEnv(self):
159
    """Build hooks env.
160

161
    This is a no-op, since we don't run hooks.
162

163
    """
164
    return
165

    
166

    
167
def _GetWantedNodes(lu, nodes):
168
  """Returns list of checked and expanded nodes.
169

170
  Args:
171
    nodes: List of nodes (strings) or None for all
172

173
  """
174
  if nodes is not None and not isinstance(nodes, list):
175
    raise errors.OpPrereqError, "Invalid argument type 'nodes'"
176

    
177
  if nodes:
178
    wanted_nodes = []
179

    
180
    for name in nodes:
181
      node = lu.cfg.GetNodeInfo(lu.cfg.ExpandNodeName(name))
182
      if node is None:
183
        raise errors.OpPrereqError, ("No such node name '%s'" % name)
184
      wanted_nodes.append(node)
185

    
186
    return wanted_nodes
187
  else:
188
    return [lu.cfg.GetNodeInfo(name) for name in lu.cfg.GetNodeList()]
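

# Illustrative sketch (not part of the original module): given any LU with
# a populated configuration, the helper above expands short node names to
# node objects; passing None or an empty list selects every cluster node.
def _ExampleWantedNodeNames(lu, short_names):
  return [node.name for node in _GetWantedNodes(lu, short_names)]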
189

    
190

    
191
def _CheckOutputFields(static, dynamic, selected):
192
  """Checks whether all selected fields are valid.
193

194
  Args:
195
    static: Static fields
196
    dynamic: Dynamic fields
    selected: Fields requested by the caller
197

198
  """
199
  static_fields = frozenset(static)
200
  dynamic_fields = frozenset(dynamic)
201

    
202
  all_fields = static_fields | dynamic_fields
203

    
204
  if not all_fields.issuperset(selected):
205
    raise errors.OpPrereqError, ("Unknown output fields selected: %s"
206
                                 % ",".join(frozenset(selected).
207
                                            difference(all_fields)))
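

# Illustrative sketch (not part of the original module): a query LU calls
# the helper above with its static and dynamic field names; any selected
# field outside their union raises OpPrereqError.  The names below are
# hypothetical.
def _ExampleValidateFields(selected):
  _CheckOutputFields(static=["name"],
                     dynamic=["dtotal", "dfree"],
                     selected=selected)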
208

    
209

    
210
def _UpdateEtcHosts(fullnode, ip):
211
  """Ensure a node has a correct entry in /etc/hosts.
212

213
  Args:
214
    fullnode - Fully qualified domain name of host. (str)
215
    ip       - IPv4 address of host (str)
216

217
  """
218
  node = fullnode.split(".", 1)[0]
219

    
220
  f = open('/etc/hosts', 'r+')
221

    
222
  inthere = False
223

    
224
  save_lines = []
225
  add_lines = []
226
  removed = False
227

    
228
  while True:
229
    rawline = f.readline()
230

    
231
    if not rawline:
232
      # End of file
233
      break
234

    
235
    line = rawline.split('\n')[0]
236

    
237
    # Strip off comments
238
    line = line.split('#')[0]
239

    
240
    if not line:
241
      # Entire line was comment, skip
242
      save_lines.append(rawline)
243
      continue
244

    
245
    fields = line.split()
246

    
247
    haveall = True
248
    havesome = False
249
    for spec in [ ip, fullnode, node ]:
250
      if spec not in fields:
251
        haveall = False
252
      if spec in fields:
253
        havesome = True
254

    
255
    if haveall:
256
      inthere = True
257
      save_lines.append(rawline)
258
      continue
259

    
260
    if havesome and not haveall:
261
      # Line (old, or manual?) which is missing some.  Remove.
262
      removed = True
263
      continue
264

    
265
    save_lines.append(rawline)
266

    
267
  if not inthere:
268
    add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))
269

    
270
  if removed:
271
    if add_lines:
272
      save_lines = save_lines + add_lines
273

    
274
    # We removed a line, write a new file and replace old.
275
    fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
276
    newfile = os.fdopen(fd, 'w')
277
    newfile.write(''.join(save_lines))
278
    newfile.close()
279
    os.rename(tmpname, '/etc/hosts')
280

    
281
  elif add_lines:
282
    # Simply appending a new line will do the trick.
283
    f.seek(0, 2)
284
    for add in add_lines:
285
      f.write(add)
286

    
287
  f.close()
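

# Illustrative sketch (not part of the original module): the helper above
# maintains a single /etc/hosts line of the form "<ip>\t<fqdn> <shortname>".
# The values below are hypothetical, and calling this really rewrites
# /etc/hosts, so it only illustrates the expected invocation.
def _ExampleUpdateEtcHosts():
  _UpdateEtcHosts("node1.example.com", "192.0.2.10")
  # resulting (or preserved) line: "192.0.2.10\tnode1.example.com node1"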
288

    
289

    
290
def _UpdateKnownHosts(fullnode, ip, pubkey):
291
  """Ensure a node has a correct known_hosts entry.
292

293
  Args:
294
    fullnode - Fully qualified domain name of host. (str)
295
    ip       - IPv4 address of host (str)
296
    pubkey   - the public key of the cluster
297

298
  """
299
  if os.path.exists('/etc/ssh/ssh_known_hosts'):
300
    f = open('/etc/ssh/ssh_known_hosts', 'r+')
301
  else:
302
    f = open('/etc/ssh/ssh_known_hosts', 'w+')
303

    
304
  inthere = False
305

    
306
  save_lines = []
307
  add_lines = []
308
  removed = False
309

    
310
  while True:
311
    rawline = f.readline()
312
    logger.Debug('read %s' % (repr(rawline),))
313

    
314
    if not rawline:
315
      # End of file
316
      break
317

    
318
    line = rawline.split('\n')[0]
319

    
320
    parts = line.split(' ')
321
    fields = parts[0].split(',')
322
    key = parts[2]
323

    
324
    haveall = True
325
    havesome = False
326
    for spec in [ ip, fullnode ]:
327
      if spec not in fields:
328
        haveall = False
329
      if spec in fields:
330
        havesome = True
331

    
332
    logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
333
    if haveall and key == pubkey:
334
      inthere = True
335
      save_lines.append(rawline)
336
      logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
337
      continue
338

    
339
    if havesome and (not haveall or key != pubkey):
340
      removed = True
341
      logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
342
      continue
343

    
344
    save_lines.append(rawline)
345

    
346
  if not inthere:
347
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
348
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
349

    
350
  if removed:
351
    save_lines = save_lines + add_lines
352

    
353
    # Write a new file and replace old.
354
    fd, tmpname = tempfile.mkstemp('tmp', 'ssh_known_hosts_', '/etc/ssh')
355
    newfile = os.fdopen(fd, 'w')
356
    newfile.write(''.join(save_lines))
357
    newfile.close()
358
    logger.Debug("Wrote new known_hosts.")
359
    os.rename(tmpname, '/etc/ssh/ssh_known_hosts')
360

    
361
  elif add_lines:
362
    # Simply appending a new line will do the trick.
363
    f.seek(0, 2)
364
    for add in add_lines:
365
      f.write(add)
366

    
367
  f.close()
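

# Illustrative note (not part of the original module): the helper above
# keeps exactly one "<fqdn>,<ip> ssh-rsa <pubkey>" entry per node in
# /etc/ssh/ssh_known_hosts, appending when the entry is missing and
# rewriting the whole file only when stale entries had to be dropped.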
368

    
369

    
370
def _HasValidVG(vglist, vgname):
371
  """Checks if the volume group list is valid.
372

373
  A non-None return value means there's an error, and the return value
374
  is the error message.
375

376
  """
377
  vgsize = vglist.get(vgname, None)
378
  if vgsize is None:
379
    return "volume group '%s' missing" % vgname
380
  elif vgsize < 20480:
381
    return ("volume group '%s' too small (20480MiB required, %dMib found)" %
382
            (vgname, vgsize))
383
  return None
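

# Illustrative usage (not part of the original module), mirroring what
# LUInitCluster does further down: None means the volume group is present
# and large enough, anything else is an error message.
def _ExampleCheckVG(vg_name):
  vgstatus = _HasValidVG(utils.ListVolumeGroups(), vg_name)
  if vgstatus:
    raise errors.OpPrereqError, ("Error: %s" % vgstatus)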
384

    
385

    
386
def _InitSSHSetup(node):
387
  """Setup the SSH configuration for the cluster.
388

389

390
  This generates a dsa keypair for root, adds the pub key to the
391
  permitted hosts and adds the hostkey to its own known hosts.
392

393
  Args:
394
    node: the name of this host as a fqdn
395

396
  """
397
  utils.RemoveFile('/root/.ssh/known_hosts')
398

    
399
  if os.path.exists('/root/.ssh/id_dsa'):
400
    utils.CreateBackup('/root/.ssh/id_dsa')
401
  if os.path.exists('/root/.ssh/id_dsa.pub'):
402
    utils.CreateBackup('/root/.ssh/id_dsa.pub')
403

    
404
  utils.RemoveFile('/root/.ssh/id_dsa')
405
  utils.RemoveFile('/root/.ssh/id_dsa.pub')
406

    
407
  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
408
                         "-f", "/root/.ssh/id_dsa",
409
                         "-q", "-N", ""])
410
  if result.failed:
411
    raise errors.OpExecError, ("could not generate ssh keypair, error %s" %
412
                               result.output)
413

    
414
  f = open('/root/.ssh/id_dsa.pub', 'r')
415
  try:
416
    utils.AddAuthorizedKey('/root/.ssh/authorized_keys', f.read(8192))
417
  finally:
418
    f.close()
419

    
420

    
421
def _InitGanetiServerSetup(ss):
422
  """Setup the necessary configuration for the initial node daemon.
423

424
  This creates the nodepass file containing the shared password for
425
  the cluster and also generates the SSL certificate.
426

427
  """
428
  # Create pseudo random password
429
  randpass = sha.new(os.urandom(64)).hexdigest()
430
  # and write it into sstore
431
  ss.SetKey(ss.SS_NODED_PASS, randpass)
432

    
433
  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
434
                         "-days", str(365*5), "-nodes", "-x509",
435
                         "-keyout", constants.SSL_CERT_FILE,
436
                         "-out", constants.SSL_CERT_FILE, "-batch"])
437
  if result.failed:
438
    raise errors.OpExecError, ("could not generate server ssl cert, command"
439
                               " %s had exitcode %s and error message %s" %
440
                               (result.cmd, result.exit_code, result.output))
441

    
442
  os.chmod(constants.SSL_CERT_FILE, 0400)
443

    
444
  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
445

    
446
  if result.failed:
447
    raise errors.OpExecError, ("could not start the node daemon, command %s"
448
                               " had exitcode %s and error %s" %
449
                               (result.cmd, result.exit_code, result.output))
450

    
451

    
452
class LUInitCluster(LogicalUnit):
453
  """Initialise the cluster.
454

455
  """
456
  HPATH = "cluster-init"
457
  HTYPE = constants.HTYPE_CLUSTER
458
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
459
              "def_bridge", "master_netdev"]
460
  REQ_CLUSTER = False
461

    
462
  def BuildHooksEnv(self):
463
    """Build hooks env.
464

465
    Notes: Since we don't require a cluster, we must manually add
466
    ourselves in the post-run node list.
467

468
    """
469
    env = {"CLUSTER": self.op.cluster_name,
470
           "MASTER": self.hostname['hostname_full']}
471
    return env, [], [self.hostname['hostname_full']]
472

    
473
  def CheckPrereq(self):
474
    """Verify that the passed name is a valid one.
475

476
    """
477
    if config.ConfigWriter.IsCluster():
478
      raise errors.OpPrereqError, ("Cluster is already initialised")
479

    
480
    hostname_local = socket.gethostname()
481
    self.hostname = hostname = utils.LookupHostname(hostname_local)
482
    if not hostname:
483
      raise errors.OpPrereqError, ("Cannot resolve my own hostname ('%s')" %
484
                                   hostname_local)
485

    
486
    self.clustername = clustername = utils.LookupHostname(self.op.cluster_name)
487
    if not clustername:
488
      raise errors.OpPrereqError, ("Cannot resolve given cluster name ('%s')"
489
                                   % self.op.cluster_name)
490

    
491
    result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", hostname['ip']])
492
    if result.failed:
493
      raise errors.OpPrereqError, ("Inconsistency: this host's name resolves"
494
                                   " to %s,\nbut this ip address does not"
495
                                   " belong to this host."
496
                                   " Aborting." % hostname['ip'])
497

    
498
    secondary_ip = getattr(self.op, "secondary_ip", None)
499
    if secondary_ip and not utils.IsValidIP(secondary_ip):
500
      raise errors.OpPrereqError, ("Invalid secondary ip given")
501
    if secondary_ip and secondary_ip != hostname['ip']:
502
      result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", secondary_ip])
503
      if result.failed:
504
        raise errors.OpPrereqError, ("You gave %s as secondary IP,\n"
505
                                     "but it does not belong to this host." %
506
                                     secondary_ip)
507
    self.secondary_ip = secondary_ip
508

    
509
    # checks presence of the volume group given
510
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
511

    
512
    if vgstatus:
513
      raise errors.OpPrereqError, ("Error: %s" % vgstatus)
514

    
515
    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
516
                    self.op.mac_prefix):
517
      raise errors.OpPrereqError, ("Invalid mac prefix given '%s'" %
518
                                   self.op.mac_prefix)
519

    
520
    if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
521
      raise errors.OpPrereqError, ("Invalid hypervisor type given '%s'" %
522
                                   self.op.hypervisor_type)
523

    
524
    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
525
    if result.failed:
526
      raise errors.OpPrereqError, ("Invalid master netdev given (%s): '%s'" %
527
                                   (self.op.master_netdev, result.output))
528

    
529
  def Exec(self, feedback_fn):
530
    """Initialize the cluster.
531

532
    """
533
    clustername = self.clustername
534
    hostname = self.hostname
535

    
536
    # set up the simple store
537
    ss = ssconf.SimpleStore()
538
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
539
    ss.SetKey(ss.SS_MASTER_NODE, hostname['hostname_full'])
540
    ss.SetKey(ss.SS_MASTER_IP, clustername['ip'])
541
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
542
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername['hostname'])
543

    
544
    # set up the inter-node password and certificate
545
    _InitGanetiServerSetup(ss)
546

    
547
    # start the master ip
548
    rpc.call_node_start_master(hostname['hostname_full'])
549

    
550
    # set up ssh config and /etc/hosts
551
    f = open('/etc/ssh/ssh_host_rsa_key.pub', 'r')
552
    try:
553
      sshline = f.read()
554
    finally:
555
      f.close()
556
    sshkey = sshline.split(" ")[1]
557

    
558
    _UpdateEtcHosts(hostname['hostname_full'],
559
                    hostname['ip'],
560
                    )
561

    
562
    _UpdateKnownHosts(hostname['hostname_full'],
563
                      hostname['ip'],
564
                      sshkey,
565
                      )
566

    
567
    _InitSSHSetup(hostname['hostname'])
568

    
569
    # init of cluster config file
570
    cfgw = config.ConfigWriter()
571
    cfgw.InitConfig(hostname['hostname'], hostname['ip'], self.secondary_ip,
572
                    sshkey, self.op.mac_prefix,
573
                    self.op.vg_name, self.op.def_bridge)
574

    
575

    
576
class LUDestroyCluster(NoHooksLU):
577
  """Logical unit for destroying the cluster.
578

579
  """
580
  _OP_REQP = []
581

    
582
  def CheckPrereq(self):
583
    """Check prerequisites.
584

585
    This checks whether the cluster is empty.
586

587
    Any errors are signalled by raising errors.OpPrereqError.
588

589
    """
590
    master = self.sstore.GetMasterNode()
591

    
592
    nodelist = self.cfg.GetNodeList()
593
    if len(nodelist) > 0 and nodelist != [master]:
594
      raise errors.OpPrereqError, ("There are still %d node(s) in "
595
                                   "this cluster." % (len(nodelist) - 1))
596

    
597
  def Exec(self, feedback_fn):
598
    """Destroys the cluster.
599

600
    """
601
    utils.CreateBackup('/root/.ssh/id_dsa')
602
    utils.CreateBackup('/root/.ssh/id_dsa.pub')
603
    rpc.call_node_leave_cluster(self.sstore.GetMasterNode())
604

    
605

    
606
class LUVerifyCluster(NoHooksLU):
607
  """Verifies the cluster status.
608

609
  """
610
  _OP_REQP = []
611

    
612
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
613
                  remote_version, feedback_fn):
614
    """Run multiple tests against a node.
615

616
    Test list:
617
      - compares ganeti version
618
      - checks vg existence and size > 20G
619
      - checks config file checksum
620
      - checks ssh to other nodes
621

622
    Args:
623
      node: name of the node to check
624
      file_list: required list of files
625
      local_cksum: dictionary of local files and their checksums
626

627
    """
628
    # compares ganeti version
629
    local_version = constants.PROTOCOL_VERSION
630
    if not remote_version:
631
      feedback_fn(" - ERROR: connection to %s failed" % (node))
632
      return True
633

    
634
    if local_version != remote_version:
635
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
636
                      (local_version, node, remote_version))
637
      return True
638

    
639
    # checks vg existence and size > 20G
640

    
641
    bad = False
642
    if not vglist:
643
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
644
                      (node,))
645
      bad = True
646
    else:
647
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
648
      if vgstatus:
649
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
650
        bad = True
651

    
652
    # checks config file checksum
653
    # checks ssh to any
654

    
655
    if 'filelist' not in node_result:
656
      bad = True
657
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
658
    else:
659
      remote_cksum = node_result['filelist']
660
      for file_name in file_list:
661
        if file_name not in remote_cksum:
662
          bad = True
663
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
664
        elif remote_cksum[file_name] != local_cksum[file_name]:
665
          bad = True
666
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
667

    
668
    if 'nodelist' not in node_result:
669
      bad = True
670
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
671
    else:
672
      if node_result['nodelist']:
673
        bad = True
674
        for node in node_result['nodelist']:
675
          feedback_fn("  - ERROR: communication with node '%s': %s" %
676
                          (node, node_result['nodelist'][node]))
677
    hyp_result = node_result.get('hypervisor', None)
678
    if hyp_result is not None:
679
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
680
    return bad
681

    
682
  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
683
    """Verify an instance.
684

685
    This function checks to see if the required block devices are
686
    available on the instance's node.
687

688
    """
689
    bad = False
690

    
691
    instancelist = self.cfg.GetInstanceList()
692
    if not instance in instancelist:
693
      feedback_fn("  - ERROR: instance %s not in instance list %s" %
694
                      (instance, instancelist))
695
      bad = True
696

    
697
    instanceconfig = self.cfg.GetInstanceInfo(instance)
698
    node_current = instanceconfig.primary_node
699

    
700
    node_vol_should = {}
701
    instanceconfig.MapLVsByNode(node_vol_should)
702

    
703
    for node in node_vol_should:
704
      for volume in node_vol_should[node]:
705
        if node not in node_vol_is or volume not in node_vol_is[node]:
706
          feedback_fn("  - ERROR: volume %s missing on node %s" %
707
                          (volume, node))
708
          bad = True
709

    
710
    if not instanceconfig.status == 'down':
711
      if not instance in node_instance[node_current]:
712
        feedback_fn("  - ERROR: instance %s not running on node %s" %
713
                        (instance, node_current))
714
        bad = True
715

    
716
    for node in node_instance:
717
      if (not node == node_current):
718
        if instance in node_instance[node]:
719
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
720
                          (instance, node))
721
          bad = True
722

    
723
    return bad
724

    
725
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
726
    """Verify if there are any unknown volumes in the cluster.
727

728
    The .os, .swap and backup volumes are ignored. All other volumes are
729
    reported as unknown.
730

731
    """
732
    bad = False
733

    
734
    for node in node_vol_is:
735
      for volume in node_vol_is[node]:
736
        if node not in node_vol_should or volume not in node_vol_should[node]:
737
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
738
                      (volume, node))
739
          bad = True
740
    return bad
741

    
742
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
743
    """Verify the list of running instances.
744

745
    This checks what instances are running but unknown to the cluster.
746

747
    """
748
    bad = False
749
    for node in node_instance:
750
      for runninginstance in node_instance[node]:
751
        if runninginstance not in instancelist:
752
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
753
                          (runninginstance, node))
754
          bad = True
755
    return bad
756

    
757
  def CheckPrereq(self):
758
    """Check prerequisites.
759

760
    This has no prerequisites.
761

762
    """
763
    pass
764

    
765
  def Exec(self, feedback_fn):
766
    """Verify integrity of cluster, performing various test on nodes.
767

768
    """
769
    bad = False
770
    feedback_fn("* Verifying global settings")
771
    self.cfg.VerifyConfig()
772

    
773
    master = self.sstore.GetMasterNode()
774
    vg_name = self.cfg.GetVGName()
775
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
776
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
777
    node_volume = {}
778
    node_instance = {}
779

    
780
    # FIXME: verify OS list
781
    # do local checksums
782
    file_names = list(self.sstore.GetFileList())
783
    file_names.append(constants.SSL_CERT_FILE)
784
    file_names.append(constants.CLUSTER_CONF_FILE)
785
    local_checksums = utils.FingerprintFiles(file_names)
786

    
787
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
788
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
789
    all_instanceinfo = rpc.call_instance_list(nodelist)
790
    all_vglist = rpc.call_vg_list(nodelist)
791
    node_verify_param = {
792
      'filelist': file_names,
793
      'nodelist': nodelist,
794
      'hypervisor': None,
795
      }
796
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
797
    all_rversion = rpc.call_version(nodelist)
798

    
799
    for node in nodelist:
800
      feedback_fn("* Verifying node %s" % node)
801
      result = self._VerifyNode(node, file_names, local_checksums,
802
                                all_vglist[node], all_nvinfo[node],
803
                                all_rversion[node], feedback_fn)
804
      bad = bad or result
805

    
806
      # node_volume
807
      volumeinfo = all_volumeinfo[node]
808

    
809
      if type(volumeinfo) != dict:
810
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
811
        bad = True
812
        continue
813

    
814
      node_volume[node] = volumeinfo
815

    
816
      # node_instance
817
      nodeinstance = all_instanceinfo[node]
818
      if type(nodeinstance) != list:
819
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
820
        bad = True
821
        continue
822

    
823
      node_instance[node] = nodeinstance
824

    
825
    node_vol_should = {}
826

    
827
    for instance in instancelist:
828
      feedback_fn("* Verifying instance %s" % instance)
829
      result =  self._VerifyInstance(instance, node_volume, node_instance,
830
                                     feedback_fn)
831
      bad = bad or result
832

    
833
      inst_config = self.cfg.GetInstanceInfo(instance)
834

    
835
      inst_config.MapLVsByNode(node_vol_should)
836

    
837
    feedback_fn("* Verifying orphan volumes")
838
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
839
                                       feedback_fn)
840
    bad = bad or result
841

    
842
    feedback_fn("* Verifying remaining instances")
843
    result = self._VerifyOrphanInstances(instancelist, node_instance,
844
                                         feedback_fn)
845
    bad = bad or result
846

    
847
    return int(bad)
848

    
849

    
850
def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
851
  """Sleep and poll for an instance's disk to sync.
852

853
  """
854
  if not instance.disks:
855
    return True
856

    
857
  if not oneshot:
858
    logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)
859

    
860
  node = instance.primary_node
861

    
862
  for dev in instance.disks:
863
    cfgw.SetDiskID(dev, node)
864

    
865
  retries = 0
866
  while True:
867
    max_time = 0
868
    done = True
869
    cumul_degraded = False
870
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
871
    if not rstats:
872
      logger.ToStderr("Can't get any data from node %s" % node)
873
      retries += 1
874
      if retries >= 10:
875
        raise errors.RemoteError, ("Can't contact node %s for mirror data,"
876
                                   " aborting." % node)
877
      time.sleep(6)
878
      continue
879
    retries = 0
880
    for i in range(len(rstats)):
881
      mstat = rstats[i]
882
      if mstat is None:
883
        logger.ToStderr("Can't compute data for node %s/%s" %
884
                        (node, instance.disks[i].iv_name))
885
        continue
886
      perc_done, est_time, is_degraded = mstat
887
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
888
      if perc_done is not None:
889
        done = False
890
        if est_time is not None:
891
          rem_time = "%d estimated seconds remaining" % est_time
892
          max_time = est_time
893
        else:
894
          rem_time = "no time estimate"
895
        logger.ToStdout("- device %s: %5.2f%% done, %s" %
896
                        (instance.disks[i].iv_name, perc_done, rem_time))
897
    if done or oneshot:
898
      break
899

    
900
    if unlock:
901
      utils.Unlock('cmd')
902
    try:
903
      time.sleep(min(60, max_time))
904
    finally:
905
      if unlock:
906
        utils.Lock('cmd')
907

    
908
  if done:
909
    logger.ToStdout("Instance %s's disks are in sync." % instance.name)
910
  return not cumul_degraded
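

# Minimal sketch (not part of the original module) of how the per-device
# status tuples returned by call_blockdev_getmirrorstatus fold into the
# flags used above: sync is finished only when no device reports a
# completion percentage, and the result counts as degraded if any device
# is degraded while no resync is in progress for it.
def _ExampleFoldMirrorStatus(rstats):
  done = True
  cumul_degraded = False
  for mstat in rstats:
    if mstat is None:
      continue
    perc_done, est_time, is_degraded = mstat  # est_time not needed here
    cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
    if perc_done is not None:
      done = False
  return done, cumul_degraded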
911

    
912

    
913
def _CheckDiskConsistency(cfgw, dev, node, on_primary):
914
  """Check that mirrors are not degraded.
915

916
  """
917
  cfgw.SetDiskID(dev, node)
918

    
919
  result = True
920
  if on_primary or dev.AssembleOnSecondary():
921
    rstats = rpc.call_blockdev_find(node, dev)
922
    if not rstats:
923
      logger.ToStderr("Can't get any data from node %s" % node)
924
      result = False
925
    else:
926
      result = result and (not rstats[5])
927
  if dev.children:
928
    for child in dev.children:
929
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
930

    
931
  return result
932

    
933

    
934
class LUDiagnoseOS(NoHooksLU):
935
  """Logical unit for OS diagnose/query.
936

937
  """
938
  _OP_REQP = []
939

    
940
  def CheckPrereq(self):
941
    """Check prerequisites.
942

943
    This always succeeds, since this is a pure query LU.
944

945
    """
946
    return
947

    
948
  def Exec(self, feedback_fn):
949
    """Compute the list of OSes.
950

951
    """
952
    node_list = self.cfg.GetNodeList()
953
    node_data = rpc.call_os_diagnose(node_list)
954
    if node_data == False:
955
      raise errors.OpExecError, "Can't gather the list of OSes"
956
    return node_data
957

    
958

    
959
class LURemoveNode(LogicalUnit):
960
  """Logical unit for removing a node.
961

962
  """
963
  HPATH = "node-remove"
964
  HTYPE = constants.HTYPE_NODE
965
  _OP_REQP = ["node_name"]
966

    
967
  def BuildHooksEnv(self):
968
    """Build hooks env.
969

970
    This doesn't run on the target node in the pre phase as a failed
971
    node would not allow itself to run.
972

973
    """
974
    all_nodes = self.cfg.GetNodeList()
975
    all_nodes.remove(self.op.node_name)
976
    return {"NODE_NAME": self.op.node_name}, all_nodes, all_nodes
977

    
978
  def CheckPrereq(self):
979
    """Check prerequisites.
980

981
    This checks:
982
     - the node exists in the configuration
983
     - it does not have primary or secondary instances
984
     - it's not the master
985

986
    Any errors are signalled by raising errors.OpPrereqError.
987

988
    """
989
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
990
    if node is None:
991
      logger.Error("Error: Node '%s' is unknown." % self.op.node_name)
992
      return 1
993

    
994
    instance_list = self.cfg.GetInstanceList()
995

    
996
    masternode = self.sstore.GetMasterNode()
997
    if node.name == masternode:
998
      raise errors.OpPrereqError, ("Node is the master node,"
999
                                   " you need to failover first.")
1000

    
1001
    for instance_name in instance_list:
1002
      instance = self.cfg.GetInstanceInfo(instance_name)
1003
      if node.name == instance.primary_node:
1004
        raise errors.OpPrereqError, ("Instance %s still running on the node,"
1005
                                     " please remove first." % instance_name)
1006
      if node.name in instance.secondary_nodes:
1007
        raise errors.OpPrereqError, ("Instance %s has node as a secondary,"
1008
                                     " please remove first." % instance_name)
1009
    self.op.node_name = node.name
1010
    self.node = node
1011

    
1012
  def Exec(self, feedback_fn):
1013
    """Removes the node from the cluster.
1014

1015
    """
1016
    node = self.node
1017
    logger.Info("stopping the node daemon and removing configs from node %s" %
1018
                node.name)
1019

    
1020
    rpc.call_node_leave_cluster(node.name)
1021

    
1022
    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1023

    
1024
    logger.Info("Removing node %s from config" % node.name)
1025

    
1026
    self.cfg.RemoveNode(node.name)
1027

    
1028

    
1029
class LUQueryNodes(NoHooksLU):
1030
  """Logical unit for querying nodes.
1031

1032
  """
1033
  _OP_REQP = ["output_fields"]
1034

    
1035
  def CheckPrereq(self):
1036
    """Check prerequisites.
1037

1038
    This checks that the fields required are valid output fields.
1039

1040
    """
1041
    self.dynamic_fields = frozenset(["dtotal", "dfree",
1042
                                     "mtotal", "mnode", "mfree"])
1043

    
1044
    _CheckOutputFields(static=["name", "pinst", "sinst", "pip", "sip"],
1045
                       dynamic=self.dynamic_fields,
1046
                       selected=self.op.output_fields)
1047

    
1048

    
1049
  def Exec(self, feedback_fn):
1050
    """Computes the list of nodes and their attributes.
1051

1052
    """
1053
    nodenames = utils.NiceSort(self.cfg.GetNodeList())
1054
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1055

    
1056

    
1057
    # begin data gathering
1058

    
1059
    if self.dynamic_fields.intersection(self.op.output_fields):
1060
      live_data = {}
1061
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1062
      for name in nodenames:
1063
        nodeinfo = node_data.get(name, None)
1064
        if nodeinfo:
1065
          live_data[name] = {
1066
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1067
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1068
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1069
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1070
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1071
            }
1072
        else:
1073
          live_data[name] = {}
1074
    else:
1075
      live_data = dict.fromkeys(nodenames, {})
1076

    
1077
    node_to_primary = dict.fromkeys(nodenames, 0)
1078
    node_to_secondary = dict.fromkeys(nodenames, 0)
1079

    
1080
    if "pinst" in self.op.output_fields or "sinst" in self.op.output_fields:
1081
      instancelist = self.cfg.GetInstanceList()
1082

    
1083
      for instance in instancelist:
1084
        instanceinfo = self.cfg.GetInstanceInfo(instance)
1085
        node_to_primary[instanceinfo.primary_node] += 1
1086
        for secnode in instanceinfo.secondary_nodes:
1087
          node_to_secondary[secnode] += 1
1088

    
1089
    # end data gathering
1090

    
1091
    output = []
1092
    for node in nodelist:
1093
      node_output = []
1094
      for field in self.op.output_fields:
1095
        if field == "name":
1096
          val = node.name
1097
        elif field == "pinst":
1098
          val = node_to_primary[node.name]
1099
        elif field == "sinst":
1100
          val = node_to_secondary[node.name]
1101
        elif field == "pip":
1102
          val = node.primary_ip
1103
        elif field == "sip":
1104
          val = node.secondary_ip
1105
        elif field in self.dynamic_fields:
1106
          val = live_data[node.name].get(field, "?")
1107
        else:
1108
          raise errors.ParameterError, field
1109
        val = str(val)
1110
        node_output.append(val)
1111
      output.append(node_output)
1112

    
1113
    return output
1114

    
1115

    
1116
class LUQueryNodeVolumes(NoHooksLU):
1117
  """Logical unit for getting volumes on node(s).
1118

1119
  """
1120
  _OP_REQP = ["nodes", "output_fields"]
1121

    
1122
  def CheckPrereq(self):
1123
    """Check prerequisites.
1124

1125
    This checks that the fields required are valid output fields.
1126

1127
    """
1128
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1129

    
1130
    _CheckOutputFields(static=["node"],
1131
                       dynamic=["phys", "vg", "name", "size", "instance"],
1132
                       selected=self.op.output_fields)
1133

    
1134

    
1135
  def Exec(self, feedback_fn):
1136
    """Computes the list of nodes and their attributes.
1137

1138
    """
1139
    nodenames = utils.NiceSort([node.name for node in self.nodes])
1140
    volumes = rpc.call_node_volumes(nodenames)
1141

    
1142
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1143
             in self.cfg.GetInstanceList()]
1144

    
1145
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1146

    
1147
    output = []
1148
    for node in nodenames:
1149
      node_vols = volumes[node][:]
1150
      node_vols.sort(key=lambda vol: vol['dev'])
1151

    
1152
      for vol in node_vols:
1153
        node_output = []
1154
        for field in self.op.output_fields:
1155
          if field == "node":
1156
            val = node
1157
          elif field == "phys":
1158
            val = vol['dev']
1159
          elif field == "vg":
1160
            val = vol['vg']
1161
          elif field == "name":
1162
            val = vol['name']
1163
          elif field == "size":
1164
            val = int(float(vol['size']))
1165
          elif field == "instance":
1166
            for inst in ilist:
1167
              if node not in lv_by_node[inst]:
1168
                continue
1169
              if vol['name'] in lv_by_node[inst][node]:
1170
                val = inst.name
1171
                break
1172
            else:
1173
              val = '-'
1174
          else:
1175
            raise errors.ParameterError, field
1176
          node_output.append(str(val))
1177

    
1178
        output.append(node_output)
1179

    
1180
    return output
1181

    
1182

    
1183
class LUAddNode(LogicalUnit):
1184
  """Logical unit for adding node to the cluster.
1185

1186
  """
1187
  HPATH = "node-add"
1188
  HTYPE = constants.HTYPE_NODE
1189
  _OP_REQP = ["node_name"]
1190

    
1191
  def BuildHooksEnv(self):
1192
    """Build hooks env.
1193

1194
    This will run on all nodes before, and on all nodes + the new node after.
1195

1196
    """
1197
    env = {
1198
      "NODE_NAME": self.op.node_name,
1199
      "NODE_PIP": self.op.primary_ip,
1200
      "NODE_SIP": self.op.secondary_ip,
1201
      }
1202
    nodes_0 = self.cfg.GetNodeList()
1203
    nodes_1 = nodes_0 + [self.op.node_name, ]
1204
    return env, nodes_0, nodes_1
1205

    
1206
  def CheckPrereq(self):
1207
    """Check prerequisites.
1208

1209
    This checks:
1210
     - the new node is not already in the config
1211
     - it is resolvable
1212
     - its parameters (single/dual homed) matches the cluster
1213

1214
    Any errors are signalled by raising errors.OpPrereqError.
1215

1216
    """
1217
    node_name = self.op.node_name
1218
    cfg = self.cfg
1219

    
1220
    dns_data = utils.LookupHostname(node_name)
1221
    if not dns_data:
1222
      raise errors.OpPrereqError, ("Node %s is not resolvable" % node_name)
1223

    
1224
    node = dns_data['hostname']
1225
    primary_ip = self.op.primary_ip = dns_data['ip']
1226
    secondary_ip = getattr(self.op, "secondary_ip", None)
1227
    if secondary_ip is None:
1228
      secondary_ip = primary_ip
1229
    if not utils.IsValidIP(secondary_ip):
1230
      raise errors.OpPrereqError, ("Invalid secondary IP given")
1231
    self.op.secondary_ip = secondary_ip
1232
    node_list = cfg.GetNodeList()
1233
    if node in node_list:
1234
      raise errors.OpPrereqError, ("Node %s is already in the configuration"
1235
                                   % node)
1236

    
1237
    for existing_node_name in node_list:
1238
      existing_node = cfg.GetNodeInfo(existing_node_name)
1239
      if (existing_node.primary_ip == primary_ip or
1240
          existing_node.secondary_ip == primary_ip or
1241
          existing_node.primary_ip == secondary_ip or
1242
          existing_node.secondary_ip == secondary_ip):
1243
        raise errors.OpPrereqError, ("New node ip address(es) conflict with"
1244
                                     " existing node %s" % existing_node.name)
1245

    
1246
    # check that the type of the node (single versus dual homed) is the
1247
    # same as for the master
1248
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1249
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1250
    newbie_singlehomed = secondary_ip == primary_ip
1251
    if master_singlehomed != newbie_singlehomed:
1252
      if master_singlehomed:
1253
        raise errors.OpPrereqError, ("The master has no private ip but the"
1254
                                     " new node has one")
1255
      else:
1256
        raise errors.OpPrereqError ("The master has a private ip but the"
1257
                                    " new node doesn't have one")
1258

    
1259
    # checks reachability
1260
    command = ["fping", "-q", primary_ip]
1261
    result = utils.RunCmd(command)
1262
    if result.failed:
1263
      raise errors.OpPrereqError, ("Node not reachable by ping")
1264

    
1265
    if not newbie_singlehomed:
1266
      # check reachability from my secondary ip to newbie's secondary ip
1267
      command = ["fping", "-S%s" % myself.secondary_ip, "-q", secondary_ip]
1268
      result = utils.RunCmd(command)
1269
      if result.failed:
1270
        raise errors.OpPrereqError, ("Node secondary ip not reachable by ping")
1271

    
1272
    self.new_node = objects.Node(name=node,
1273
                                 primary_ip=primary_ip,
1274
                                 secondary_ip=secondary_ip)
1275

    
1276
  def Exec(self, feedback_fn):
1277
    """Adds the new node to the cluster.
1278

1279
    """
1280
    new_node = self.new_node
1281
    node = new_node.name
1282

    
1283
    # set up inter-node password and certificate and restarts the node daemon
1284
    gntpass = self.sstore.GetNodeDaemonPassword()
1285
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1286
      raise errors.OpExecError, ("ganeti password corruption detected")
1287
    f = open(constants.SSL_CERT_FILE)
1288
    try:
1289
      gntpem = f.read(8192)
1290
    finally:
1291
      f.close()
1292
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1293
    # so we use this to detect an invalid certificate; as long as the
1294
    # cert doesn't contain this, the here-document will be correctly
1295
    # parsed by the shell sequence below
1296
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
1297
      raise errors.OpExecError, ("invalid PEM encoding in the SSL certificate")
1298
    if not gntpem.endswith("\n"):
1299
      raise errors.OpExecError, ("PEM must end with newline")
1300
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1301

    
1302
    # remove first the root's known_hosts file
1303
    utils.RemoveFile("/root/.ssh/known_hosts")
1304
    # and then connect with ssh to set password and start ganeti-noded
1305
    # note that all the below variables are sanitized at this point,
1306
    # either by being constants or by the checks above
1307
    ss = self.sstore
1308
    mycommand = ("umask 077 && "
1309
                 "echo '%s' > '%s' && "
1310
                 "cat > '%s' << '!EOF.' && \n"
1311
                 "%s!EOF.\n%s restart" %
1312
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1313
                  constants.SSL_CERT_FILE, gntpem,
1314
                  constants.NODE_INITD_SCRIPT))
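    # Illustration with placeholder values (the real paths come from ssconf
    # and constants): the command shipped over ssh expands to roughly
    #   umask 077 && echo '<gntpass>' > '<node-pass-file>' &&
    #   cat > '<ssl-cert-file>' << '!EOF.' &&
    #   <PEM data>!EOF.
    #   <node-initd-script> restart
    # which is why the PEM may not contain a line starting with "!EOF.".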
1315

    
1316
    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1317
    if result.failed:
1318
      raise errors.OpExecError, ("Remote command on node %s, error: %s,"
1319
                                 " output: %s" %
1320
                                 (node, result.fail_reason, result.output))
1321

    
1322
    # check connectivity
1323
    time.sleep(4)
1324

    
1325
    result = rpc.call_version([node])[node]
1326
    if result:
1327
      if constants.PROTOCOL_VERSION == result:
1328
        logger.Info("communication to node %s fine, sw version %s match" %
1329
                    (node, result))
1330
      else:
1331
        raise errors.OpExecError, ("Version mismatch master version %s,"
1332
                                   " node version %s" %
1333
                                   (constants.PROTOCOL_VERSION, result))
1334
    else:
1335
      raise errors.OpExecError, ("Cannot get version from the new node")
1336

    
1337
    # setup ssh on node
1338
    logger.Info("copy ssh key to node %s" % node)
1339
    keyarray = []
1340
    keyfiles = ["/etc/ssh/ssh_host_dsa_key", "/etc/ssh/ssh_host_dsa_key.pub",
1341
                "/etc/ssh/ssh_host_rsa_key", "/etc/ssh/ssh_host_rsa_key.pub",
1342
                "/root/.ssh/id_dsa", "/root/.ssh/id_dsa.pub"]
1343

    
1344
    for i in keyfiles:
1345
      f = open(i, 'r')
1346
      try:
1347
        keyarray.append(f.read())
1348
      finally:
1349
        f.close()
1350

    
1351
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1352
                               keyarray[3], keyarray[4], keyarray[5])
1353

    
1354
    if not result:
1355
      raise errors.OpExecError, ("Cannot transfer ssh keys to the new node")
1356

    
1357
    # Add node to our /etc/hosts, and add key to known_hosts
1358
    _UpdateEtcHosts(new_node.name, new_node.primary_ip)
1359
    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1360
                      self.cfg.GetHostKey())
1361

    
1362
    if new_node.secondary_ip != new_node.primary_ip:
1363
      result = ssh.SSHCall(node, "root",
1364
                           "fping -S 127.0.0.1 -q %s" % new_node.secondary_ip)
1365
      if result.failed:
1366
        raise errors.OpExecError, ("Node claims it doesn't have the"
1367
                                   " secondary ip you gave (%s).\n"
1368
                                   "Please fix and re-run this command." %
1369
                                   new_node.secondary_ip)
1370

    
1371
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1372
    # including the node just added
1373
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1374
    dist_nodes = self.cfg.GetNodeList() + [node]
1375
    if myself.name in dist_nodes:
1376
      dist_nodes.remove(myself.name)
1377

    
1378
    logger.Debug("Copying hosts and known_hosts to all nodes")
1379
    for fname in ("/etc/hosts", "/etc/ssh/ssh_known_hosts"):
1380
      result = rpc.call_upload_file(dist_nodes, fname)
1381
      for to_node in dist_nodes:
1382
        if not result[to_node]:
1383
          logger.Error("copy of file %s to node %s failed" %
1384
                       (fname, to_node))
1385

    
1386
    to_copy = ss.GetFileList()
1387
    for fname in to_copy:
1388
      if not ssh.CopyFileToNode(node, fname):
1389
        logger.Error("could not copy file %s to node %s" % (fname, node))
1390

    
1391
    logger.Info("adding node %s to cluster.conf" % node)
1392
    self.cfg.AddNode(new_node)
1393

    
1394

    
1395
class LUMasterFailover(LogicalUnit):
1396
  """Failover the master node to the current node.
1397

1398
  This is a special LU in that it must run on a non-master node.
1399

1400
  """
1401
  HPATH = "master-failover"
1402
  HTYPE = constants.HTYPE_CLUSTER
1403
  REQ_MASTER = False
1404
  _OP_REQP = []
1405

    
1406
  def BuildHooksEnv(self):
1407
    """Build hooks env.
1408

1409
    This will run on the new master only in the pre phase, and on all
1410
    the nodes in the post phase.
1411

1412
    """
1413
    env = {
1414
      "NEW_MASTER": self.new_master,
1415
      "OLD_MASTER": self.old_master,
1416
      }
1417
    return env, [self.new_master], self.cfg.GetNodeList()
1418

    
1419
  def CheckPrereq(self):
1420
    """Check prerequisites.
1421

1422
    This checks that we are not already the master.
1423

1424
    """
1425
    self.new_master = socket.gethostname()
1426

    
1427
    self.old_master = self.sstore.GetMasterNode()
1428

    
1429
    if self.old_master == self.new_master:
1430
      raise errors.OpPrereqError, ("This commands must be run on the node"
1431
                                   " where you want the new master to be.\n"
1432
                                   "%s is already the master" %
1433
                                   self.old_master)
1434

    
1435
  def Exec(self, feedback_fn):
1436
    """Failover the master node.
1437

1438
    This command, when run on a non-master node, will cause the current
1439
    master to cease being master, and the non-master to become new
1440
    master.
1441

1442
    """
1443
    #TODO: do not rely on gethostname returning the FQDN
1444
    logger.Info("setting master to %s, old master: %s" %
1445
                (self.new_master, self.old_master))
1446

    
1447
    if not rpc.call_node_stop_master(self.old_master):
1448
      logger.Error("could disable the master role on the old master"
1449
                   " %s, please disable manually" % self.old_master)
1450

    
1451
    ss = self.sstore
1452
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1453
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
1454
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
1455
      logger.Error("could not distribute the new simple store master file"
1456
                   " to the other nodes, please check.")
1457

    
1458
    if not rpc.call_node_start_master(self.new_master):
1459
      logger.Error("could not start the master role on the new master"
1460
                   " %s, please check" % self.new_master)
1461
      feedback_fn("Error in activating the master IP on the new master,\n"
1462
                  "please fix manually.")
1463

    
1464

    
1465

    
1466
class LUQueryClusterInfo(NoHooksLU):
1467
  """Query cluster configuration.
1468

1469
  """
1470
  _OP_REQP = []
1471
  REQ_MASTER = False
1472

    
1473
  def CheckPrereq(self):
1474
    """No prerequsites needed for this LU.
1475

1476
    """
1477
    pass
1478

    
1479
  def Exec(self, feedback_fn):
1480
    """Return cluster config.
1481

1482
    """
1483
    result = {
1484
      "name": self.sstore.GetClusterName(),
1485
      "software_version": constants.RELEASE_VERSION,
1486
      "protocol_version": constants.PROTOCOL_VERSION,
1487
      "config_version": constants.CONFIG_VERSION,
1488
      "os_api_version": constants.OS_API_VERSION,
1489
      "export_version": constants.EXPORT_VERSION,
1490
      "master": self.sstore.GetMasterNode(),
1491
      "architecture": (platform.architecture()[0], platform.machine()),
1492
      }
1493

    
1494
    return result
1495

    
1496

    
1497
class LUClusterCopyFile(NoHooksLU):
1498
  """Copy file to cluster.
1499

1500
  """
1501
  _OP_REQP = ["nodes", "filename"]
1502

    
1503
  def CheckPrereq(self):
1504
    """Check prerequisites.
1505

1506
    It should check that the named file exists and that the given list
1507
    of nodes is valid.
1508

1509
    """
1510
    if not os.path.exists(self.op.filename):
1511
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1512

    
1513
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1514

    
1515
  def Exec(self, feedback_fn):
1516
    """Copy a file from master to some nodes.
1517

1518
    The file given in self.op.filename is copied from the master to the
    nodes in self.op.nodes (all nodes when the list is empty); the master
    itself is skipped.
1523

1524
    """
1525
    filename = self.op.filename
1526

    
1527
    myname = socket.gethostname()
1528

    
1529
    for node in self.nodes:
1530
      if node == myname:
1531
        continue
1532
      if not ssh.CopyFileToNode(node, filename):
1533
        logger.Error("Copy of file %s to node %s failed" % (filename, node))
1534

    
1535

    
1536
class LUDumpClusterConfig(NoHooksLU):
1537
  """Return a text-representation of the cluster-config.
1538

1539
  """
1540
  _OP_REQP = []
1541

    
1542
  def CheckPrereq(self):
1543
    """No prerequisites.
1544

1545
    """
1546
    pass
1547

    
1548
  def Exec(self, feedback_fn):
1549
    """Dump a representation of the cluster config to the standard output.
1550

1551
    """
1552
    return self.cfg.DumpConfig()
1553

    
1554

    
1555
class LURunClusterCommand(NoHooksLU):
1556
  """Run a command on some nodes.
1557

1558
  """
1559
  _OP_REQP = ["command", "nodes"]
1560

    
1561
  def CheckPrereq(self):
1562
    """Check prerequisites.
1563

1564
    It checks that the given list of nodes is valid.
1565

1566
    """
1567
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1568

    
1569
  def Exec(self, feedback_fn):
1570
    """Run a command on some nodes.
1571

1572
    """
1573
    data = []
1574
    for node in self.nodes:
1575
      result = utils.RunCmd(["ssh", node.name, self.op.command])
1576
      data.append((node.name, result.cmd, result.output, result.exit_code))
1577

    
1578
    return data
1579

    
1580

    
1581
class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)
    self.instance = instance


  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError, ("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
         succeeded with the mapping from node devices to instance devices
  """
  device_info = []
  disks_ok = True
  for inst_disk in instance.disks:
    master_result = None
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      is_primary = node == instance.primary_node
      result = rpc.call_blockdev_assemble(node, node_disk, is_primary)
      if not result:
        logger.Error("could not prepare block device %s on node %s (is_pri"
                     "mary=%s)" % (inst_disk.iv_name, node, is_primary))
        if is_primary or not ignore_secondaries:
          disks_ok = False
      if is_primary:
        master_result = result
    device_info.append((instance.primary_node, inst_disk.iv_name,
                        master_result))

  return disks_ok, device_info


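# Illustrative note (added comment, not in the original source): callers
# unpack the result of _AssembleInstanceDisks() as a (disks_ok, device_info)
# pair, where each device_info entry is a
# (primary_node, iv_name, assemble_result) tuple, e.g.:
#   disks_ok, device_info = _AssembleInstanceDisks(instance, cfg)
#   for node, iv_name, node_dev in device_info:
#     logger.Info("%s/%s assembled as %s" % (node, iv_name, node_dev))
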
def _StartInstanceDisks(cfg, instance, force):
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError, ("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    if not type(ins_l) is list:
      raise errors.OpExecError, ("Can't contact node '%s'" %
                                 instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError, ("Instance is running, can't shutdown"
                                 " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)


def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are
  ignored.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result


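# Illustrative sketch (added comment, not in the original source): the disk
# helpers above are typically used as a bracket around work that needs the
# instance's block devices assembled, as LUReinstallInstance does below:
#   _StartInstanceDisks(self.cfg, inst, None)
#   try:
#     ...  # operate on the instance's disks
#   finally:
#     _ShutdownInstanceDisks(inst, self.cfg)
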
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      "INSTANCE_PRIMARY": self.instance.primary_node,
      "INSTANCE_SECONDARIES": " ".join(self.instance.secondary_nodes),
      "FORCE": self.op.force,
      }
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)

    # check bridges existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(instance.primary_node, brlist):
      raise errors.OpPrereqError, ("one or more target bridges %s does not"
                                   " exist on destination node '%s'" %
                                   (brlist, instance.primary_node))

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
    if not nodeinfo:
      raise errors.OpExecError, ("Could not contact node %s for infos" %
                                 (node_current))

    freememory = nodeinfo[node_current]['memory_free']
    memory = instance.memory
    if memory > freememory:
      raise errors.OpExecError, ("Not enough memory to start instance"
                                 " %s on node %s"
                                 " needed %s MiB, available %s MiB" %
                                 (instance.name, node_current, memory,
                                  freememory))

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError, ("Could not start instance")

    self.cfg.MarkInstanceUp(instance.name)


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      "INSTANCE_PRIMARY": self.instance.primary_node,
      "INSTANCE_SECONDARIES": " ".join(self.instance.secondary_nodes),
      }
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    if not rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    self.cfg.MarkInstanceDown(instance.name)
    _ShutdownInstanceDisks(instance, self.cfg)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      "INSTANCE_PRIMARY": self.instance.primary_node,
      "INSTANCE_SECONDARIES": " ".join(self.instance.secondary_nodes),
      }
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)
    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError, ("Instance '%s' has no disks" %
                                   self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError, ("Instance '%s' is marked to be up" %
                                   self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError, ("Instance '%s' is running on the node %s" %
                                   (self.op.instance_name,
                                    instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError, ("Primary node '%s' is unknown" %
                                     instance.primary_node)
      os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
      if not isinstance(os_obj, objects.OS):
        raise errors.OpPrereqError, ("OS '%s' not in supported OS list for"
                                     " primary node"  % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.AddInstance(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError, ("Could not install OS for instance %s "
                                   "on node %s" %
                                   (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      "INSTANCE_PRIMARY": self.instance.primary_node,
      "INSTANCE_SECONDARIES": " ".join(self.instance.secondary_nodes),
      }
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      raise errors.OpExecError, ("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logger.Info("removing block devices for instance %s" % instance.name)

    _RemoveDisks(instance, self.cfg)

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
                               "admin_state", "admin_ram",
                               "disk_template", "ip", "mac", "bridge"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    instance_names = utils.NiceSort(self.cfg.GetInstanceList())
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
                     in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_all_instances_info(nodes)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = []
    for instance in instance_list:
      iout = []
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = ",".join(instance.secondary_nodes) or "-"
        elif field == "admin_state":
          if instance.status == "down":
            val = "no"
          else:
            val = "yes"
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = "(node down)"
          else:
            if live_data.get(instance.name):
              val = "running"
            else:
              val = "stopped"
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = "(node down)"
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        else:
          raise errors.ParameterError, field
        val = str(val)
        iout.append(val)
      output.append(iout)

    return output


class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      "INSTANCE_PRIMARY": self.instance.primary_node,
      "INSTANCE_SECONDARIES": " ".join(self.instance.secondary_nodes),
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)

    # check memory requirements on the secondary node
    target_node = instance.secondary_nodes[0]
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
    info = nodeinfo.get(target_node, None)
    if not info:
      raise errors.OpPrereqError, ("Cannot get current information"
                                   " from node '%s'" % nodeinfo)
    if instance.memory > info['memory_free']:
      raise errors.OpPrereqError, ("Not enough memory on target node %s."
                                   " %d MB available, %d MB required" %
                                   (target_node, info['memory_free'],
                                    instance.memory))

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(instance.primary_node, brlist):
      raise errors.OpPrereqError, ("one or more target bridges %s does not"
                                   " exist on destination node '%s'" %
                                   (brlist, instance.primary_node))

    self.instance = instance

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for remote_raid1, these are md over drbd
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        if not self.op.ignore_consistency:
          raise errors.OpExecError, ("Disk %s is degraded on target node,"
                                     " aborting failover." % dev.iv_name)

    feedback_fn("* checking target node resource availability")
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())

    if not nodeinfo:
      raise errors.OpExecError, ("Could not contact target node %s." %
                                 target_node)

    free_memory = int(nodeinfo[target_node]['memory_free'])
    memory = instance.memory
    if memory > free_memory:
      raise errors.OpExecError, ("Not enough memory to create instance %s on"
                                 " node %s. needed %s MiB, available %s MiB" %
                                 (instance.name, target_node, memory,
                                  free_memory))

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not rpc.call_instance_shutdown(source_node, instance):
      logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                   " anyway. Please make sure node %s is down"  %
                   (instance.name, source_node, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError, ("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.AddInstance(instance)

    feedback_fn("* activating the instance's disks on target node")
    logger.Info("Starting instance %s on node %s" %
                (instance.name, target_node))

    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                             ignore_secondaries=True)
    if not disks_ok:
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError, ("Can't activate the instance's disks")

    feedback_fn("* starting the instance on the target node")
    if not rpc.call_instance_start(target_node, instance, None):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance %s on node %s." %
                               (instance.name, target_node))


def _CreateBlockDevOnPrimary(cfg, node, device):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(cfg, node, child):
        return False

  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size, True)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _CreateBlockDevOnSecondary(cfg, node, device, force):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(cfg, node, child, force):
        return False

  if not force:
    return True
  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size, False)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _GenerateMDDRBDBranch(cfg, vgname, primary, secondary, size, base):
  """Generate a drbd device complete with its children.

  """
  port = cfg.AllocatePort()
  base = "%s_%s" % (base, port)
  dev_data = objects.Disk(dev_type="lvm", size=size,
                          logical_id=(vgname, "%s.data" % base))
  dev_meta = objects.Disk(dev_type="lvm", size=128,
                          logical_id=(vgname, "%s.meta" % base))
  drbd_dev = objects.Disk(dev_type="drbd", size=size,
                          logical_id = (primary, secondary, port),
                          children = [dev_data, dev_meta])
  return drbd_dev


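# Illustrative note (added comment, not in the original source): for a
# hypothetical instance "inst1" with a 10240 MB disk, _GenerateMDDRBDBranch()
# above returns one "drbd" Disk whose children are the data and metadata LVs:
#   drbd(primary, secondary, port)
#     +- lvm((vgname, "inst1-sda_<port>.data"), 10240 MB)
#     +- lvm((vgname, "inst1-sda_<port>.meta"), 128 MB)
# The disk template code below wraps such a branch in an md_raid1 device.
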
def _GenerateDiskTemplate(cfg, vgname, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  if template_name == "diskless":
    disks = []
  elif template_name == "plain":
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")
    sda_dev = objects.Disk(dev_type="lvm", size=disk_sz,
                           logical_id=(vgname, "%s.os" % instance_name),
                           iv_name = "sda")
    sdb_dev = objects.Disk(dev_type="lvm", size=swap_sz,
                           logical_id=(vgname, "%s.swap" % instance_name),
                           iv_name = "sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == "local_raid1":
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")
    sda_dev_m1 = objects.Disk(dev_type="lvm", size=disk_sz,
                              logical_id=(vgname, "%s.os_m1" % instance_name))
    sda_dev_m2 = objects.Disk(dev_type="lvm", size=disk_sz,
                              logical_id=(vgname, "%s.os_m2" % instance_name))
    md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name = "sda",
                              size=disk_sz,
                              children = [sda_dev_m1, sda_dev_m2])
    sdb_dev_m1 = objects.Disk(dev_type="lvm", size=swap_sz,
                              logical_id=(vgname, "%s.swap_m1" %
                                          instance_name))
    sdb_dev_m2 = objects.Disk(dev_type="lvm", size=swap_sz,
                              logical_id=(vgname, "%s.swap_m2" %
                                          instance_name))
    md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name = "sdb",
                              size=swap_sz,
                              children = [sdb_dev_m1, sdb_dev_m2])
    disks = [md_sda_dev, md_sdb_dev]
  elif template_name == "remote_raid1":
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, vgname,
                                         primary_node, remote_node, disk_sz,
                                         "%s-sda" % instance_name)
    md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name="sda",
                              children = [drbd_sda_dev], size=disk_sz)
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, vgname,
                                         primary_node, remote_node, swap_sz,
                                         "%s-sdb" % instance_name)
    md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name="sdb",
                              children = [drbd_sdb_dev], size=swap_sz)
    disks = [md_sda_dev, md_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks


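# Illustrative sketch (added comment, not in the original source): a
# hypothetical "remote_raid1" call to the helper above yields two md_raid1
# devices, "sda" (OS) and "sdb" (swap), each backed by one drbd branch:
#   disks = _GenerateDiskTemplate(cfg, "xenvg", "remote_raid1",
#                                 "inst1.example.com", "node1", ["node2"],
#                                 10240, 4096)
# (volume group, node and instance names here are made up for the example)
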
def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
              (device.iv_name, instance.name))
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, device, False):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, device):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False
  return True


def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False
  return result


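# Illustrative note (added comment, not in the original source):
# LUCreateInstance below uses _CreateDisks() and _RemoveDisks() as a
# create-with-rollback pair, roughly:
#   if not _CreateDisks(self.cfg, iobj):
#     _RemoveDisks(iobj, self.cfg)
#     raise errors.OpExecError, ("Device creation failed, reverting...")
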
class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
              "disk_template", "swap_size", "mode", "start", "vcpus",
              "wait_for_sync"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      "INSTANCE_PRIMARY": self.op.pnode,
      "INSTANCE_SECONDARIES": " ".join(self.secondaries),
      "DISK_TEMPLATE": self.op.disk_template,
      "MEM_SIZE": self.op.mem_size,
      "DISK_SIZE": self.op.disk_size,
      "SWAP_SIZE": self.op.swap_size,
      "VCPUS": self.op.vcpus,
      "BRIDGE": self.op.bridge,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["SRC_NODE"] = self.op.src_node
      env["SRC_PATH"] = self.op.src_path
      env["SRC_IMAGE"] = self.src_image
    if self.inst_ip:
      env["INSTANCE_IP"] = self.inst_ip

    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl


  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError, ("Invalid instance creation mode '%s'" %
                                   self.op.mode)

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)
      if src_node is None or src_path is None:
        raise errors.OpPrereqError, ("Importing an instance requires source"
                                     " node and path options")
      src_node_full = self.cfg.ExpandNodeName(src_node)
      if src_node_full is None:
        raise errors.OpPrereqError, ("Unknown source node '%s'" % src_node)
      self.op.src_node = src_node = src_node_full

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError, ("The source path must be absolute")

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError, ("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError, ("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError, ("Wrong export version %s (wanted %d)" %
                                     (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError, ("Can't import instance with more than"
                                     " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage
    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError, ("No guest OS specified")

    # check primary node
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
    if pnode is None:
      raise errors.OpPrereqError, ("Primary node '%s' is unknown" %
                                   self.op.pnode)
    self.op.pnode = pnode.name
    self.pnode = pnode
    self.secondaries = []
    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError, ("Invalid disk template name")

    if self.op.disk_template == constants.DT_REMOTE_RAID1:
      if getattr(self.op, "snode", None) is None:
        raise errors.OpPrereqError, ("The 'remote_raid1' disk template needs"
                                     " a mirror node")

      snode_name = self.cfg.ExpandNodeName(self.op.snode)
      if snode_name is None:
        raise errors.OpPrereqError, ("Unknown secondary node '%s'" %
                                     self.op.snode)
      elif snode_name == pnode.name:
        raise errors.OpPrereqError, ("The secondary node cannot be"
                                     " the primary node.")
      self.secondaries.append(snode_name)

    # Check lv size requirements
    nodenames = [pnode.name] + self.secondaries
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())

    # Required free disk space as a function of disk and swap space
    req_size_dict = {
      constants.DT_DISKLESS: 0,
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
      constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
      # 256 MB are added for drbd metadata, 128MB for each drbd device
      constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
    }

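    # Illustrative arithmetic (added comment, not in the original source):
    # with disk_size=10240 and swap_size=4096, "plain" needs 14336 MB,
    # "local_raid1" needs 28672 MB and "remote_raid1" needs 14592 MB of free
    # space in the volume group on each node involved.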
    if self.op.disk_template not in req_size_dict:
      raise errors.ProgrammerError, ("Disk template '%s' size requirement"
                                     " is unknown" %  self.op.disk_template)

    req_size = req_size_dict[self.op.disk_template]

    for node in nodenames:
      info = nodeinfo.get(node, None)
      if not info:
        raise errors.OpPrereqError, ("Cannot get current information"
                                     " from node '%s'" % nodeinfo)
      if req_size > info['vg_free']:
        raise errors.OpPrereqError, ("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
    if not isinstance(os_obj, objects.OS):
      raise errors.OpPrereqError, ("OS '%s' not in supported os list for"
                                   " primary node"  % self.op.os_type)

    # instance verification
    hostname1 = utils.LookupHostname(self.op.instance_name)
    if not hostname1:
      raise errors.OpPrereqError, ("Instance name '%s' not found in dns" %
                                   self.op.instance_name)

    self.op.instance_name = instance_name = hostname1['hostname']
    instance_list = self.cfg.GetInstanceList()
    if instance_name in instance_list:
      raise errors.OpPrereqError, ("Instance '%s' is already in the cluster" %
                                   instance_name)

    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1['ip']
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError, ("given IP address '%s' doesn't look"
                                     " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = inst_ip

    command = ["fping", "-q", hostname1['ip']]
    result = utils.RunCmd(command)
    if not result.failed:
      raise errors.OpPrereqError, ("IP %s of instance %s already in use" %
                                   (hostname1['ip'], instance_name))

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError, ("target bridge '%s' does not exist on"
                                   " destination node '%s'" %
                                   (self.op.bridge, pnode.name))

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    disks = _GenerateDiskTemplate(self.cfg, self.cfg.GetVGName(),
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError, ("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj)
    elif iobj.disk_template == "remote_raid1":
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      raise errors.OpExecError, ("There are some degraded disks for"
                                 " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError, ("could not add os for instance %s"
                                     " on node %s" %
                                     (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                           src_node, src_image):
          raise errors.OpExecError, ("Could not import os for instance"
                                     " %s on node %s" %
                                     (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError, ("Unknown OS initialization mode '%s'"
                                       % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError, ("Could not start instance")


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError, ("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError, ("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance.name)
    return node, console_cmd


class LUAddMDDRBDComponent(LogicalUnit):
  """Add a new mirror member to an instance's disk.

  """
  HPATH = "mirror-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      "NEW_SECONDARY": self.op.remote_node,
      "DISK_NAME": self.op.disk_name,
      }
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.remote_node,] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)
    self.instance = instance

    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
    if remote_node is None:
      raise errors.OpPrereqError, ("Node '%s' not known" % self.op.remote_node)
    self.remote_node = remote_node

    if remote_node == instance.primary_node:
      raise errors.OpPrereqError, ("The specified node is the primary node of"
                                   " the instance.")

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError, ("Instance's disk layout is not"
                                   " remote_raid1.")
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError, ("Can't find this device ('%s') in the"
                                   " instance." % self.op.disk_name)
    if len(disk.children) > 1:
      raise errors.OpPrereqError, ("The device already has two slave"
                                   " devices.\n"
                                   "This would create a 3-disk raid1"
                                   " which we don't allow.")
    self.disk = disk

  def Exec(self, feedback_fn):
    """Add the mirror component

    """
    disk = self.disk
    instance = self.instance

    remote_node = self.remote_node
    new_drbd = _GenerateMDDRBDBranch(self.cfg, self.cfg.GetVGName(),
                                     instance.primary_node, remote_node,
                                     disk.size, "%s-%s" %
                                     (instance.name, self.op.disk_name))

    logger.Info("adding new mirror component on secondary")
    #HARDCODE
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, new_drbd, False):
      raise errors.OpExecError, ("Failed to create new component on secondary"
                                 " node %s" % remote_node)

    logger.Info("adding new mirror component on primary")
    #HARDCODE
    if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node, new_drbd):
      # remove secondary dev
      self.cfg.SetDiskID(new_drbd, remote_node)
      rpc.call_blockdev_remove(remote_node, new_drbd)
      raise errors.OpExecError, ("Failed to create volume on primary")

    # the device exists now
    # call the primary node to add the mirror to md
    logger.Info("adding new mirror component to md")
    if not rpc.call_blockdev_addchild(instance.primary_node,
                                      disk, new_drbd):
      logger.Error("Can't add mirror component to md!")
      self.cfg.SetDiskID(new_drbd, remote_node)
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
        logger.Error("Can't rollback on secondary")
      self.cfg.SetDiskID(new_drbd, instance.primary_node)
      if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
        logger.Error("Can't rollback on primary")
      raise errors.OpExecError, "Can't add mirror component to md array"

    disk.children.append(new_drbd)

    self.cfg.AddInstance(instance)

    _WaitForSync(self.cfg, instance)

    return 0


class LURemoveMDDRBDComponent(LogicalUnit):
  """Remove a component from a remote_raid1 disk.

  """
  HPATH = "mirror-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      "DISK_NAME": self.op.disk_name,
      "DISK_ID": self.op.disk_id,
      "OLD_SECONDARY": self.old_secondary,
      }
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)
    self.instance = instance

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError, ("Instance's disk layout is not"
                                   " remote_raid1.")
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError, ("Can't find this device ('%s') in the"
                                   " instance." % self.op.disk_name)
    for child in disk.children:
      if child.dev_type == "drbd" and child.logical_id[2] == self.op.disk_id:
        break
    else:
      raise errors.OpPrereqError, ("Can't find the device with this port.")

    if len(disk.children) < 2:
      raise errors.OpPrereqError, ("Cannot remove the last component from"
                                   " a mirror.")
    self.disk = disk
    self.child = child
    if self.child.logical_id[0] == instance.primary_node:
      oid = 1
    else:
      oid = 0
    self.old_secondary = self.child.logical_id[oid]

  def Exec(self, feedback_fn):
    """Remove the mirror component

    """
    instance = self.instance
    disk = self.disk
    child = self.child
    logger.Info("remove mirror component")
    self.cfg.SetDiskID(disk, instance.primary_node)
    if not rpc.call_blockdev_removechild(instance.primary_node,
                                         disk, child):
      raise errors.OpExecError, ("Can't remove child from mirror.")

    for node in child.logical_id[:2]:
      self.cfg.SetDiskID(child, node)
      if not rpc.call_blockdev_remove(node, child):
        logger.Error("Warning: failed to remove device from node %s,"
                     " continuing operation." % node)

    disk.children.remove(child)
    self.cfg.AddInstance(instance)


class LUReplaceDisks(LogicalUnit):
2912
  """Replace the disks of an instance.
2913

2914
  """
2915
  HPATH = "mirrors-replace"
2916
  HTYPE = constants.HTYPE_INSTANCE
2917
  _OP_REQP = ["instance_name"]
2918

    
2919
  def BuildHooksEnv(self):
2920
    """Build hooks env.
2921

2922
    This runs on the master, the primary and all the secondaries.
2923

2924
    """
2925
    env = {
2926
      "INSTANCE_NAME": self.op.instance_name,
2927
      "NEW_SECONDARY": self.op.remote_node,
2928
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
2929
      }
2930
    nl = [self.sstore.GetMasterNode(),
2931
          self.instance.primary_node] + list(self.instance.secondary_nodes)
2932
    return env, nl, nl
2933

    
2934
  def CheckPrereq(self):
2935
    """Check prerequisites.
2936

2937
    This checks that the instance is in the cluster.
2938

2939
    """
2940
    instance = self.cfg.GetInstanceInfo(
2941
      self.cfg.ExpandInstanceName(self.op.instance_name))
2942
    if instance is None:
2943
      raise errors.OpPrereqError, ("Instance '%s' not known" %
2944
                                   self.op.instance_name)
2945
    self.instance = instance
2946

    
2947
    if instance.disk_template != constants.DT_REMOTE_RAID1:
2948
      raise errors.OpPrereqError, ("Instance's disk layout is not"
2949
                                   " remote_raid1.")
2950

    
2951
    if len(instance.secondary_nodes) != 1:
2952
      raise errors.OpPrereqError, ("The instance has a strange layout,"
2953
                                   " expected one secondary but found %d" %
2954
                                   len(instance.secondary_nodes))
2955

    
2956
    remote_node = getattr(self.op, "remote_node", None)
    if remote_node is None:
      remote_node = instance.secondary_nodes[0]
    else:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError, ("Node '%s' not known" %
                                     self.op.remote_node)
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError, ("The specified node is the primary node of"
                                   " the instance.")
    self.op.remote_node = remote_node

  def Exec(self, feedback_fn):
    """Replace the disks of an instance.

    """
    instance = self.instance
    iv_names = {}
    # start of work
    remote_node = self.op.remote_node
    cfg = self.cfg
    vgname = cfg.GetVGName()
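    # for each disk: generate a new drbd branch towards the target
    # secondary, attach it to the existing md device, and remember the
    # old child so it can be dropped once the new one is in sync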
    for dev in instance.disks:
      size = dev.size
      new_drbd = _GenerateMDDRBDBranch(cfg, vgname, instance.primary_node,
                                       remote_node, size,
                                       "%s-%s" % (instance.name, dev.iv_name))
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
      logger.Info("adding new mirror component on secondary for %s" %
                  dev.iv_name)
      #HARDCODE
      if not _CreateBlockDevOnSecondary(cfg, remote_node, new_drbd, False):
        raise errors.OpExecError, ("Failed to create new component on"
                                   " secondary node %s\n"
                                   "Full abort, cleanup manually!" %
                                   remote_node)

      logger.Info("adding new mirror component on primary")
      #HARDCODE
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, new_drbd):
        # remove secondary dev
        cfg.SetDiskID(new_drbd, remote_node)
        rpc.call_blockdev_remove(remote_node, new_drbd)
        raise errors.OpExecError("Failed to create volume on primary!\n"
                                 "Full abort, cleanup manually!!")

      # the device exists now
      # call the primary node to add the mirror to md
      logger.Info("adding new mirror component to md")
      if not rpc.call_blockdev_addchild(instance.primary_node, dev,
                                        new_drbd):
        logger.Error("Can't add mirror component to md!")
        cfg.SetDiskID(new_drbd, remote_node)
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
          logger.Error("Can't rollback on secondary")
        cfg.SetDiskID(new_drbd, instance.primary_node)
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
          logger.Error("Can't rollback on primary")
        raise errors.OpExecError, ("Full abort, cleanup manually!!")

      dev.children.append(new_drbd)
      cfg.AddInstance(instance)

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    _WaitForSync(cfg, instance, unlock=True)

    # so check manually all the devices
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError, ("MD device %s is degraded!" % name)
      cfg.SetDiskID(new_drbd, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
      if is_degr:
        raise errors.OpExecError, ("New drbd device %s is degraded!" % name)

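    # the new components are in sync; detach the old children from the
    # md devices and remove them from both nodes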
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      logger.Info("remove mirror %s component" % name)
      cfg.SetDiskID(dev, instance.primary_node)
      if not rpc.call_blockdev_removechild(instance.primary_node,
                                           dev, child):
        logger.Error("Can't remove child from mirror, aborting"
                     " *this device cleanup*.\nYou need to cleanup manually!!")
        continue

      for node in child.logical_id[:2]:
        logger.Info("remove child device on %s" % node)
        cfg.SetDiskID(child, node)
        if not rpc.call_blockdev_remove(node, child):
          logger.Error("Warning: failed to remove device from node %s,"
                       " continuing operation." % node)

      dev.children.remove(child)

      cfg.AddInstance(instance)


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError, "Invalid argument type 'instances'"
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError, ("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]
    return

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type == "drbd":
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

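    # recurse into the children so the caller gets the status of the
    # whole device tree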
    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
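      # remote_info reflects the actual run state on the primary node,
      # while instance.status is only the configured (admin) state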
      remote_info = rpc.call_instance_info(instance.primary_node,
                                           instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        }

      result[instance.name] = idict

    return result


class LUQueryNodeData(NoHooksLU):
  """Logical unit for querying node data.

  """
  _OP_REQP = ["nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional node list against the existing names.

    """
    self.wanted_nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Compute and return the list of nodes.

    """
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]
    result = []
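    # each entry is (name, primary_ip, secondary_ip,
    #                [instances having this node as primary],
    #                [instances having this node as secondary])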
    for node in self.wanted_nodes:
      result.append((node.name, node.primary_ip, node.secondary_ip,
                     [inst.name for inst in ilist
                      if inst.primary_node == node.name],
                     [inst.name for inst in ilist
                      if node.name in inst.secondary_nodes],
                     ))
    return result


class LUSetInstanceParms(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      }
    if self.mem:
      env["MEM_SIZE"] = self.mem
    if self.vcpus:
      env["VCPUS"] = self.vcpus
    if self.do_ip:
      env["INSTANCE_IP"] = self.ip
    if self.bridge:
      env["BRIDGE"] = self.bridge

    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)

    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.bridge = getattr(self.op, "bridge", None)
    if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
      raise errors.OpPrereqError, ("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError, ("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError, ("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
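      # the special value "none" (any case) clears the instance's IP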
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError, ("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("No such instance name '%s'" %
                                   self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance
    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.
    """
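    # build a list of (parameter, new value) pairs for the changes
    # actually applied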
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))

    self.cfg.AddInstance(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list([node.name for node in self.nodes])


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance name is a valid one.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not found" %
                                   self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError, ("Destination node '%s' is unknown." %
                                   self.op.target_node)
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    # shutdown the instance, unless requested not to do so
    if self.op.shutdown:
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
      self.processor.ChainOpCode(op, feedback_fn)

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type="lvm", size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
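      # restart the instance if we shut it down above, even if taking
      # the snapshots failed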
      if self.op.shutdown:
        op = opcodes.OpStartupInstance(instance_name=instance.name,
                                       force=False)
        self.processor.ChainOpCode(op, feedback_fn)

    # TODO: check for size

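    # copy each snapshot to the target node, then drop the snapshot on
    # the source node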
    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                      instance):
        logger.Error("could not export block device %s from node"
                     " %s to node %s" %
                     (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from"
                     " node %s" % (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

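    # finally, remove any older exports of this instance that still
    # live on the other nodes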
    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal;
    # if we proceeded, the just-created backup would be removed, because
    # OpQueryExports substitutes an empty node list with the full
    # cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.processor.ChainOpCode(op, feedback_fn)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))