#!/usr/bin/python
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import socket
import time
import tempfile
import re
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import ssconf

class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements (REQ_CLUSTER,
      REQ_MASTER); note that all commands require root permissions

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_CLUSTER = True
  REQ_MASTER = True

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.processor = processor
    self.op = op
    self.cfg = cfg
    self.sstore = sstore
    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError, ("Required parameter '%s' missing" %
                                     attr_name)
    if self.REQ_CLUSTER:
      if not cfg.IsCluster():
        raise errors.OpPrereqError, ("Cluster not initialized yet,"
                                     " use 'gnt-cluster init' first.")
      if self.REQ_MASTER:
        master = sstore.GetMasterNode()
        if master != socket.gethostname():
          raise errors.OpPrereqError, ("Commands must be run on the master"
                                       " node %s" % master)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    As for the node lists, the master should not be included in
    them, as it will be added by the hooks runner in case this LU
    requires a cluster to run on (otherwise we don't have a node
    list). No nodes should be returned as an empty list (and not
    None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Build hooks env.

    This is a no-op, since we don't run hooks.

    """
    return


def _GetWantedNodes(lu, nodes):
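  """Return the node objects for the requested node names.

  If 'nodes' is a non-empty list, each name is expanded via the
  configuration and unknown names raise errors.OpPrereqError; if it is
  empty or None, the objects for all configured nodes are returned.

  """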
  if nodes is not None and not isinstance(nodes, list):
    raise errors.OpPrereqError, "Invalid argument type 'nodes'"

  if nodes:
    wanted_nodes = []

    for name in nodes:
      node = lu.cfg.GetNodeInfo(lu.cfg.ExpandNodeName(name))
      if node is None:
        raise errors.OpPrereqError, ("No such node name '%s'" % name)
      wanted_nodes.append(node)

    return wanted_nodes
  else:
    return [lu.cfg.GetNodeInfo(name) for name in lu.cfg.GetNodeList()]


def _CheckOutputFields(static, dynamic, selected):
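    """Check that 'selected' contains only known output fields.

    'static' and 'dynamic' together define the valid field names;
    anything in 'selected' outside that set raises errors.OpPrereqError
    naming the unknown fields.

    """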
    static_fields = frozenset(static)
    dynamic_fields = frozenset(dynamic)

    all_fields = static_fields | dynamic_fields

    if not all_fields.issuperset(selected):
      raise errors.OpPrereqError, ("Unknown output fields selected: %s"
                                   % ",".join(frozenset(selected).
                                              difference(all_fields)))


def _UpdateEtcHosts(fullnode, ip):
  """Ensure a node has a correct entry in /etc/hosts.

  Args:
    fullnode - Fully qualified domain name of host. (str)
    ip       - IPv4 address of host (str)

  """
  node = fullnode.split(".", 1)[0]

  f = open('/etc/hosts', 'r+')

  inthere = False

  save_lines = []
  add_lines = []
  removed = False

  while True:
    rawline = f.readline()

    if not rawline:
      # End of file
      break

    line = rawline.split('\n')[0]

    # Strip off comments
    line = line.split('#')[0]

    if not line:
      # Entire line was comment, skip
      save_lines.append(rawline)
      continue

    fields = line.split()

    haveall = True
    havesome = False
    for spec in [ ip, fullnode, node ]:
      if spec not in fields:
        haveall = False
      if spec in fields:
        havesome = True

    if haveall:
      inthere = True
      save_lines.append(rawline)
      continue

    if havesome and not haveall:
      # Line (old, or manual?) which is missing some.  Remove.
      removed = True
      continue

    save_lines.append(rawline)

  if not inthere:
    add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))

  if removed:
    if add_lines:
      save_lines = save_lines + add_lines

    # We removed a line, write a new file and replace old.
    fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
    newfile = os.fdopen(fd, 'w')
    newfile.write(''.join(save_lines))
    newfile.close()
    os.rename(tmpname, '/etc/hosts')

  elif add_lines:
    # Simply appending a new line will do the trick.
    f.seek(0, 2)
    for add in add_lines:
      f.write(add)

  f.close()


def _UpdateKnownHosts(fullnode, ip, pubkey):
  """Ensure a node has a correct known_hosts entry.

  Args:
    fullnode - Fully qualified domain name of host. (str)
    ip       - IPv4 address of host (str)
    pubkey   - the public key of the cluster

  """
  if os.path.exists('/etc/ssh/ssh_known_hosts'):
    f = open('/etc/ssh/ssh_known_hosts', 'r+')
  else:
    f = open('/etc/ssh/ssh_known_hosts', 'w+')

  inthere = False

  save_lines = []
  add_lines = []
  removed = False

  while True:
    rawline = f.readline()
    logger.Debug('read %s' % (repr(rawline),))

    if not rawline:
      # End of file
      break

    line = rawline.split('\n')[0]

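    # a known_hosts line has the form "host1,host2,... keytype key"; split
    # out the comma-separated host list and the key material for the
    # comparisons below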
    parts = line.split(' ')
    fields = parts[0].split(',')
    key = parts[2]

    haveall = True
    havesome = False
    for spec in [ ip, fullnode ]:
      if spec not in fields:
        haveall = False
      if spec in fields:
        havesome = True

    logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
    if haveall and key == pubkey:
      inthere = True
      save_lines.append(rawline)
      logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
      continue

    if havesome and (not haveall or key != pubkey):
      removed = True
      logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
      continue

    save_lines.append(rawline)

  if not inthere:
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))

  if removed:
    save_lines = save_lines + add_lines

    # Write a new file and replace old.
    fd, tmpname = tempfile.mkstemp('tmp', 'ssh_known_hosts_', '/etc/ssh')
    newfile = os.fdopen(fd, 'w')
    newfile.write(''.join(save_lines))
    newfile.close()
    logger.Debug("Wrote new known_hosts.")
    os.rename(tmpname, '/etc/ssh/ssh_known_hosts')

  elif add_lines:
    # Simply appending a new line will do the trick.
    f.seek(0, 2)
    for add in add_lines:
      f.write(add)

  f.close()


def _HasValidVG(vglist, vgname):
  """Checks if the volume group list is valid.

  A non-None return value means there's an error, and the return value
  is the error message.

  """
  vgsize = vglist.get(vgname, None)
  if vgsize is None:
    return "volume group '%s' missing" % vgname
  elif vgsize < 20480:
    return ("volume group '%s' too small (20480MiB required, %dMiB found)" %
            (vgname, vgsize))
  return None


def _InitSSHSetup(node):
  """Setup the SSH configuration for the cluster.


  This generates a dsa keypair for root, adds the pub key to the
  permitted hosts and adds the hostkey to its own known hosts.

  Args:
    node: the name of this host as a fqdn

  """
  utils.RemoveFile('/root/.ssh/known_hosts')

  if os.path.exists('/root/.ssh/id_dsa'):
    utils.CreateBackup('/root/.ssh/id_dsa')
  if os.path.exists('/root/.ssh/id_dsa.pub'):
    utils.CreateBackup('/root/.ssh/id_dsa.pub')

  utils.RemoveFile('/root/.ssh/id_dsa')
  utils.RemoveFile('/root/.ssh/id_dsa.pub')

  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
                         "-f", "/root/.ssh/id_dsa",
                         "-q", "-N", ""])
  if result.failed:
    raise errors.OpExecError, ("could not generate ssh keypair, error %s" %
                               result.output)

  f = open('/root/.ssh/id_dsa.pub', 'r')
  try:
    utils.AddAuthorizedKey('/root/.ssh/authorized_keys', f.read(8192))
  finally:
    f.close()


def _InitGanetiServerSetup(ss):
  """Setup the necessary configuration for the initial node daemon.

  This creates the nodepass file containing the shared password for
  the cluster and also generates the SSL certificate.

  """
  # Create pseudo random password
  randpass = sha.new(os.urandom(64)).hexdigest()
  # and write it into sstore
  ss.SetKey(ss.SS_NODED_PASS, randpass)

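  # generate a self-signed X509 certificate valid for five years; both the
  # private key and the certificate are written to SSL_CERT_FILE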
  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
                         "-days", str(365*5), "-nodes", "-x509",
                         "-keyout", constants.SSL_CERT_FILE,
                         "-out", constants.SSL_CERT_FILE, "-batch"])
  if result.failed:
    raise errors.OpExecError, ("could not generate server ssl cert, command"
                               " %s had exitcode %s and error message %s" %
                               (result.cmd, result.exit_code, result.output))

  os.chmod(constants.SSL_CERT_FILE, 0400)

  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])

  if result.failed:
    raise errors.OpExecError, ("could not start the node daemon, command %s"
                               " had exitcode %s and error %s" %
                               (result.cmd, result.exit_code, result.output))


class LUInitCluster(LogicalUnit):
  """Initialise the cluster.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
              "def_bridge", "master_netdev"]
  REQ_CLUSTER = False

  def BuildHooksEnv(self):
    """Build hooks env.

    Notes: Since we don't require a cluster, we must manually add
    ourselves in the post-run node list.

    """

    env = {"CLUSTER": self.op.cluster_name,
           "MASTER": self.hostname['hostname_full']}
    return env, [], [self.hostname['hostname_full']]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    if config.ConfigWriter.IsCluster():
      raise errors.OpPrereqError, ("Cluster is already initialised")

    hostname_local = socket.gethostname()
    self.hostname = hostname = utils.LookupHostname(hostname_local)
    if not hostname:
      raise errors.OpPrereqError, ("Cannot resolve my own hostname ('%s')" %
                                   hostname_local)

    self.clustername = clustername = utils.LookupHostname(self.op.cluster_name)
    if not clustername:
      raise errors.OpPrereqError, ("Cannot resolve given cluster name ('%s')"
                                   % self.op.cluster_name)

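    # ping our own IP with source address 127.0.0.1; this only succeeds if
    # the resolved address is actually configured on this host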
    result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", hostname['ip']])
    if result.failed:
      raise errors.OpPrereqError, ("Inconsistency: this host's name resolves"
                                   " to %s,\nbut this ip address does not"
                                   " belong to this host."
                                   " Aborting." % hostname['ip'])

    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip and not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError, ("Invalid secondary ip given")
    if secondary_ip and secondary_ip != hostname['ip']:
      result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", secondary_ip])
      if result.failed:
        raise errors.OpPrereqError, ("You gave %s as secondary IP,\n"
                                     "but it does not belong to this host." %
                                     secondary_ip)
    self.secondary_ip = secondary_ip

    # checks presence of the volume group given
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)

    if vgstatus:
      raise errors.OpPrereqError, ("Error: %s" % vgstatus)

    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
                    self.op.mac_prefix):
      raise errors.OpPrereqError, ("Invalid mac prefix given '%s'" %
                                   self.op.mac_prefix)

    if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
      raise errors.OpPrereqError, ("Invalid hypervisor type given '%s'" %
                                   self.op.hypervisor_type)

    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
    if result.failed:
      raise errors.OpPrereqError, ("Invalid master netdev given (%s): '%s'" %
                                   (self.op.master_netdev, result.output))

  def Exec(self, feedback_fn):
    """Initialize the cluster.

    """
    clustername = self.clustername
    hostname = self.hostname

    # set up the simple store
    ss = ssconf.SimpleStore()
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
    ss.SetKey(ss.SS_MASTER_NODE, hostname['hostname_full'])
    ss.SetKey(ss.SS_MASTER_IP, clustername['ip'])
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)

    # set up the inter-node password and certificate
    _InitGanetiServerSetup(ss)

    # start the master ip
    rpc.call_node_start_master(hostname['hostname_full'])

    # set up ssh config and /etc/hosts
    f = open('/etc/ssh/ssh_host_rsa_key.pub', 'r')
    try:
      sshline = f.read()
    finally:
      f.close()
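    # the second whitespace-separated field of the public key file is the
    # base64-encoded key material itself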
    sshkey = sshline.split(" ")[1]

    _UpdateEtcHosts(hostname['hostname_full'],
                    hostname['ip'],
                    )

    _UpdateKnownHosts(hostname['hostname_full'],
                      hostname['ip'],
                      sshkey,
                      )

    _InitSSHSetup(hostname['hostname'])

    # init of cluster config file
    cfgw = config.ConfigWriter()
    cfgw.InitConfig(hostname['hostname'], hostname['ip'], self.secondary_ip,
                    clustername['hostname'], sshkey, self.op.mac_prefix,
                    self.op.vg_name, self.op.def_bridge)


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) > 0 and nodelist != [master]:
      raise errors.OpPrereqError, ("There are still %d node(s) in "
                                   "this cluster." % (len(nodelist) - 1))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    utils.CreateBackup('/root/.ssh/id_dsa')
    utils.CreateBackup('/root/.ssh/id_dsa.pub')
    rpc.call_node_leave_cluster(self.sstore.GetMasterNode())


class LUVerifyCluster(NoHooksLU):
  """Verifies the cluster status.
595

596
  """
597
  _OP_REQP = []
598

    
599
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
600
                  remote_version, feedback_fn):
601
    """Run multiple tests against a node.
602

603
    Test list:
604
      - compares ganeti version
605
      - checks vg existance and size > 20G
606
      - checks config file checksum
607
      - checks ssh to other nodes
608

609
    Args:
610
      node: name of the node to check
611
      file_list: required list of files
612
      local_cksum: dictionary of local files and their checksums
613
    """
614
    # compares ganeti version
615
    local_version = constants.PROTOCOL_VERSION
616
    if not remote_version:
617
      feedback_fn(" - ERROR: connection to %s failed" % (node))
618
      return True
619

    
620
    if local_version != remote_version:
621
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
622
                      (local_version, node, remote_version))
623
      return True
624

    
625
    # checks vg existance and size > 20G
626

    
627
    bad = False
628
    if not vglist:
629
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
630
                      (node,))
631
      bad = True
632
    else:
633
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
634
      if vgstatus:
635
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
636
        bad = True
637

    
638
    # checks config file checksum
639
    # checks ssh to any
640

    
641
    if 'filelist' not in node_result:
642
      bad = True
643
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
644
    else:
645
      remote_cksum = node_result['filelist']
646
      for file_name in file_list:
647
        if file_name not in remote_cksum:
648
          bad = True
649
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
650
        elif remote_cksum[file_name] != local_cksum[file_name]:
651
          bad = True
652
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
653

    
654
    if 'nodelist' not in node_result:
655
      bad = True
656
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
657
    else:
658
      if node_result['nodelist']:
659
        bad = True
660
        for node in node_result['nodelist']:
661
          feedback_fn("  - ERROR: communication with node '%s': %s" %
662
                          (node, node_result['nodelist'][node]))
663
    hyp_result = node_result.get('hypervisor', None)
664
    if hyp_result is not None:
665
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
666
    return bad
667

    
668
  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
669
    """Verify an instance.
670

671
    This function checks to see if the required block devices are
672
    available on the instance's node.
673

674
    """
675
    bad = False
676

    
677
    instancelist = self.cfg.GetInstanceList()
678
    if not instance in instancelist:
679
      feedback_fn("  - ERROR: instance %s not in instance list %s" %
680
                      (instance, instancelist))
681
      bad = True
682

    
683
    instanceconfig = self.cfg.GetInstanceInfo(instance)
684
    node_current = instanceconfig.primary_node
685

    
686
    node_vol_should = {}
687
    instanceconfig.MapLVsByNode(node_vol_should)
688

    
689
    for node in node_vol_should:
690
      for volume in node_vol_should[node]:
691
        if node not in node_vol_is or volume not in node_vol_is[node]:
692
          feedback_fn("  - ERROR: volume %s missing on node %s" %
693
                          (volume, node))
694
          bad = True
695

    
696
    if not instanceconfig.status == 'down':
697
      if not instance in node_instance[node_current]:
698
        feedback_fn("  - ERROR: instance %s not running on node %s" %
699
                        (instance, node_current))
700
        bad = True
701

    
702
    for node in node_instance:
703
      if (not node == node_current):
704
        if instance in node_instance[node]:
705
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
706
                          (instance, node))
707
          bad = True
708

    
709
    return not bad
710

    
711
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
712
    """Verify if there are any unknown volumes in the cluster.
713

714
    The .os, .swap and backup volumes are ignored. All other volumes are
715
    reported as unknown.
716

717
    """
718
    bad = False
719

    
720
    for node in node_vol_is:
721
      for volume in node_vol_is[node]:
722
        if node not in node_vol_should or volume not in node_vol_should[node]:
723
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
724
                      (volume, node))
725
          bad = True
726
    return bad
727

    
728

    
729
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
730
    """Verify the list of running instances.
731

732
    This checks what instances are running but unknown to the cluster.
733

734
    """
735
    bad = False
736
    for node in node_instance:
737
      for runninginstance in node_instance[node]:
738
        if runninginstance not in instancelist:
739
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
740
                          (runninginstance, node))
741
          bad = True
742
    return bad
743

    
744
  def _VerifyNodeConfigFiles(self, ismaster, node, file_list, feedback_fn):
745
    """Verify the list of node config files"""
746

    
747
    bad = False
748
    for file_name in constants.MASTER_CONFIGFILES:
749
      if ismaster and file_name not in file_list:
750
        feedback_fn("  - ERROR: master config file %s missing from master"
751
                    " node %s" % (file_name, node))
752
        bad = True
753
      elif not ismaster and file_name in file_list:
754
        feedback_fn("  - ERROR: master config file %s should not exist"
755
                    " on non-master node %s" % (file_name, node))
756
        bad = True
757

    
758
    for file_name in constants.NODE_CONFIGFILES:
759
      if file_name not in file_list:
760
        feedback_fn("  - ERROR: config file %s missing from node %s" %
761
                    (file_name, node))
762
        bad = True
763

    
764
    return bad
765

    
766
  def CheckPrereq(self):
767
    """Check prerequisites.
768

769
    This has no prerequisites.
770

771
    """
772
    pass
773

    
774
  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    self.cfg.VerifyConfig()

    master = self.sstore.GetMasterNode()
    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    node_volume = {}
    node_instance = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = constants.CLUSTER_CONF_FILES
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_configfile = rpc.call_configfile_list(nodelist)
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result
      # node_configfile
      nodeconfigfile = all_configfile[node]

      if not nodeconfigfile:
        feedback_fn("  - ERROR: connection to %s failed" % (node))
        bad = True
        continue

      bad = bad or self._VerifyNodeConfigFiles(node==master, node,
                                               nodeconfigfile, feedback_fn)

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if type(volumeinfo) != dict:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      result =  self._VerifyInstance(instance, node_volume, node_instance,
                                     feedback_fn)
      bad = bad or result

      inst_config = self.cfg.GetInstanceInfo(instance)

      inst_config.MapLVsByNode(node_vol_should)

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    return int(bad)


def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      logger.ToStderr("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError, ("Can't contact node %s for mirror data,"
                                   " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        logger.ToStderr("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      perc_done, est_time, is_degraded = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        logger.ToStdout("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    if unlock:
      utils.Unlock('cmd')
    try:
      time.sleep(min(60, max_time))
    finally:
      if unlock:
        utils.Lock('cmd')

  if done:
    logger.ToStdout("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


def _CheckDiskConsistency(cfgw, dev, node, on_primary):
  """Check that mirrors are not degraded.

  """

  cfgw.SetDiskID(dev, node)

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Can't get any data from node %s" % node)
      result = False
    else:
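      # rstats[5] is treated here as the device's is_degraded flag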
      result = result and (not rstats[5])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    return

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError, "Can't gather the list of OSes"
    return node_data


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would not allow itself to run.

    """
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return {"NODE_NAME": self.op.node_name}, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """

    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      logger.Error("Error: Node '%s' is unknown." % self.op.node_name)
      return 1

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError, ("Node is the master node,"
                                   " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError, ("Instance %s still running on the node,"
                                     " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError, ("Instance %s has node as a secondary,"
                                     " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    rpc.call_node_leave_cluster(node.name)

    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["dtotal", "dfree",
                                     "mtotal", "mnode", "mfree"])

    _CheckOutputFields(static=["name", "pinst", "sinst", "pip", "sip"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)


  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = utils.NiceSort(self.cfg.GetNodeList())
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]


    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict.fromkeys(nodenames, 0)
    node_to_secondary = dict.fromkeys(nodenames, 0)

    if "pinst" in self.op.output_fields or "sinst" in self.op.output_fields:
      instancelist = self.cfg.GetInstanceList()

      for instance in instancelist:
        instanceinfo = self.cfg.GetInstanceInfo(instance)
        node_to_primary[instanceinfo.primary_node] += 1
        for secnode in instanceinfo.secondary_nodes:
          node_to_secondary[secnode] += 1

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst":
          val = node_to_primary[node.name]
        elif field == "sinst":
          val = node_to_secondary[node.name]
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, "?")
        else:
          raise errors.ParameterError, field
        val = str(val)
        node_output.append(val)
      output.append(node_output)

    return output


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)


  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = utils.NiceSort([node.name for node in self.nodes])
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
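              # for-else: the loop found no instance owning this volume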
              val = '-'
          else:
            raise errors.ParameterError, field
          node_output.append(str(val))

        output.append(node_output)

    return output


def _CheckNodesDirs(node_list, paths):
  """Verify if the given nodes have the same files.

  Args:
    node_list: the list of node names to check
    paths: the list of directories to checksum and compare

  Returns:
    list of (node, different_file, message); if empty, the files are in sync

  """
  file_names = []
  for dir_name in paths:
    flist = [os.path.join(dir_name, name) for name in os.listdir(dir_name)]
    flist = [name for name in flist if os.path.isfile(name)]
    file_names.extend(flist)

  local_checksums = utils.FingerprintFiles(file_names)

  results = []
  verify_params = {'filelist': file_names}
  all_node_results = rpc.call_node_verify(node_list, verify_params)
  for node_name in node_list:
    node_result = all_node_results.get(node_name, False)
    if not node_result or 'filelist' not in node_result:
      results.append((node_name, "'all files'", "node communication error"))
      continue
    remote_checksums = node_result['filelist']
    for fname in local_checksums:
      if fname not in remote_checksums:
        results.append((node_name, fname, "missing file"))
      elif remote_checksums[fname] != local_checksums[fname]:
        results.append((node_name, fname, "wrong checksum"))
  return results


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) matches the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.LookupHostname(node_name)
    if not dns_data:
      raise errors.OpPrereqError, ("Node %s is not resolvable" % node_name)

    node = dns_data['hostname']
    primary_ip = self.op.primary_ip = dns_data['ip']
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError, ("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip
    node_list = cfg.GetNodeList()
    if node in node_list:
      raise errors.OpPrereqError, ("Node %s is already in the configuration"
                                   % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)
      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError, ("New node ip address(es) conflict with"
                                     " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError, ("The master has no private ip but the"
                                     " new node has one")
      else:
        raise errors.OpPrereqError, ("The master has a private ip but the"
                                     " new node doesn't have one")

    # checks reachability
    command = ["fping", "-q", primary_ip]
    result = utils.RunCmd(command)
    if result.failed:
      raise errors.OpPrereqError, ("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      command = ["fping", "-S%s" % myself.secondary_ip, "-q", secondary_ip]
      result = utils.RunCmd(command)
      if result.failed:
        raise errors.OpPrereqError, ("Node secondary ip not reachable by ping")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # set up inter-node password and certificate and restarts the node daemon
    gntpass = self.sstore.GetNodeDaemonPassword()
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
      raise errors.OpExecError, ("ganeti password corruption detected")
    f = open(constants.SSL_CERT_FILE)
    try:
      gntpem = f.read(8192)
    finally:
      f.close()
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
    # so we use this to detect an invalid certificate; as long as the
    # cert doesn't contain this, the here-document will be correctly
    # parsed by the shell sequence below
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
      raise errors.OpExecError, ("invalid PEM encoding in the SSL certificate")
    if not gntpem.endswith("\n"):
      raise errors.OpExecError, ("PEM must end with newline")
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)

    # remove first the root's known_hosts file
    utils.RemoveFile("/root/.ssh/known_hosts")
    # and then connect with ssh to set password and start ganeti-noded
    # note that all the below variables are sanitized at this point,
    # either by being constants or by the checks above
    ss = self.sstore
    mycommand = ("umask 077 && "
                 "echo '%s' > '%s' && "
                 "cat > '%s' << '!EOF.' && \n"
                 "%s!EOF.\n%s restart" %
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
                  constants.SSL_CERT_FILE, gntpem,
                  constants.NODE_INITD_SCRIPT))

    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
    if result.failed:
      raise errors.OpExecError, ("Remote command on node %s, error: %s,"
                                 " output: %s" %
                                 (node, result.fail_reason, result.output))

    # check connectivity
    time.sleep(4)

    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError, ("Version mismatch master version %s,"
                                   " node version %s" %
                                   (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError, ("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    keyarray = []
    keyfiles = ["/etc/ssh/ssh_host_dsa_key", "/etc/ssh/ssh_host_dsa_key.pub",
                "/etc/ssh/ssh_host_rsa_key", "/etc/ssh/ssh_host_rsa_key.pub",
                "/root/.ssh/id_dsa", "/root/.ssh/id_dsa.pub"]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError, ("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    _UpdateEtcHosts(new_node.name, new_node.primary_ip)
    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
                      self.cfg.GetHostKey())

    if new_node.secondary_ip != new_node.primary_ip:
      result = ssh.SSHCall(node, "root",
                           "fping -S 127.0.0.1 -q %s" % new_node.secondary_ip)
      if result.failed:
        raise errors.OpExecError, ("Node claims it doesn't have the"
                                   " secondary ip you gave (%s).\n"
                                   "Please fix and re-run this command." %
                                   new_node.secondary_ip)

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList() + [node]
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in ("/etc/hosts", "/etc/ssh/ssh_known_hosts"):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = [constants.MASTER_CRON_FILE]
    to_copy.extend(ss.GetFileList())
    for fname in to_copy:
      if not ssh.CopyFileToNode(node, fname):
        logger.Error("could not copy file %s to node %s" % (fname, node))

    logger.Info("adding node %s to cluster.conf" % node)
    self.cfg.AddNode(new_node)


class LUMasterFailover(LogicalUnit):
1453
  """Failover the master node to the current node.
1454

1455
  This is a special LU in that it must run on a non-master node.
1456

1457
  """
1458
  HPATH = "master-failover"
1459
  HTYPE = constants.HTYPE_CLUSTER
1460
  REQ_MASTER = False
1461
  _OP_REQP = []
1462

    
1463
  def BuildHooksEnv(self):
1464
    """Build hooks env.
1465

1466
    This will run on the new master only in the pre phase, and on all
1467
    the nodes in the post phase.
1468

1469
    """
1470
    env = {
1471
      "NEW_MASTER": self.new_master,
1472
      "OLD_MASTER": self.old_master,
1473
      }
1474
    return env, [self.new_master], self.cfg.GetNodeList()
1475

    
1476
  def CheckPrereq(self):
1477
    """Check prerequisites.
1478

1479
    This checks that we are not already the master.
1480

1481
    """
1482
    self.new_master = socket.gethostname()
1483

    
1484
    self.old_master = self.sstore.GetMasterNode()
1485

    
1486
    if self.old_master == self.new_master:
1487
      raise errors.OpPrereqError, ("This commands must be run on the node"
1488
                                   " where you want the new master to be.\n"
1489
                                   "%s is already the master" %
1490
                                   self.old_master)
1491

    
1492
  def Exec(self, feedback_fn):
1493
    """Failover the master node.
1494

1495
    This command, when run on a non-master node, will cause the current
1496
    master to cease being master, and the non-master to become new
1497
    master.
1498

1499
    """
1500

    
1501
    #TODO: do not rely on gethostname returning the FQDN
1502
    logger.Info("setting master to %s, old master: %s" %
1503
                (self.new_master, self.old_master))
1504

    
1505
    if not rpc.call_node_stop_master(self.old_master):
1506
      logger.Error("could disable the master role on the old master"
1507
                   " %s, please disable manually" % self.old_master)
1508

    
1509
    ss = self.sstore
1510
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1511
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
1512
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
1513
      logger.Error("could not distribute the new simple store master file"
1514
                   " to the other nodes, please check.")
1515

    
1516
    if not rpc.call_node_start_master(self.new_master):
1517
      logger.Error("could not start the master role on the new master"
1518
                   " %s, please check" % self.new_master)
1519
      feedback_fn("Error in activating the master IP on the new master,\n"
1520
                  "please fix manually.")
1521

    
1522

    
1523

    
1524
class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]
    result = {
      "name": self.cfg.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      "instances": [(instance.name, instance.primary_node)
                    for instance in instances],
      "nodes": self.cfg.GetNodeList(),
      }

    return result


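# Illustrative shape of the dict returned by LUQueryClusterInfo.Exec above
# (values are made up for the example):
#   {"name": "cluster1", "master": "node1.example.com",
#    "architecture": ("64bit", "x86_64"),
#    "instances": [("inst1.example.com", "node1.example.com")],
#    "nodes": ["node1.example.com", "node2.example.com"], ...}
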
class LUClusterCopyFile(NoHooksLU):
  """Copy file to cluster.

  """
  _OP_REQP = ["nodes", "filename"]

  def CheckPrereq(self):
    """Check prerequisites.

    It should check that the named file exists and that the given list
    of nodes is valid.

    """
    if not os.path.exists(self.op.filename):
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)

    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Copy a file from master to some nodes.

    Opcode parameters used:
      filename - the name of the file to copy
      nodes - list containing the names of the target nodes; if empty,
              all nodes are used

    """
    filename = self.op.filename

    myname = socket.gethostname()

    for node in self.nodes:
      if node == myname:
        continue
      if not ssh.CopyFileToNode(node, filename):
        logger.Error("Copy of file %s to node %s failed" % (filename, node))


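# LUClusterCopyFile.Exec above deliberately skips the node it runs on (the
# master): the source file is already there, so only the remote copies via
# ssh.CopyFileToNode() are attempted, and per-node failures are only logged
# rather than aborting the whole operation.
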
class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()


class LURunClusterCommand(NoHooksLU):
  """Run a command on some nodes.

  """
  _OP_REQP = ["command", "nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    It checks that the given list of nodes is valid.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Run a command on some nodes.

    """
    data = []
    for node in self.nodes:
      result = utils.RunCmd(["ssh", node.name, self.op.command])
      data.append((node.name, result.cmd, result.output, result.exit_code))

    return data


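# Each entry returned by LURunClusterCommand.Exec above is a 4-tuple of
#   (node_name, command, output, exit_code)
# e.g. ("node2.example.com", "ssh node2.example.com uptime", "...", 0)
# -- the example values are illustrative only.
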
class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError, ("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    a pair (disks_ok, device_info): disks_ok is false if the operation
    failed; device_info is a list of (host, instance_visible_name,
    node_visible_name) tuples mapping node devices to instance devices
  """
  device_info = []
  disks_ok = True
  for inst_disk in instance.disks:
    master_result = None
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      is_primary = node == instance.primary_node
      result = rpc.call_blockdev_assemble(node, node_disk, is_primary)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=%s)" % (inst_disk.iv_name, node, is_primary))
        if is_primary or not ignore_secondaries:
          disks_ok = False
      if is_primary:
        master_result = result
    device_info.append((instance.primary_node, inst_disk.iv_name,
                        master_result))

  return disks_ok, device_info


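# Illustrative shape of the device_info list built by _AssembleInstanceDisks
# above: one (primary_node, iv_name, assemble_result) tuple per disk, e.g.
#   [("node1.example.com", "sda", "/dev/md0"),
#    ("node1.example.com", "sdb", "/dev/md1")]
# where the third element is whatever rpc.call_blockdev_assemble returned on
# the primary node (the device paths shown are made up).
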
class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    if not type(ins_l) is list:
      raise errors.OpExecError, ("Can't contact node '%s'" %
                                 instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError, ("Instance is running, can't shutdown"
                                 " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)


def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  Errors on the primary node are only ignored if ignore_primary is
  true; otherwise they make the function return False.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result


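# _ShutdownInstanceDisks above only counts errors that are not explicitly
# tolerated: with ignore_primary=True a failed shutdown on the primary node
# is logged but does not flip the return value to False, which is what the
# failover code relies on when the primary node may already be dead.
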
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      "INSTANCE_PRIMARY": self.instance.primary_node,
      "INSTANCE_SECONDARIES": " ".join(self.instance.secondary_nodes),
      "FORCE": self.op.force,
      }
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(instance.primary_node, brlist):
      raise errors.OpPrereqError, ("one or more target bridges %s do not"
                                   " exist on destination node '%s'" %
                                   (brlist, instance.primary_node))

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
    if not nodeinfo:
      raise errors.OpExecError, ("Could not contact node %s for info" %
                                 (node_current))

    freememory = nodeinfo[node_current]['memory_free']
    memory = instance.memory
    if memory > freememory:
      raise errors.OpExecError, ("Not enough memory to start instance"
                                 " %s on node %s:"
                                 " needed %s MiB, available %s MiB" %
                                 (instance.name, node_current, memory,
                                  freememory))

    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                             ignore_secondaries=force)
    if not disks_ok:
      _ShutdownInstanceDisks(instance, self.cfg)
      if not force:
        logger.Error("If the message above refers to a secondary node,"
                     " you can retry the operation using '--force'.")
      raise errors.OpExecError, ("Disk consistency error")

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError, ("Could not start instance")

    self.cfg.MarkInstanceUp(instance.name)


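# In LUStartupInstance.Exec above the 'force' flag doubles as the
# ignore_secondaries argument of _AssembleInstanceDisks: a forced start
# tolerates broken secondary mirrors but still refuses to run on top of a
# broken primary device.
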
class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      "INSTANCE_PRIMARY": self.instance.primary_node,
      "INSTANCE_SECONDARIES": " ".join(self.instance.secondary_nodes),
      }
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    if not rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    self.cfg.MarkInstanceDown(instance.name)
    _ShutdownInstanceDisks(instance, self.cfg)


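# Shutdown ordering in LUShutdownInstance.Exec above: the hypervisor-level
# shutdown is attempted first (a failure is only logged), the configuration
# is updated, and only then are the block devices torn down on all nodes.
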
class LURemoveInstance(LogicalUnit):
1904
  """Remove an instance.
1905

1906
  """
1907
  HPATH = "instance-remove"
1908
  HTYPE = constants.HTYPE_INSTANCE
1909
  _OP_REQP = ["instance_name"]
1910

    
1911
  def BuildHooksEnv(self):
1912
    """Build hooks env.
1913

1914
    This runs on master, primary and secondary nodes of the instance.
1915

1916
    """
1917
    env = {
1918
      "INSTANCE_NAME": self.op.instance_name,
1919
      "INSTANCE_PRIMARY": self.instance.primary_node,
1920
      "INSTANCE_SECONDARIES": " ".join(self.instance.secondary_nodes),
1921
      }
1922
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1923
          list(self.instance.secondary_nodes))
1924
    return env, nl, nl
1925

    
1926
  def CheckPrereq(self):
1927
    """Check prerequisites.
1928

1929
    This checks that the instance is in the cluster.
1930

1931
    """
1932
    instance = self.cfg.GetInstanceInfo(
1933
      self.cfg.ExpandInstanceName(self.op.instance_name))
1934
    if instance is None:
1935
      raise errors.OpPrereqError, ("Instance '%s' not known" %
1936
                                   self.op.instance_name)
1937
    self.instance = instance
1938

    
1939
  def Exec(self, feedback_fn):
1940
    """Remove the instance.
1941

1942
    """
1943
    instance = self.instance
1944
    logger.Info("shutting down instance %s on node %s" %
1945
                (instance.name, instance.primary_node))
1946

    
1947
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
1948
      raise errors.OpExecError, ("Could not shutdown instance %s on node %s" %
1949
                                 (instance.name, instance.primary_node))
1950

    
1951
    logger.Info("removing block devices for instance %s" % instance.name)
1952

    
1953
    _RemoveDisks(instance, self.cfg)
1954

    
1955
    logger.Info("removing instance %s out of cluster config" % instance.name)
1956

    
1957
    self.cfg.RemoveInstance(instance.name)
1958

    
1959

    
1960
class LUQueryInstances(NoHooksLU):
1961
  """Logical unit for querying instances.
1962

1963
  """
1964
  _OP_REQP = ["output_fields"]
1965

    
1966
  def CheckPrereq(self):
1967
    """Check prerequisites.
1968

1969
    This checks that the fields required are valid output fields.
1970

1971
    """
1972
    self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
1973
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
1974
                               "admin_state", "admin_ram",
1975
                               "disk_template", "ip", "mac", "bridge"],
1976
                       dynamic=self.dynamic_fields,
1977
                       selected=self.op.output_fields)
1978

    
1979
  def Exec(self, feedback_fn):
1980
    """Computes the list of nodes and their attributes.
1981

1982
    """
1983

    
1984
    instance_names = utils.NiceSort(self.cfg.GetInstanceList())
1985
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
1986
                     in instance_names]
1987

    
1988
    # begin data gathering
1989

    
1990
    nodes = frozenset([inst.primary_node for inst in instance_list])
1991

    
1992
    bad_nodes = []
1993
    if self.dynamic_fields.intersection(self.op.output_fields):
1994
      live_data = {}
1995
      node_data = rpc.call_all_instances_info(nodes)
1996
      for name in nodes:
1997
        result = node_data[name]
1998
        if result:
1999
          live_data.update(result)
2000
        elif result == False:
2001
          bad_nodes.append(name)
2002
        # else no instance is alive
2003
    else:
2004
      live_data = dict([(name, {}) for name in instance_names])
2005

    
2006
    # end data gathering
2007

    
2008
    output = []
2009
    for instance in instance_list:
2010
      iout = []
2011
      for field in self.op.output_fields:
2012
        if field == "name":
2013
          val = instance.name
2014
        elif field == "os":
2015
          val = instance.os
2016
        elif field == "pnode":
2017
          val = instance.primary_node
2018
        elif field == "snodes":
2019
          val = ",".join(instance.secondary_nodes) or "-"
2020
        elif field == "admin_state":
2021
          if instance.status == "down":
2022
            val = "no"
2023
          else:
2024
            val = "yes"
2025
        elif field == "oper_state":
2026
          if instance.primary_node in bad_nodes:
2027
            val = "(node down)"
2028
          else:
2029
            if live_data.get(instance.name):
2030
              val = "running"
2031
            else:
2032
              val = "stopped"
2033
        elif field == "admin_ram":
2034
          val = instance.memory
2035
        elif field == "oper_ram":
2036
          if instance.primary_node in bad_nodes:
2037
            val = "(node down)"
2038
          elif instance.name in live_data:
2039
            val = live_data[instance.name].get("memory", "?")
2040
          else:
2041
            val = "-"
2042
        elif field == "disk_template":
2043
          val = instance.disk_template
2044
        elif field == "ip":
2045
          val = instance.nics[0].ip
2046
        elif field == "bridge":
2047
          val = instance.nics[0].bridge
2048
        elif field == "mac":
2049
          val = instance.nics[0].mac
2050
        else:
2051
          raise errors.ParameterError, field
2052
        val = str(val)
2053
        iout.append(val)
2054
      output.append(iout)
2055

    
2056
    return output
2057

    
2058

    
2059
class LUFailoverInstance(LogicalUnit):
2060
  """Failover an instance.
2061

2062
  """
2063
  HPATH = "instance-failover"
2064
  HTYPE = constants.HTYPE_INSTANCE
2065
  _OP_REQP = ["instance_name", "ignore_consistency"]
2066

    
2067
  def BuildHooksEnv(self):
2068
    """Build hooks env.
2069

2070
    This runs on master, primary and secondary nodes of the instance.
2071

2072
    """
2073
    env = {
2074
      "INSTANCE_NAME": self.op.instance_name,
2075
      "INSTANCE_PRIMARY": self.instance.primary_node,
2076
      "INSTANCE_SECONDARIES": " ".join(self.instance.secondary_nodes),
2077
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2078
      }
2079
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2080
    return env, nl, nl
2081

    
2082
  def CheckPrereq(self):
2083
    """Check prerequisites.
2084

2085
    This checks that the instance is in the cluster.
2086

2087
    """
2088
    instance = self.cfg.GetInstanceInfo(
2089
      self.cfg.ExpandInstanceName(self.op.instance_name))
2090
    if instance is None:
2091
      raise errors.OpPrereqError, ("Instance '%s' not known" %
2092
                                   self.op.instance_name)
2093

    
2094
    # check memory requirements on the secondary node
2095
    target_node = instance.secondary_nodes[0]
2096
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2097
    info = nodeinfo.get(target_node, None)
2098
    if not info:
2099
      raise errors.OpPrereqError, ("Cannot get current information"
                                   " from node '%s'" % target_node)
2101
    if instance.memory > info['memory_free']:
2102
      raise errors.OpPrereqError, ("Not enough memory on target node %s."
2103
                                   " %d MB available, %d MB required" %
2104
                                   (target_node, info['memory_free'],
2105
                                    instance.memory))
2106

    
2107
    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(instance.primary_node, brlist):
      raise errors.OpPrereqError, ("one or more target bridges %s do not"
                                   " exist on destination node '%s'" %
                                   (brlist, instance.primary_node))
2113

    
2114
    self.instance = instance
2115

    
2116
  def Exec(self, feedback_fn):
2117
    """Failover an instance.
2118

2119
    The failover is done by shutting it down on its present node and
2120
    starting it on the secondary.
2121

2122
    """
2123
    instance = self.instance
2124

    
2125
    source_node = instance.primary_node
2126
    target_node = instance.secondary_nodes[0]
2127

    
2128
    feedback_fn("* checking disk consistency between source and target")
2129
    for dev in instance.disks:
2130
      # for remote_raid1, these are md over drbd
2131
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2132
        if not self.op.ignore_consistency:
2133
          raise errors.OpExecError, ("Disk %s is degraded on target node,"
2134
                                     " aborting failover." % dev.iv_name)
2135

    
2136
    feedback_fn("* checking target node resource availability")
2137
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2138

    
2139
    if not nodeinfo:
2140
      raise errors.OpExecError, ("Could not contact target node %s." %
2141
                                 target_node)
2142

    
2143
    free_memory = int(nodeinfo[target_node]['memory_free'])
2144
    memory = instance.memory
2145
    if memory > free_memory:
2146
      raise errors.OpExecError, ("Not enough memory to create instance %s on"
2147
                                 " node %s. needed %s MiB, available %s MiB" %
2148
                                 (instance.name, target_node, memory,
2149
                                  free_memory))
2150

    
2151
    feedback_fn("* shutting down instance on source node")
2152
    logger.Info("Shutting down instance %s on node %s" %
2153
                (instance.name, source_node))
2154

    
2155
    if not rpc.call_instance_shutdown(source_node, instance):
2156
      logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2157
                   " anyway. Please make sure node %s is down"  %
2158
                   (instance.name, source_node, source_node))
2159

    
2160
    feedback_fn("* deactivating the instance's disks on source node")
2161
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2162
      raise errors.OpExecError, ("Can't shut down the instance's disks.")
2163

    
2164
    instance.primary_node = target_node
2165
    # distribute new instance config to the other nodes
2166
    self.cfg.AddInstance(instance)
2167

    
2168
    feedback_fn("* activating the instance's disks on target node")
2169
    logger.Info("Starting instance %s on node %s" %
2170
                (instance.name, target_node))
2171

    
2172
    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2173
                                             ignore_secondaries=True)
2174
    if not disks_ok:
2175
      _ShutdownInstanceDisks(instance, self.cfg)
2176
      raise errors.OpExecError, ("Can't activate the instance's disks")
2177

    
2178
    feedback_fn("* starting the instance on the target node")
2179
    if not rpc.call_instance_start(target_node, instance, None):
2180
      _ShutdownInstanceDisks(instance, self.cfg)
2181
      raise errors.OpExecError("Could not start instance %s on node %s." %
2182
                               (instance.name, target_node))
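# Failover sequence of LUFailoverInstance.Exec above: check mirror health and
# target-node memory, shut the instance down on the source node, flip
# primary_node in the configuration, reassemble the disks on the new primary
# (tolerating secondary errors) and finally start the instance there.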
2183

    
2184

    
2185
def _CreateBlockDevOnPrimary(cfg, node, device):
2186
  """Create a tree of block devices on the primary node.
2187

2188
  This always creates all devices.
2189

2190
  """
2191

    
2192
  if device.children:
2193
    for child in device.children:
2194
      if not _CreateBlockDevOnPrimary(cfg, node, child):
2195
        return False
2196

    
2197
  cfg.SetDiskID(device, node)
2198
  new_id = rpc.call_blockdev_create(node, device, device.size, True)
2199
  if not new_id:
2200
    return False
2201
  if device.physical_id is None:
2202
    device.physical_id = new_id
2203
  return True
2204

    
2205

    
2206
def _CreateBlockDevOnSecondary(cfg, node, device, force):
2207
  """Create a tree of block devices on a secondary node.
2208

2209
  If this device type has to be created on secondaries, create it and
2210
  all its children.
2211

2212
  If not, just recurse to children keeping the same 'force' value.
2213

2214
  """
2215
  if device.CreateOnSecondary():
2216
    force = True
2217
  if device.children:
2218
    for child in device.children:
2219
      if not _CreateBlockDevOnSecondary(cfg, node, child, force):
2220
        return False
2221

    
2222
  if not force:
2223
    return True
2224
  cfg.SetDiskID(device, node)
2225
  new_id = rpc.call_blockdev_create(node, device, device.size, False)
2226
  if not new_id:
2227
    return False
2228
  if device.physical_id is None:
2229
    device.physical_id = new_id
2230
  return True
2231

    
2232

    
2233
def _GenerateMDDRBDBranch(cfg, vgname, primary, secondary, size, base):
2234
  """Generate a drbd device complete with its children.
2235

2236
  """
2237
  port = cfg.AllocatePort()
2238
  base = "%s_%s" % (base, port)
2239
  dev_data = objects.Disk(dev_type="lvm", size=size,
2240
                          logical_id=(vgname, "%s.data" % base))
2241
  dev_meta = objects.Disk(dev_type="lvm", size=128,
2242
                          logical_id=(vgname, "%s.meta" % base))
2243
  drbd_dev = objects.Disk(dev_type="drbd", size=size,
2244
                          logical_id = (primary, secondary, port),
2245
                          children = [dev_data, dev_meta])
2246
  return drbd_dev
2247

    
2248

    
2249
def _GenerateDiskTemplate(cfg, vgname, template_name,
2250
                          instance_name, primary_node,
2251
                          secondary_nodes, disk_sz, swap_sz):
2252
  """Generate the entire disk layout for a given template type.
2253

2254
  """
2255
  #TODO: compute space requirements
2256

    
2257
  if template_name == "diskless":
2258
    disks = []
2259
  elif template_name == "plain":
2260
    if len(secondary_nodes) != 0:
2261
      raise errors.ProgrammerError("Wrong template configuration")
2262
    sda_dev = objects.Disk(dev_type="lvm", size=disk_sz,
2263
                           logical_id=(vgname, "%s.os" % instance_name),
2264
                           iv_name = "sda")
2265
    sdb_dev = objects.Disk(dev_type="lvm", size=swap_sz,
2266
                           logical_id=(vgname, "%s.swap" % instance_name),
2267
                           iv_name = "sdb")
2268
    disks = [sda_dev, sdb_dev]
2269
  elif template_name == "local_raid1":
2270
    if len(secondary_nodes) != 0:
2271
      raise errors.ProgrammerError("Wrong template configuration")
2272
    sda_dev_m1 = objects.Disk(dev_type="lvm", size=disk_sz,
2273
                              logical_id=(vgname, "%s.os_m1" % instance_name))
2274
    sda_dev_m2 = objects.Disk(dev_type="lvm", size=disk_sz,
2275
                              logical_id=(vgname, "%s.os_m2" % instance_name))
2276
    md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name = "sda",
2277
                              size=disk_sz,
2278
                              children = [sda_dev_m1, sda_dev_m2])
2279
    sdb_dev_m1 = objects.Disk(dev_type="lvm", size=swap_sz,
2280
                              logical_id=(vgname, "%s.swap_m1" %
2281
                                          instance_name))
2282
    sdb_dev_m2 = objects.Disk(dev_type="lvm", size=swap_sz,
2283
                              logical_id=(vgname, "%s.swap_m2" %
2284
                                          instance_name))
2285
    md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name = "sdb",
2286
                              size=swap_sz,
2287
                              children = [sdb_dev_m1, sdb_dev_m2])
2288
    disks = [md_sda_dev, md_sdb_dev]
2289
  elif template_name == "remote_raid1":
2290
    if len(secondary_nodes) != 1:
2291
      raise errors.ProgrammerError("Wrong template configuration")
2292
    remote_node = secondary_nodes[0]
2293
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, vgname,
2294
                                         primary_node, remote_node, disk_sz,
2295
                                         "%s-sda" % instance_name)
2296
    md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name="sda",
2297
                              children = [drbd_sda_dev], size=disk_sz)
2298
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, vgname,
2299
                                         primary_node, remote_node, swap_sz,
2300
                                         "%s-sdb" % instance_name)
2301
    md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name="sdb",
2302
                              children = [drbd_sdb_dev], size=swap_sz)
2303
    disks = [md_sda_dev, md_sdb_dev]
2304
  else:
2305
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2306
  return disks
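# Illustrative layout produced by _GenerateDiskTemplate above for the
# "remote_raid1" template: each of sda/sdb is an md_raid1 device with a
# single drbd child, which in turn mirrors two lvm children between the
# primary and the secondary node -- "<instance>-sda_<port>.data" (disk size)
# and "<instance>-sda_<port>.meta" (128 MB), following _GenerateMDDRBDBranch;
# the port is allocated from the cluster configuration at generation time.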
2307

    
2308

    
2309
def _CreateDisks(cfg, instance):
2310
  """Create all disks for an instance.
2311

2312
  This abstracts away some work from AddInstance.
2313

2314
  Args:
2315
    instance: the instance object
2316

2317
  Returns:
2318
    True or False showing the success of the creation process
2319

2320
  """
2321
  for device in instance.disks:
2322
    logger.Info("creating volume %s for instance %s" %
2323
              (device.iv_name, instance.name))
2324
    #HARDCODE
2325
    for secondary_node in instance.secondary_nodes:
2326
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, device, False):
2327
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2328
                     (device.iv_name, device, secondary_node))
2329
        return False
2330
    #HARDCODE
2331
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, device):
2332
      logger.Error("failed to create volume %s on primary!" %
2333
                   device.iv_name)
2334
      return False
2335
  return True
2336

    
2337

    
2338
def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False
  return result
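# Error-handling asymmetry between the two helpers above: _CreateDisks aborts
# on the first device that fails so the caller can roll back cleanly, while
# _RemoveDisks keeps going and merely returns False, since partial removal is
# still progress when deleting an instance.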
2365

    
2366

    
2367
class LUCreateInstance(LogicalUnit):
2368
  """Create an instance.
2369

2370
  """
2371
  HPATH = "instance-add"
2372
  HTYPE = constants.HTYPE_INSTANCE
2373
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2374
              "disk_template", "swap_size", "mode", "start", "vcpus",
2375
              "wait_for_sync"]
2376

    
2377
  def BuildHooksEnv(self):
2378
    """Build hooks env.
2379

2380
    This runs on master, primary and secondary nodes of the instance.
2381

2382
    """
2383
    env = {
2384
      "INSTANCE_NAME": self.op.instance_name,
2385
      "INSTANCE_PRIMARY": self.op.pnode,
2386
      "INSTANCE_SECONDARIES": " ".join(self.secondaries),
2387
      "DISK_TEMPLATE": self.op.disk_template,
2388
      "MEM_SIZE": self.op.mem_size,
2389
      "DISK_SIZE": self.op.disk_size,
2390
      "SWAP_SIZE": self.op.swap_size,
2391
      "VCPUS": self.op.vcpus,
2392
      "BRIDGE": self.op.bridge,
2393
      "INSTANCE_ADD_MODE": self.op.mode,
2394
      }
2395
    if self.op.mode == constants.INSTANCE_IMPORT:
2396
      env["SRC_NODE"] = self.op.src_node
2397
      env["SRC_PATH"] = self.op.src_path
2398
      env["SRC_IMAGE"] = self.src_image
2399
    if self.inst_ip:
2400
      env["INSTANCE_IP"] = self.inst_ip
2401

    
2402
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2403
          self.secondaries)
2404
    return env, nl, nl
2405

    
2406

    
2407
  def CheckPrereq(self):
2408
    """Check prerequisites.
2409

2410
    """
2411
    if self.op.mode not in (constants.INSTANCE_CREATE,
2412
                            constants.INSTANCE_IMPORT):
2413
      raise errors.OpPrereqError, ("Invalid instance creation mode '%s'" %
2414
                                   self.op.mode)
2415

    
2416
    if self.op.mode == constants.INSTANCE_IMPORT:
2417
      src_node = getattr(self.op, "src_node", None)
2418
      src_path = getattr(self.op, "src_path", None)
2419
      if src_node is None or src_path is None:
2420
        raise errors.OpPrereqError, ("Importing an instance requires source"
2421
                                     " node and path options")
2422
      src_node_full = self.cfg.ExpandNodeName(src_node)
2423
      if src_node_full is None:
2424
        raise errors.OpPrereqError, ("Unknown source node '%s'" % src_node)
2425
      self.op.src_node = src_node = src_node_full
2426

    
2427
      if not os.path.isabs(src_path):
2428
        raise errors.OpPrereqError, ("The source path must be absolute")
2429

    
2430
      export_info = rpc.call_export_info(src_node, src_path)
2431

    
2432
      if not export_info:
2433
        raise errors.OpPrereqError, ("No export found in dir %s" % src_path)
2434

    
2435
      if not export_info.has_section(constants.INISECT_EXP):
2436
        raise errors.ProgrammerError, ("Corrupted export config")
2437

    
2438
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
2439
      if (int(ei_version) != constants.EXPORT_VERSION):
2440
        raise errors.OpPrereqError, ("Wrong export version %s (wanted %d)" %
2441
                                     (ei_version, constants.EXPORT_VERSION))
2442

    
2443
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2444
        raise errors.OpPrereqError, ("Can't import instance with more than"
2445
                                     " one data disk")
2446

    
2447
      # FIXME: are the old os-es, disk sizes, etc. useful?
2448
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2449
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2450
                                                         'disk0_dump'))
2451
      self.src_image = diskimage
2452
    else: # INSTANCE_CREATE
2453
      if getattr(self.op, "os_type", None) is None:
2454
        raise errors.OpPrereqError, ("No guest OS specified")
2455

    
2456
    # check primary node
2457
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
    if pnode is None:
      raise errors.OpPrereqError, ("Primary node '%s' is unknown" %
                                   self.op.pnode)
2461
    self.op.pnode = pnode.name
2462
    self.pnode = pnode
2463
    self.secondaries = []
2464
    # disk template and mirror node verification
2465
    if self.op.disk_template not in constants.DISK_TEMPLATES:
2466
      raise errors.OpPrereqError, ("Invalid disk template name")
2467

    
2468
    if self.op.disk_template == constants.DT_REMOTE_RAID1:
2469
      if getattr(self.op, "snode", None) is None:
2470
        raise errors.OpPrereqError, ("The 'remote_raid1' disk template needs"
2471
                                     " a mirror node")
2472

    
2473
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
2474
      if snode_name is None:
2475
        raise errors.OpPrereqError, ("Unknown secondary node '%s'" %
2476
                                     self.op.snode)
2477
      elif snode_name == pnode.name:
2478
        raise errors.OpPrereqError, ("The secondary node cannot be"
2479
                                     " the primary node.")
2480
      self.secondaries.append(snode_name)
2481

    
2482
    # Check lv size requirements
2483
    nodenames = [pnode.name] + self.secondaries
2484
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2485

    
2486
    # Required free disk space as a function of disk and swap space
2487
    req_size_dict = {
2488
      constants.DT_DISKLESS: 0,
2489
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2490
      constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2491
      # 256 MB are added for drbd metadata, 128MB for each drbd device
2492
      constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2493
    }
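    # Worked example (illustrative numbers): with disk_size=10240 and
    # swap_size=4096, a remote_raid1 instance needs 10240 + 4096 + 256 =
    # 14592 MB of free volume group space on the primary and on the
    # secondary node, while a plain instance needs 14336 MB on the
    # primary only.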
2494

    
2495
    if self.op.disk_template not in req_size_dict:
2496
      raise errors.ProgrammerError, ("Disk template '%s' size requirement"
2497
                                     " is unknown" %  self.op.disk_template)
2498

    
2499
    req_size = req_size_dict[self.op.disk_template]
2500

    
2501
    for node in nodenames:
2502
      info = nodeinfo.get(node, None)
2503
      if not info:
2504
        raise errors.OpPrereqError, ("Cannot get current information"
                                     " from node '%s'" % node)
2506
      if req_size > info['vg_free']:
2507
        raise errors.OpPrereqError, ("Not enough disk space on target node %s."
2508
                                     " %d MB available, %d MB required" %
2509
                                     (node, info['vg_free'], req_size))
2510

    
2511
    # os verification
2512
    os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2513
    if not isinstance(os_obj, objects.OS):
2514
      raise errors.OpPrereqError, ("OS '%s' not in supported os list for"
2515
                                   " primary node"  % self.op.os_type)
2516

    
2517
    # instance verification
2518
    hostname1 = utils.LookupHostname(self.op.instance_name)
2519
    if not hostname1:
2520
      raise errors.OpPrereqError, ("Instance name '%s' not found in dns" %
2521
                                   self.op.instance_name)
2522

    
2523
    self.op.instance_name = instance_name = hostname1['hostname']
2524
    instance_list = self.cfg.GetInstanceList()
2525
    if instance_name in instance_list:
2526
      raise errors.OpPrereqError, ("Instance '%s' is already in the cluster" %
2527
                                   instance_name)
2528

    
2529
    ip = getattr(self.op, "ip", None)
2530
    if ip is None or ip.lower() == "none":
2531
      inst_ip = None
2532
    elif ip.lower() == "auto":
2533
      inst_ip = hostname1['ip']
2534
    else:
2535
      if not utils.IsValidIP(ip):
2536
        raise errors.OpPrereqError, ("given IP address '%s' doesn't look"
2537
                                     " like a valid IP" % ip)
2538
      inst_ip = ip
2539
    self.inst_ip = inst_ip
2540

    
2541
    command = ["fping", "-q", hostname1['ip']]
2542
    result = utils.RunCmd(command)
2543
    if not result.failed:
2544
      raise errors.OpPrereqError, ("IP %s of instance %s already in use" %
2545
                                   (hostname1['ip'], instance_name))
2546

    
2547
    # bridge verification
2548
    bridge = getattr(self.op, "bridge", None)
2549
    if bridge is None:
2550
      self.op.bridge = self.cfg.GetDefBridge()
2551
    else:
2552
      self.op.bridge = bridge
2553

    
2554
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
2555
      raise errors.OpPrereqError, ("target bridge '%s' does not exist on"
2556
                                   " destination node '%s'" %
2557
                                   (self.op.bridge, pnode.name))
2558

    
2559
    if self.op.start:
2560
      self.instance_status = 'up'
2561
    else:
2562
      self.instance_status = 'down'
2563

    
2564
  def Exec(self, feedback_fn):
2565
    """Create and add the instance to the cluster.
2566

2567
    """
2568
    instance = self.op.instance_name
2569
    pnode_name = self.pnode.name
2570

    
2571
    nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
2572
    if self.inst_ip is not None:
2573
      nic.ip = self.inst_ip
2574

    
2575
    disks = _GenerateDiskTemplate(self.cfg, self.cfg.GetVGName(),
2576
                                  self.op.disk_template,
2577
                                  instance, pnode_name,
2578
                                  self.secondaries, self.op.disk_size,
2579
                                  self.op.swap_size)
2580

    
2581
    iobj = objects.Instance(name=instance, os=self.op.os_type,
2582
                            primary_node=pnode_name,
2583
                            memory=self.op.mem_size,
2584
                            vcpus=self.op.vcpus,
2585
                            nics=[nic], disks=disks,
2586
                            disk_template=self.op.disk_template,
2587
                            status=self.instance_status,
2588
                            )
2589

    
2590
    feedback_fn("* creating instance disks...")
2591
    if not _CreateDisks(self.cfg, iobj):
2592
      _RemoveDisks(iobj, self.cfg)
2593
      raise errors.OpExecError, ("Device creation failed, reverting...")
2594

    
2595
    feedback_fn("adding instance %s to cluster config" % instance)
2596

    
2597
    self.cfg.AddInstance(iobj)
2598

    
2599
    if self.op.wait_for_sync:
2600
      disk_abort = not _WaitForSync(self.cfg, iobj)
2601
    elif iobj.disk_template == "remote_raid1":
2602
      # make sure the disks are not degraded (still sync-ing is ok)
2603
      time.sleep(15)
2604
      feedback_fn("* checking mirrors status")
2605
      disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
2606
    else:
2607
      disk_abort = False
2608

    
2609
    if disk_abort:
2610
      _RemoveDisks(iobj, self.cfg)
2611
      self.cfg.RemoveInstance(iobj.name)
2612
      raise errors.OpExecError, ("There are some degraded disks for"
2613
                                      " this instance")
2614

    
2615
    feedback_fn("creating os for instance %s on node %s" %
2616
                (instance, pnode_name))
2617

    
2618
    if iobj.disk_template != constants.DT_DISKLESS:
2619
      if self.op.mode == constants.INSTANCE_CREATE:
2620
        feedback_fn("* running the instance OS create scripts...")
2621
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
2622
          raise errors.OpExecError, ("could not add os for instance %s"
2623
                                          " on node %s" %
2624
                                          (instance, pnode_name))
2625

    
2626
      elif self.op.mode == constants.INSTANCE_IMPORT:
2627
        feedback_fn("* running the instance OS import scripts...")
2628
        src_node = self.op.src_node
2629
        src_image = self.src_image
2630
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
2631
                                                src_node, src_image):
2632
          raise errors.OpExecError, ("Could not import os for instance"
2633
                                          " %s on node %s" %
2634
                                          (instance, pnode_name))
2635
      else:
2636
        # also checked in the prereq part
2637
        raise errors.ProgrammerError, ("Unknown OS initialization mode '%s'"
2638
                                       % self.op.mode)
2639

    
2640
    if self.op.start:
2641
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
2642
      feedback_fn("* starting instance...")
2643
      if not rpc.call_instance_start(pnode_name, iobj, None):
2644
        raise errors.OpExecError, ("Could not start instance")
2645

    
2646

    
2647
class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Connect to the console of an instance.

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError, ("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError, ("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance.name)
    return node, console_cmd


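# LUConnectConsole.Exec above does not open the console itself: it returns
# the primary node name together with the hypervisor-specific shell command,
# and the caller is expected to run that command (per the class docstring,
# from the master node).
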
class LUAddMDDRBDComponent(LogicalUnit):
  """Add a new mirror member to an instance's disk.

  """
2695
  HPATH = "mirror-add"
2696
  HTYPE = constants.HTYPE_INSTANCE
2697
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]
2698

    
2699
  def BuildHooksEnv(self):
2700
    """Build hooks env.
2701

2702
    This runs on the master, the primary and all the secondaries.
2703

2704
    """
2705
    env = {
2706
      "INSTANCE_NAME": self.op.instance_name,
2707
      "NEW_SECONDARY": self.op.remote_node,
2708
      "DISK_NAME": self.op.disk_name,
2709
      }
2710
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
2711
          self.op.remote_node,] + list(self.instance.secondary_nodes)
2712
    return env, nl, nl
2713

    
2714
  def CheckPrereq(self):
2715
    """Check prerequisites.
2716

2717
    This checks that the instance is in the cluster.
2718

2719
    """
2720
    instance = self.cfg.GetInstanceInfo(
2721
      self.cfg.ExpandInstanceName(self.op.instance_name))
2722
    if instance is None:
2723
      raise errors.OpPrereqError, ("Instance '%s' not known" %
2724
                                   self.op.instance_name)
2725
    self.instance = instance
2726

    
2727
    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
2728
    if remote_node is None:
2729
      raise errors.OpPrereqError, ("Node '%s' not known" % self.op.remote_node)
2730
    self.remote_node = remote_node
2731

    
2732
    if remote_node == instance.primary_node:
2733
      raise errors.OpPrereqError, ("The specified node is the primary node of"
2734
                                   " the instance.")
2735

    
2736
    if instance.disk_template != constants.DT_REMOTE_RAID1:
2737
      raise errors.OpPrereqError, ("Instance's disk layout is not"
2738
                                   " remote_raid1.")
2739
    for disk in instance.disks:
2740
      if disk.iv_name == self.op.disk_name:
2741
        break
2742
    else:
2743
      raise errors.OpPrereqError, ("Can't find this device ('%s') in the"
2744
                                   " instance." % self.op.disk_name)
2745
    if len(disk.children) > 1:
2746
      raise errors.OpPrereqError, ("The device already has two slave"
2747
                                   " devices.\n"
2748
                                   "This would create a 3-disk raid1"
2749
                                   " which we don't allow.")
2750
    self.disk = disk
2751

    
2752
  def Exec(self, feedback_fn):
2753
    """Add the mirror component
2754

2755
    """
2756
    disk = self.disk
2757
    instance = self.instance
2758

    
2759
    remote_node = self.remote_node
2760
    new_drbd = _GenerateMDDRBDBranch(self.cfg, self.cfg.GetVGName(),
2761
                                     instance.primary_node, remote_node,
2762
                                     disk.size, "%s-%s" %
2763
                                     (instance.name, self.op.disk_name))
2764

    
2765
    logger.Info("adding new mirror component on secondary")
2766
    #HARDCODE
2767
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, new_drbd, False):
2768
      raise errors.OpExecError, ("Failed to create new component on secondary"
2769
                                 " node %s" % remote_node)
2770

    
2771
    logger.Info("adding new mirror component on primary")
2772
    #HARDCODE
2773
    if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node, new_drbd):
2774
      # remove secondary dev
2775
      self.cfg.SetDiskID(new_drbd, remote_node)
2776
      rpc.call_blockdev_remove(remote_node, new_drbd)
2777
      raise errors.OpExecError, ("Failed to create volume on primary")
2778

    
2779
    # the device exists now
2780
    # call the primary node to add the mirror to md
2781
    logger.Info("adding new mirror component to md")
2782
    if not rpc.call_blockdev_addchild(instance.primary_node,
2783
                                           disk, new_drbd):
2784
      logger.Error("Can't add mirror component to md!")
2785
      self.cfg.SetDiskID(new_drbd, remote_node)
2786
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
2787
        logger.Error("Can't rollback on secondary")
2788
      self.cfg.SetDiskID(new_drbd, instance.primary_node)
2789
      if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
2790
        logger.Error("Can't rollback on primary")
2791
      raise errors.OpExecError, "Can't add mirror component to md array"
2792

    
2793
    disk.children.append(new_drbd)
2794

    
2795
    self.cfg.AddInstance(instance)
2796

    
2797
    _WaitForSync(self.cfg, instance)
2798

    
2799
    return 0
2800

    
2801

    
2802
class LURemoveMDDRBDComponent(LogicalUnit):
2803
  """Remove a component from a remote_raid1 disk.
2804

2805
  """
2806
  HPATH = "mirror-remove"
2807
  HTYPE = constants.HTYPE_INSTANCE
2808
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]
2809

    
2810
  def BuildHooksEnv(self):
2811
    """Build hooks env.
2812

2813
    This runs on the master, the primary and all the secondaries.
2814

2815
    """
2816
    env = {
2817
      "INSTANCE_NAME": self.op.instance_name,
2818
      "DISK_NAME": self.op.disk_name,
2819
      "DISK_ID": self.op.disk_id,
2820
      "OLD_SECONDARY": self.old_secondary,
2821
      }
2822
    nl = [self.sstore.GetMasterNode(),
2823
          self.instance.primary_node] + list(self.instance.secondary_nodes)
2824
    return env, nl, nl
2825

    
2826
  def CheckPrereq(self):
2827
    """Check prerequisites.
2828

2829
    This checks that the instance is in the cluster.
2830

2831
    """
2832
    instance = self.cfg.GetInstanceInfo(
2833
      self.cfg.ExpandInstanceName(self.op.instance_name))
2834
    if instance is None:
2835
      raise errors.OpPrereqError, ("Instance '%s' not known" %
2836
                                   self.op.instance_name)
2837
    self.instance = instance
2838

    
2839
    if instance.disk_template != constants.DT_REMOTE_RAID1:
2840
      raise errors.OpPrereqError, ("Instance's disk layout is not"
2841
                                   " remote_raid1.")
2842
    for disk in instance.disks:
2843
      if disk.iv_name == self.op.disk_name:
2844
        break
2845
    else:
2846
      raise errors.OpPrereqError, ("Can't find this device ('%s') in the"
2847
                                   " instance." % self.op.disk_name)
2848
    for child in disk.children:
2849
      if child.dev_type == "drbd" and child.logical_id[2] == self.op.disk_id:
2850
        break
2851
    else:
2852
      raise errors.OpPrereqError, ("Can't find the device with this port.")
2853

    
2854
    if len(disk.children) < 2:
2855
      raise errors.OpPrereqError, ("Cannot remove the last component from"
2856
                                   " a mirror.")
2857
    self.disk = disk
2858
    self.child = child
2859
    if self.child.logical_id[0] == instance.primary_node:
2860
      oid = 1
2861
    else:
2862
      oid = 0
2863
    self.old_secondary = self.child.logical_id[oid]
2864

    
2865
  def Exec(self, feedback_fn):
2866
    """Remove the mirror component
2867

2868
    """
2869
    instance = self.instance
2870
    disk = self.disk
2871
    child = self.child
2872
    logger.Info("remove mirror component")
2873
    self.cfg.SetDiskID(disk, instance.primary_node)
2874
    if not rpc.call_blockdev_removechild(instance.primary_node,
2875
                                              disk, child):
2876
      raise errors.OpExecError, ("Can't remove child from mirror.")
2877

    
2878
    for node in child.logical_id[:2]:
2879
      self.cfg.SetDiskID(child, node)
2880
      if not rpc.call_blockdev_remove(node, child):
2881
        logger.Error("Warning: failed to remove device from node %s,"
2882
                     " continuing operation." % node)
2883

    
2884
    disk.children.remove(child)
2885
    self.cfg.AddInstance(instance)
2886

    
2887

    
2888
class LUReplaceDisks(LogicalUnit):
2889
  """Replace the disks of an instance.
2890

2891
  """
2892
  HPATH = "mirrors-replace"
2893
  HTYPE = constants.HTYPE_INSTANCE
2894
  _OP_REQP = ["instance_name"]
2895

    
2896
  def BuildHooksEnv(self):
2897
    """Build hooks env.
2898

2899
    This runs on the master, the primary and all the secondaries.
2900

2901
    """
2902
    env = {
2903
      "INSTANCE_NAME": self.op.instance_name,
2904
      "NEW_SECONDARY": self.op.remote_node,
2905
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
2906
      }
2907
    nl = [self.sstore.GetMasterNode(),
2908
          self.instance.primary_node] + list(self.instance.secondary_nodes)
2909
    return env, nl, nl
2910

    
2911
  def CheckPrereq(self):
2912
    """Check prerequisites.
2913

2914
    This checks that the instance is in the cluster.
2915

2916
    """
2917
    instance = self.cfg.GetInstanceInfo(
2918
      self.cfg.ExpandInstanceName(self.op.instance_name))
2919
    if instance is None:
2920
      raise errors.OpPrereqError, ("Instance '%s' not known" %
2921
                                   self.op.instance_name)
2922
    self.instance = instance
2923

    
2924
    if instance.disk_template != constants.DT_REMOTE_RAID1:
2925
      raise errors.OpPrereqError, ("Instance's disk layout is not"
2926
                                   " remote_raid1.")
2927

    
2928
    if len(instance.secondary_nodes) != 1:
2929
      raise errors.OpPrereqError, ("The instance has a strange layout,"
2930
                                   " expected one secondary but found %d" %
2931
                                   len(instance.secondary_nodes))
2932

    
2933
    remote_node = getattr(self.op, "remote_node", None)
2934
    if remote_node is None:
2935
      remote_node = instance.secondary_nodes[0]
2936
    else:
2937
      remote_node = self.cfg.ExpandNodeName(remote_node)
2938
      if remote_node is None:
2939
        raise errors.OpPrereqError, ("Node '%s' not known" %
2940
                                     self.op.remote_node)
2941
    if remote_node == instance.primary_node:
2942
      raise errors.OpPrereqError, ("The specified node is the primary node of"
2943
                                   " the instance.")
2944
    self.op.remote_node = remote_node

  def Exec(self, feedback_fn):
    """Replace the disks of an instance.

    """
    instance = self.instance
    iv_names = {}
    # start of work
    remote_node = self.op.remote_node
    cfg = self.cfg
    vgname = cfg.GetVGName()
    for dev in instance.disks:
      size = dev.size
      new_drbd = _GenerateMDDRBDBranch(cfg, vgname, instance.primary_node,
                                       remote_node, size,
                                       "%s-%s" % (instance.name, dev.iv_name))
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
      logger.Info("adding new mirror component on secondary for %s" %
                  dev.iv_name)
      #HARDCODE
      if not _CreateBlockDevOnSecondary(cfg, remote_node, new_drbd, False):
        raise errors.OpExecError, ("Failed to create new component on"
                                   " secondary node %s\n"
                                   "Full abort, cleanup manually!" %
                                   remote_node)

      logger.Info("adding new mirror component on primary")
      #HARDCODE
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, new_drbd):
        # remove secondary dev
        cfg.SetDiskID(new_drbd, remote_node)
        rpc.call_blockdev_remove(remote_node, new_drbd)
        raise errors.OpExecError("Failed to create volume on primary!\n"
                                 "Full abort, cleanup manually!!")

      # the device exists now
      # call the primary node to add the mirror to md
      logger.Info("adding new mirror component to md")
      if not rpc.call_blockdev_addchild(instance.primary_node, dev,
                                        new_drbd):
        logger.Error("Can't add mirror component to md!")
        cfg.SetDiskID(new_drbd, remote_node)
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
          logger.Error("Can't rollback on secondary")
        cfg.SetDiskID(new_drbd, instance.primary_node)
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
          logger.Error("Can't rollback on primary")
        raise errors.OpExecError, ("Full abort, cleanup manually!!")

      dev.children.append(new_drbd)
      cfg.AddInstance(instance)

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    _WaitForSync(cfg, instance, unlock=True)

    # so check manually all the devices
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError, ("MD device %s is degraded!" % name)
      cfg.SetDiskID(new_drbd, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
      if is_degr:
        raise errors.OpExecError, ("New drbd device %s is degraded!" % name)

    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      logger.Info("remove mirror %s component" % name)
      cfg.SetDiskID(dev, instance.primary_node)
      if not rpc.call_blockdev_removechild(instance.primary_node,
                                           dev, child):
        logger.Error("Can't remove child from mirror, aborting"
                     " *this device cleanup*.\nYou need to cleanup manually!!")
        continue

      for node in child.logical_id[:2]:
        logger.Info("remove child device on %s" % node)
        cfg.SetDiskID(child, node)
        if not rpc.call_blockdev_remove(node, child):
          logger.Error("Warning: failed to remove device from node %s,"
                       " continuing operation." % node)

      dev.children.remove(child)

      cfg.AddInstance(instance)
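  # Illustrative note (not part of the original code): for an instance with a
  # single "sda" disk, iv_names built in the first loop maps
  #   "sda" -> (md_device, old_drbd_child, new_drbd)
  # i.e. the md device, the degraded child being replaced and its freshly
  # created replacement; the degraded-state checks and the removal loop above
  # walk exactly this mapping.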


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError, "Invalid argument type 'instances'"
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError, ("No such instance name '%s'" % name)
        # keep every requested instance, not just the last one
        self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]
    return


  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type == "drbd":
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data
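  # Illustrative sketch (invented values, not part of the original code): for
  # a plain lvm disk the structure returned above looks roughly like
  #   {"iv_name": "sda", "dev_type": "lvm",
  #    "logical_id": ("xenvg", "instance1.example.com-sda"),
  #    "physical_id": ("xenvg", "instance1.example.com-sda"),
  #    "pstatus": <result of blockdev_find on the primary>,
  #    "sstatus": None, "children": []}
  # while md/drbd devices nest their components under "children".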

  def Exec(self, feedback_fn):
    """Gather and return data"""

    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                                instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        }

      result[instance.name] = idict

    return result
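  # Illustrative sketch (invented names, not part of the original code): the
  # returned mapping is keyed by instance name, e.g.
  #   {"instance1.example.com": {"name": "instance1.example.com",
  #                              "config_state": "up", "run_state": "up",
  #                              "pnode": "node1.example.com",
  #                              "snodes": ["node2.example.com"],
  #                              "os": "debian-etch", "memory": 512,
  #                              "nics": [("aa:00:00:11:22:33",
  #                                        "192.0.2.10", "xen-br0")],
  #                              "disks": [...]}}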


class LUQueryNodeData(NoHooksLU):
  """Logical unit for querying node data.

  """
  _OP_REQP = ["nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional node list against the existing names.

    """
    self.wanted_nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Compute and return the list of nodes.

    """

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]
    result = []
    for node in self.wanted_nodes:
      result.append((node.name, node.primary_ip, node.secondary_ip,
                     [inst.name for inst in ilist
                      if inst.primary_node == node.name],
                     [inst.name for inst in ilist
                      if node.name in inst.secondary_nodes],
                     ))
    return result
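  # Illustrative sketch (invented values, not part of the original code):
  # each entry of the returned list is a tuple of
  #   (name, primary_ip, secondary_ip, primary_instances, secondary_instances)
  # e.g. ("node1.example.com", "192.0.2.1", "198.51.100.1",
  #       ["instance1.example.com"], ["instance2.example.com"])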


class LUSetInstanceParms(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      }
    if self.mem:
      env["MEM_SIZE"] = self.mem
    if self.vcpus:
      env["VCPUS"] = self.vcpus
    if self.do_ip:
      env["INSTANCE_IP"] = self.ip
    if self.bridge:
      env["BRIDGE"] = self.bridge

    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)

    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.bridge = getattr(self.op, "bridge", None)
    if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
      raise errors.OpPrereqError, ("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError, ("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError, ("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError, ("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("No such instance name '%s'" %
                                   self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance
    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))

    self.cfg.AddInstance(instance)

    return result
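  # Illustrative sketch (invented values, not part of the original code): the
  # returned value is a list of (parameter, new value) pairs describing what
  # was changed, e.g. [("mem", 512), ("bridge", "xen-br0")].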


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list([node.name for node in self.nodes])
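  # Illustrative sketch (invented names, not part of the original code): the
  # RPC result maps each queried node to the exports it holds, e.g.
  #   {"node1.example.com": ["instance1.example.com"],
  #    "node2.example.com": []}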


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance name is a valid one.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not found" %
                                   self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError, ("Destination node '%s' is unknown." %
                                   self.op.target_node)
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    # shutdown the instance, unless requested not to do so
    if self.op.shutdown:
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
      self.processor.ChainOpCode(op, feedback_fn)

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type="lvm", size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown:
        op = opcodes.OpStartupInstance(instance_name=instance.name,
                                       force=False)
        self.processor.ChainOpCode(op, feedback_fn)

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                      instance):
        logger.Error("could not export block device %s from node"
                     " %s to node %s" %
                     (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from"
                     " node %s" % (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.processor.ChainOpCode(op, feedback_fn)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))