
root / lib / cmdlib.py @ 098c0958


#!/usr/bin/python
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import socket
import time
import tempfile
import re
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import ssconf

class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements (REQ_CLUSTER,
      REQ_MASTER); note that all commands require root permissions

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_CLUSTER = True
  REQ_MASTER = True

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.processor = processor
    self.op = op
    self.cfg = cfg
    self.sstore = sstore
    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError, ("Required parameter '%s' missing" %
                                     attr_name)
    if self.REQ_CLUSTER:
      if not cfg.IsCluster():
        raise errors.OpPrereqError, ("Cluster not initialized yet,"
                                     " use 'gnt-cluster init' first.")
      if self.REQ_MASTER:
        master = sstore.GetMasterNode()
        if master != socket.gethostname():
          raise errors.OpPrereqError, ("Commands must be run on the master"
                                       " node %s" % master)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    As for the node lists, the master should not be included in
    them, as it will be added by the hooks runner in case this LU
    requires a cluster to run on (otherwise we don't have a node
    list). No nodes should be returned as an empty list (and not
    None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError


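# Illustrative sketch (not part of the original module): a minimal
# LogicalUnit subclass following the contract described above.  The class
# name and the "node_name" opcode field are made up for illustration only.
#
#   class LUExampleQuery(LogicalUnit):
#     HPATH = "example-query"
#     HTYPE = constants.HTYPE_CLUSTER
#     _OP_REQP = ["node_name"]   # presence is enforced in LogicalUnit.__init__
#
#     def BuildHooksEnv(self):
#       env = {"NODE_NAME": self.op.node_name}
#       return env, [], [self.op.node_name]
#
#     def CheckPrereq(self):
#       # canonicalise parameters; raise OpPrereqError if they are invalid
#       self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
#
#     def Exec(self, feedback_fn):
#       feedback_fn("* querying %s" % self.op.node_name)
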
class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Build hooks env.

    This is a no-op, since we don't run hooks.

    """
    return


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded nodes.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if nodes is not None and not isinstance(nodes, list):
    raise errors.OpPrereqError, "Invalid argument type 'nodes'"

  if nodes:
    wanted_nodes = []

    for name in nodes:
      node = lu.cfg.GetNodeInfo(lu.cfg.ExpandNodeName(name))
      if node is None:
        raise errors.OpPrereqError, ("No such node name '%s'" % name)
      wanted_nodes.append(node)

    return wanted_nodes
  else:
    return [lu.cfg.GetNodeInfo(name) for name in lu.cfg.GetNodeList()]


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError, ("Unknown output fields selected: %s"
                                 % ",".join(frozenset(selected).
                                            difference(all_fields)))


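# Example (illustrative only) of how _CheckOutputFields is used by the
# query LUs further below: the union of static and dynamic fields must
# cover every requested output field, otherwise OpPrereqError is raised.
#
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=["name", "mfree"])   # passes
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=["name", "bogus"])   # raises OpPrereqError
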
def _UpdateEtcHosts(fullnode, ip):
  """Ensure a node has a correct entry in /etc/hosts.

  Args:
    fullnode - Fully qualified domain name of host. (str)
    ip       - IPv4 address of host (str)

  """
  node = fullnode.split(".", 1)[0]

  f = open('/etc/hosts', 'r+')

  inthere = False

  save_lines = []
  add_lines = []
  removed = False

  while True:
    rawline = f.readline()

    if not rawline:
      # End of file
      break

    line = rawline.split('\n')[0]

    # Strip off comments
    line = line.split('#')[0]

    if not line:
      # Entire line was comment, skip
      save_lines.append(rawline)
      continue

    fields = line.split()

    haveall = True
    havesome = False
    for spec in [ ip, fullnode, node ]:
      if spec not in fields:
        haveall = False
      if spec in fields:
        havesome = True

    if haveall:
      inthere = True
      save_lines.append(rawline)
      continue

    if havesome and not haveall:
      # Line (old, or manual?) which is missing some.  Remove.
      removed = True
      continue

    save_lines.append(rawline)

  if not inthere:
    add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))

  if removed:
    if add_lines:
      save_lines = save_lines + add_lines

    # We removed a line, write a new file and replace old.
    fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
    newfile = os.fdopen(fd, 'w')
    newfile.write(''.join(save_lines))
    newfile.close()
    os.rename(tmpname, '/etc/hosts')

  elif add_lines:
    # Simply appending a new line will do the trick.
    f.seek(0, 2)
    for add in add_lines:
      f.write(add)

  f.close()


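# Illustrative example (not part of the original module): for
# fullnode="node1.example.com" and ip="192.0.2.11", _UpdateEtcHosts keeps
# or adds an /etc/hosts line of the form
#
#   192.0.2.11<TAB>node1.example.com node1
#
# Lines mentioning only some of (ip, fqdn, short name) are dropped and the
# file is then rewritten via tempfile.mkstemp plus os.rename.
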
def _UpdateKnownHosts(fullnode, ip, pubkey):
  """Ensure a node has a correct known_hosts entry.

  Args:
    fullnode - Fully qualified domain name of host. (str)
    ip       - IPv4 address of host (str)
    pubkey   - the public key of the cluster

  """
  if os.path.exists('/etc/ssh/ssh_known_hosts'):
    f = open('/etc/ssh/ssh_known_hosts', 'r+')
  else:
    f = open('/etc/ssh/ssh_known_hosts', 'w+')

  inthere = False

  save_lines = []
  add_lines = []
  removed = False

  while True:
    rawline = f.readline()
    logger.Debug('read %s' % (repr(rawline),))

    if not rawline:
      # End of file
      break

    line = rawline.split('\n')[0]

    parts = line.split(' ')
    fields = parts[0].split(',')
    key = parts[2]

    haveall = True
    havesome = False
    for spec in [ ip, fullnode ]:
      if spec not in fields:
        haveall = False
      if spec in fields:
        havesome = True

    logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
    if haveall and key == pubkey:
      inthere = True
      save_lines.append(rawline)
      logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
      continue

    if havesome and (not haveall or key != pubkey):
      removed = True
      logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
      continue

    save_lines.append(rawline)

  if not inthere:
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))

  if removed:
    save_lines = save_lines + add_lines

    # Write a new file and replace old.
    fd, tmpname = tempfile.mkstemp('tmp', 'ssh_known_hosts_', '/etc/ssh')
    newfile = os.fdopen(fd, 'w')
    newfile.write(''.join(save_lines))
    newfile.close()
    logger.Debug("Wrote new known_hosts.")
    os.rename(tmpname, '/etc/ssh/ssh_known_hosts')

  elif add_lines:
    # Simply appending a new line will do the trick.
    f.seek(0, 2)
    for add in add_lines:
      f.write(add)

  f.close()


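# Illustrative example (not part of the original module): the known_hosts
# entry maintained by _UpdateKnownHosts has the form
#
#   node1.example.com,192.0.2.11 ssh-rsa AAAA...
#
# i.e. "<fqdn>,<ip> ssh-rsa <cluster public key>"; entries that match the
# host but carry a different key are discarded and the file is rewritten.
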
def _HasValidVG(vglist, vgname):
  """Checks if the volume group list is valid.

  A non-None return value means there's an error, and the return value
  is the error message.

  """
  vgsize = vglist.get(vgname, None)
  if vgsize is None:
    return "volume group '%s' missing" % vgname
  elif vgsize < 20480:
    return ("volume group '%s' too small (20480MiB required, %dMiB found)" %
            (vgname, vgsize))
  return None


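# Example (illustrative only) of _HasValidVG return values, assuming the
# dictionary shape used above (volume group name -> size in MiB):
#
#   _HasValidVG({"xenvg": 102400}, "xenvg")  -> None (OK)
#   _HasValidVG({"xenvg": 10240}, "xenvg")   -> "volume group 'xenvg' too small ..."
#   _HasValidVG({}, "xenvg")                 -> "volume group 'xenvg' missing"
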
def _InitSSHSetup(node):
  """Setup the SSH configuration for the cluster.

  This generates a dsa keypair for root, adds the pub key to the
  permitted hosts and adds the hostkey to its own known hosts.

  Args:
    node: the name of this host as a fqdn

  """
  utils.RemoveFile('/root/.ssh/known_hosts')

  if os.path.exists('/root/.ssh/id_dsa'):
    utils.CreateBackup('/root/.ssh/id_dsa')
  if os.path.exists('/root/.ssh/id_dsa.pub'):
    utils.CreateBackup('/root/.ssh/id_dsa.pub')

  utils.RemoveFile('/root/.ssh/id_dsa')
  utils.RemoveFile('/root/.ssh/id_dsa.pub')

  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
                         "-f", "/root/.ssh/id_dsa",
                         "-q", "-N", ""])
  if result.failed:
    raise errors.OpExecError, ("could not generate ssh keypair, error %s" %
                               result.output)

  f = open('/root/.ssh/id_dsa.pub', 'r')
  try:
    utils.AddAuthorizedKey('/root/.ssh/authorized_keys', f.read(8192))
  finally:
    f.close()


def _InitGanetiServerSetup(ss):
  """Setup the necessary configuration for the initial node daemon.

  This creates the nodepass file containing the shared password for
  the cluster and also generates the SSL certificate.

  """
  # Create pseudo random password
  randpass = sha.new(os.urandom(64)).hexdigest()
  # and write it into sstore
  ss.SetKey(ss.SS_NODED_PASS, randpass)

  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
                         "-days", str(365*5), "-nodes", "-x509",
                         "-keyout", constants.SSL_CERT_FILE,
                         "-out", constants.SSL_CERT_FILE, "-batch"])
  if result.failed:
    raise errors.OpExecError, ("could not generate server ssl cert, command"
                               " %s had exitcode %s and error message %s" %
                               (result.cmd, result.exit_code, result.output))

  os.chmod(constants.SSL_CERT_FILE, 0400)

  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])

  if result.failed:
    raise errors.OpExecError, ("could not start the node daemon, command %s"
                               " had exitcode %s and error %s" %
                               (result.cmd, result.exit_code, result.output))


class LUInitCluster(LogicalUnit):
  """Initialise the cluster.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
              "def_bridge", "master_netdev"]
  REQ_CLUSTER = False

  def BuildHooksEnv(self):
    """Build hooks env.

    Notes: Since we don't require a cluster, we must manually add
    ourselves in the post-run node list.

    """
    env = {"CLUSTER": self.op.cluster_name,
           "MASTER": self.hostname['hostname_full']}
    return env, [], [self.hostname['hostname_full']]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    if config.ConfigWriter.IsCluster():
      raise errors.OpPrereqError, ("Cluster is already initialised")

    hostname_local = socket.gethostname()
    self.hostname = hostname = utils.LookupHostname(hostname_local)
    if not hostname:
      raise errors.OpPrereqError, ("Cannot resolve my own hostname ('%s')" %
                                   hostname_local)

    self.clustername = clustername = utils.LookupHostname(self.op.cluster_name)
    if not clustername:
      raise errors.OpPrereqError, ("Cannot resolve given cluster name ('%s')"
                                   % self.op.cluster_name)

    result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", hostname['ip']])
    if result.failed:
      raise errors.OpPrereqError, ("Inconsistency: this host's name resolves"
                                   " to %s,\nbut this ip address does not"
                                   " belong to this host."
                                   " Aborting." % hostname['ip'])

    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip and not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError, ("Invalid secondary ip given")
    if secondary_ip and secondary_ip != hostname['ip']:
      result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", secondary_ip])
      if result.failed:
        raise errors.OpPrereqError, ("You gave %s as secondary IP,\n"
                                     "but it does not belong to this host." %
                                     secondary_ip)
    self.secondary_ip = secondary_ip

    # checks presence of the volume group given
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)

    if vgstatus:
      raise errors.OpPrereqError, ("Error: %s" % vgstatus)

    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
                    self.op.mac_prefix):
      raise errors.OpPrereqError, ("Invalid mac prefix given '%s'" %
                                   self.op.mac_prefix)

    if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
      raise errors.OpPrereqError, ("Invalid hypervisor type given '%s'" %
                                   self.op.hypervisor_type)

    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
    if result.failed:
      raise errors.OpPrereqError, ("Invalid master netdev given (%s): '%s'" %
                                   (self.op.master_netdev, result.output))

  def Exec(self, feedback_fn):
    """Initialize the cluster.

    """
    clustername = self.clustername
    hostname = self.hostname

    # set up the simple store
    ss = ssconf.SimpleStore()
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
    ss.SetKey(ss.SS_MASTER_NODE, hostname['hostname_full'])
    ss.SetKey(ss.SS_MASTER_IP, clustername['ip'])
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)

    # set up the inter-node password and certificate
    _InitGanetiServerSetup(ss)

    # start the master ip
    rpc.call_node_start_master(hostname['hostname_full'])

    # set up ssh config and /etc/hosts
    f = open('/etc/ssh/ssh_host_rsa_key.pub', 'r')
    try:
      sshline = f.read()
    finally:
      f.close()
    sshkey = sshline.split(" ")[1]

    _UpdateEtcHosts(hostname['hostname_full'],
                    hostname['ip'],
                    )

    _UpdateKnownHosts(hostname['hostname_full'],
                      hostname['ip'],
                      sshkey,
                      )

    _InitSSHSetup(hostname['hostname'])

    # init of cluster config file
    cfgw = config.ConfigWriter()
    cfgw.InitConfig(hostname['hostname'], hostname['ip'], self.secondary_ip,
                    clustername['hostname'], sshkey, self.op.mac_prefix,
                    self.op.vg_name, self.op.def_bridge)


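# Illustrative sketch (not part of the original module): LUInitCluster is
# normally reached through an opcodes.OpInitCluster opcode built by the
# gnt-cluster CLI; the field names below mirror _OP_REQP, the values are
# made-up examples.
#
#   op = opcodes.OpInitCluster(cluster_name="cluster.example.com",
#                              hypervisor_type="xen-3.0",
#                              vg_name="xenvg", mac_prefix="aa:00:00",
#                              def_bridge="xen-br0",
#                              master_netdev="eth0")
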
class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) > 0 and nodelist != [master]:
      raise errors.OpPrereqError, ("There are still %d node(s) in "
                                   "this cluster." % (len(nodelist) - 1))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    utils.CreateBackup('/root/.ssh/id_dsa')
    utils.CreateBackup('/root/.ssh/id_dsa.pub')
    rpc.call_node_leave_cluster(self.sstore.GetMasterNode())


class LUVerifyCluster(NoHooksLU):
  """Verifies the cluster status.

  """
  _OP_REQP = []

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn(" - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    instancelist = self.cfg.GetInstanceList()
    if not instance in instancelist:
      feedback_fn("  - ERROR: instance %s not in instance list %s" %
                      (instance, instancelist))
      bad = True

    instanceconfig = self.cfg.GetInstanceInfo(instance)
    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if not instanceconfig.status == 'down':
      if not instance in node_instance[node_current]:
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return not bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    self.cfg.VerifyConfig()

    master = self.sstore.GetMasterNode()
    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    node_volume = {}
    node_instance = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if type(volumeinfo) != dict:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      result = self._VerifyInstance(instance, node_volume, node_instance,
                                    feedback_fn)
      bad = bad or result

      inst_config = self.cfg.GetInstanceInfo(instance)

      inst_config.MapLVsByNode(node_vol_should)

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    return int(bad)


def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      logger.ToStderr("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError, ("Can't contact node %s for mirror data,"
                                   " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        logger.ToStderr("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      perc_done, est_time, is_degraded = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        logger.ToStdout("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    if unlock:
      utils.Unlock('cmd')
    try:
      time.sleep(min(60, max_time))
    finally:
      if unlock:
        utils.Lock('cmd')

  if done:
    logger.ToStdout("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


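# Illustrative note (not part of the original module): each entry returned
# by rpc.call_blockdev_getmirrorstatus is either None or a tuple
# (perc_done, est_time, is_degraded), which the loop above unpacks, e.g.
#
#   (78.5, 120, True)    # 78.5% synced, ~120s estimated, mirror degraded
#   (None, None, False)  # no resync running, device fully in sync
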
def _CheckDiskConsistency(cfgw, dev, node, on_primary):
  """Check that mirrors are not degraded.

  """
  cfgw.SetDiskID(dev, node)

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Can't get any data from node %s" % node)
      result = False
    else:
      result = result and (not rstats[5])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    return

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError, "Can't gather the list of OSes"
    return node_data


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would not allow itself to run.

    """
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return {"NODE_NAME": self.op.node_name}, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      logger.Error("Error: Node '%s' is unknown." % self.op.node_name)
      return 1

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError, ("Node is the master node,"
                                   " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError, ("Instance %s still running on the node,"
                                     " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError, ("Instance %s has node as a secondary,"
                                     " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    rpc.call_node_leave_cluster(node.name)

    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["dtotal", "dfree",
                                     "mtotal", "mnode", "mfree"])

    _CheckOutputFields(static=["name", "pinst", "sinst", "pip", "sip"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)


  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = utils.NiceSort(self.cfg.GetNodeList())
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]


    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict.fromkeys(nodenames, 0)
    node_to_secondary = dict.fromkeys(nodenames, 0)

    if "pinst" in self.op.output_fields or "sinst" in self.op.output_fields:
      instancelist = self.cfg.GetInstanceList()

      for instance in instancelist:
        instanceinfo = self.cfg.GetInstanceInfo(instance)
        node_to_primary[instanceinfo.primary_node] += 1
        for secnode in instanceinfo.secondary_nodes:
          node_to_secondary[secnode] += 1

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst":
          val = node_to_primary[node.name]
        elif field == "sinst":
          val = node_to_secondary[node.name]
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, "?")
        else:
          raise errors.ParameterError, field
        val = str(val)
        node_output.append(val)
      output.append(node_output)

    return output


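# Example (illustrative only): for
#   self.op.output_fields = ["name", "pinst", "mfree"]
# LUQueryNodes.Exec returns one row of strings per node, in field order, e.g.
#   [["node1.example.com", "2", "1024"],
#    ["node2.example.com", "0", "2048"]]
# Dynamic fields fall back to "?" when a node could not be contacted.
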
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)


  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = utils.NiceSort([node.name for node in self.nodes])
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError, field
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.LookupHostname(node_name)
    if not dns_data:
      raise errors.OpPrereqError, ("Node %s is not resolvable" % node_name)

    node = dns_data['hostname']
    primary_ip = self.op.primary_ip = dns_data['ip']
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError, ("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip
    node_list = cfg.GetNodeList()
    if node in node_list:
      raise errors.OpPrereqError, ("Node %s is already in the configuration"
                                   % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)
      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError, ("New node ip address(es) conflict with"
                                     " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError, ("The master has no private ip but the"
                                     " new node has one")
      else:
        raise errors.OpPrereqError, ("The master has a private ip but the"
                                     " new node doesn't have one")

    # checks reachability
    command = ["fping", "-q", primary_ip]
    result = utils.RunCmd(command)
    if result.failed:
      raise errors.OpPrereqError, ("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      command = ["fping", "-S%s" % myself.secondary_ip, "-q", secondary_ip]
      result = utils.RunCmd(command)
      if result.failed:
        raise errors.OpPrereqError, ("Node secondary ip not reachable by ping")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # set up inter-node password and certificate and restarts the node daemon
    gntpass = self.sstore.GetNodeDaemonPassword()
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
      raise errors.OpExecError, ("ganeti password corruption detected")
    f = open(constants.SSL_CERT_FILE)
    try:
      gntpem = f.read(8192)
    finally:
      f.close()
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
    # so we use this to detect an invalid certificate; as long as the
    # cert doesn't contain this, the here-document will be correctly
    # parsed by the shell sequence below
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
      raise errors.OpExecError, ("invalid PEM encoding in the SSL certificate")
    if not gntpem.endswith("\n"):
      raise errors.OpExecError, ("PEM must end with newline")
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)

    # remove first the root's known_hosts file
    utils.RemoveFile("/root/.ssh/known_hosts")
    # and then connect with ssh to set password and start ganeti-noded
    # note that all the below variables are sanitized at this point,
    # either by being constants or by the checks above
    ss = self.sstore
    mycommand = ("umask 077 && "
                 "echo '%s' > '%s' && "
                 "cat > '%s' << '!EOF.' && \n"
                 "%s!EOF.\n%s restart" %
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
                  constants.SSL_CERT_FILE, gntpem,
                  constants.NODE_INITD_SCRIPT))

    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
    if result.failed:
      raise errors.OpExecError, ("Remote command on node %s, error: %s,"
                                 " output: %s" %
                                 (node, result.fail_reason, result.output))

    # check connectivity
    time.sleep(4)

    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError, ("Version mismatch master version %s,"
                                   " node version %s" %
                                   (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError, ("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    keyarray = []
    keyfiles = ["/etc/ssh/ssh_host_dsa_key", "/etc/ssh/ssh_host_dsa_key.pub",
                "/etc/ssh/ssh_host_rsa_key", "/etc/ssh/ssh_host_rsa_key.pub",
                "/root/.ssh/id_dsa", "/root/.ssh/id_dsa.pub"]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError, ("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    _UpdateEtcHosts(new_node.name, new_node.primary_ip)
    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
                      self.cfg.GetHostKey())

    if new_node.secondary_ip != new_node.primary_ip:
      result = ssh.SSHCall(node, "root",
                           "fping -S 127.0.0.1 -q %s" % new_node.secondary_ip)
      if result.failed:
        raise errors.OpExecError, ("Node claims it doesn't have the"
                                   " secondary ip you gave (%s).\n"
                                   "Please fix and re-run this command." %
                                   new_node.secondary_ip)

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList() + [node]
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in ("/etc/hosts", "/etc/ssh/ssh_known_hosts"):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = ss.GetFileList()
    for fname in to_copy:
      if not ssh.CopyFileToNode(node, fname):
        logger.Error("could not copy file %s to node %s" % (fname, node))

    logger.Info("adding node %s to cluster.conf" % node)
    self.cfg.AddNode(new_node)


class LUMasterFailover(LogicalUnit):
  """Failover the master node to the current node.

  This is a special LU in that it must run on a non-master node.

  """
  HPATH = "master-failover"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_MASTER = False
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the new master only in the pre phase, and on all
    the nodes in the post phase.

    """
    env = {
      "NEW_MASTER": self.new_master,
      "OLD_MASTER": self.old_master,
      }
    return env, [self.new_master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we are not already the master.

    """
    self.new_master = socket.gethostname()

    self.old_master = self.sstore.GetMasterNode()

    if self.old_master == self.new_master:
      raise errors.OpPrereqError, ("This command must be run on the node"
                                   " where you want the new master to be.\n"
                                   "%s is already the master" %
                                   self.old_master)

  def Exec(self, feedback_fn):
    """Failover the master node.

    This command, when run on a non-master node, will cause the current
    master to cease being master, and the non-master to become new
    master.

    """
    #TODO: do not rely on gethostname returning the FQDN
    logger.Info("setting master to %s, old master: %s" %
                (self.new_master, self.old_master))

    if not rpc.call_node_stop_master(self.old_master):
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)

    ss = self.sstore
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
      logger.Error("could not distribute the new simple store master file"
                   " to the other nodes, please check.")

    if not rpc.call_node_start_master(self.new_master):
      logger.Error("could not start the master role on the new master"
                   " %s, please check" % self.new_master)
      feedback_fn("Error in activating the master IP on the new master,\n"
                  "please fix manually.")


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]
    result = {
      "name": self.cfg.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      "instances": [(instance.name, instance.primary_node)
                    for instance in instances],
      "nodes": self.cfg.GetNodeList(),
      }

    return result


class LUClusterCopyFile(NoHooksLU):
  """Copy file to cluster.

  """
  _OP_REQP = ["nodes", "filename"]

  def CheckPrereq(self):
    """Check prerequisites.

    It should check that the named file exists and that the given list
    of nodes is valid.

    """
    if not os.path.exists(self.op.filename):
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)

    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Copy a file from master to some nodes.

    Args:
      opts - class with options as members
      args - list containing a single element, the file name
    Opts used:
      nodes - list containing the name of target nodes; if empty, all nodes

    """
    filename = self.op.filename

    myname = socket.gethostname()

    for node in self.nodes:
      if node == myname:
        continue
      if not ssh.CopyFileToNode(node, filename):
        logger.Error("Copy of file %s to node %s failed" % (filename, node))


class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()


class LURunClusterCommand(NoHooksLU):
  """Run a command on some nodes.

  """
  _OP_REQP = ["command", "nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    It checks that the given list of nodes is valid.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Run a command on some nodes.

    """
    data = []
    for node in self.nodes:
      result = utils.RunCmd(["ssh", node.name, self.op.command])
      data.append((node.name, result.cmd, result.output, result.exit_code))

    return data


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)
    self.instance = instance


  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError, ("Cannot activate block devices")

    return disks_info


1615
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1616
  """Prepare the block devices for an instance.
1617

1618
  This sets up the block devices on all nodes.
1619

1620
  Args:
1621
    instance: a ganeti.objects.Instance object
1622
    ignore_secondaries: if true, errors on secondary nodes won't result
1623
                        in an error return from the function
1624

1625
  Returns:
1626
    false if the operation failed
1627
    list of (host, instance_visible_name, node_visible_name) if the operation
1628
         suceeded with the mapping from node devices to instance devices
1629
  """
1630
  device_info = []
1631
  disks_ok = True
1632
  for inst_disk in instance.disks:
1633
    master_result = None
1634
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1635
      cfg.SetDiskID(node_disk, node)
1636
      is_primary = node == instance.primary_node
1637
      result = rpc.call_blockdev_assemble(node, node_disk, is_primary)
1638
      if not result:
1639
        logger.Error("could not prepare block device %s on node %s (is_pri"
1640
                     "mary=%s)" % (inst_disk.iv_name, node, is_primary))
1641
        if is_primary or not ignore_secondaries:
1642
          disks_ok = False
1643
      if is_primary:
1644
        master_result = result
1645
    device_info.append((instance.primary_node, inst_disk.iv_name,
1646
                        master_result))
1647

    
1648
  return disks_ok, device_info
1649

    
1650

    
1651
class LUDeactivateInstanceDisks(NoHooksLU):
1652
  """Shutdown an instance's disks.
1653

1654
  """
1655
  _OP_REQP = ["instance_name"]
1656

    
1657
  def CheckPrereq(self):
1658
    """Check prerequisites.
1659

1660
    This checks that the instance is in the cluster.
1661

1662
    """
1663
    instance = self.cfg.GetInstanceInfo(
1664
      self.cfg.ExpandInstanceName(self.op.instance_name))
1665
    if instance is None:
1666
      raise errors.OpPrereqError, ("Instance '%s' not known" %
1667
                                   self.op.instance_name)
1668
    self.instance = instance
1669

    
1670
  def Exec(self, feedback_fn):
1671
    """Deactivate the disks
1672

1673
    """
1674
    instance = self.instance
1675
    ins_l = rpc.call_instance_list([instance.primary_node])
1676
    ins_l = ins_l[instance.primary_node]
1677
    if not type(ins_l) is list:
1678
      raise errors.OpExecError, ("Can't contact node '%s'" %
1679
                                 instance.primary_node)
1680

    
1681
    if self.instance.name in ins_l:
1682
      raise errors.OpExecError, ("Instance is running, can't shutdown"
1683
                                 " block devices.")
1684

    
1685
    _ShutdownInstanceDisks(instance, self.cfg)
1686

    
1687

    
1688
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1689
  """Shutdown block devices of an instance.
1690

1691
  This does the shutdown on all nodes of the instance.
1692

1693
  If the ignore_primary is false, errors on the primary node are
1694
  ignored.
1695

1696
  """
1697
  result = True
1698
  for disk in instance.disks:
1699
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1700
      cfg.SetDiskID(top_disk, node)
1701
      if not rpc.call_blockdev_shutdown(node, top_disk):
1702
        logger.Error("could not shutdown block device %s on node %s" %
1703
                     (disk.iv_name, node))
1704
        if not ignore_primary or node != instance.primary_node:
1705
          result = False
1706
  return result
1707

    
1708

    
1709
class LUStartupInstance(LogicalUnit):
1710
  """Starts an instance.
1711

1712
  """
1713
  HPATH = "instance-start"
1714
  HTYPE = constants.HTYPE_INSTANCE
1715
  _OP_REQP = ["instance_name", "force"]
1716

    
1717
  def BuildHooksEnv(self):
1718
    """Build hooks env.
1719

1720
    This runs on master, primary and secondary nodes of the instance.
1721

1722
    """
1723
    env = {
1724
      "INSTANCE_NAME": self.op.instance_name,
1725
      "INSTANCE_PRIMARY": self.instance.primary_node,
1726
      "INSTANCE_SECONDARIES": " ".join(self.instance.secondary_nodes),
1727
      "FORCE": self.op.force,
1728
      }
1729
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1730
          list(self.instance.secondary_nodes))
1731
    return env, nl, nl
1732

    
1733
  def CheckPrereq(self):
1734
    """Check prerequisites.
1735

1736
    This checks that the instance is in the cluster.
1737

1738
    """
1739
    instance = self.cfg.GetInstanceInfo(
1740
      self.cfg.ExpandInstanceName(self.op.instance_name))
1741
    if instance is None:
1742
      raise errors.OpPrereqError, ("Instance '%s' not known" %
1743
                                   self.op.instance_name)
1744

    
1745
    # check bridges existance
1746
    brlist = [nic.bridge for nic in instance.nics]
1747
    if not rpc.call_bridges_exist(instance.primary_node, brlist):
1748
      raise errors.OpPrereqError, ("one or more target bridges %s does not"
1749
                                   " exist on destination node '%s'" %
1750
                                   (brlist, instance.primary_node))
1751

    
1752
    self.instance = instance
1753
    self.op.instance_name = instance.name
1754

    
1755
  def Exec(self, feedback_fn):
1756
    """Start the instance.
1757

1758
    """
1759
    instance = self.instance
1760
    force = self.op.force
1761
    extra_args = getattr(self.op, "extra_args", "")
1762

    
1763
    node_current = instance.primary_node
1764

    
1765
    nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1766
    if not nodeinfo:
1767
      raise errors.OpExecError, ("Could not contact node %s for infos" %
1768
                                 (node_current))
1769

    
1770
    freememory = nodeinfo[node_current]['memory_free']
1771
    memory = instance.memory
1772
    if memory > freememory:
1773
      raise errors.OpExecError, ("Not enough memory to start instance"
1774
                                 " %s on node %s"
1775
                                 " needed %s MiB, available %s MiB" %
1776
                                 (instance.name, node_current, memory,
1777
                                  freememory))
1778

    
1779
    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
1780
                                             ignore_secondaries=force)
1781
    if not disks_ok:
1782
      _ShutdownInstanceDisks(instance, self.cfg)
1783
      if not force:
1784
        logger.Error("If the message above refers to a secondary node,"
1785
                     " you can retry the operation using '--force'.")
1786
      raise errors.OpExecError, ("Disk consistency error")
1787

    
1788
    if not rpc.call_instance_start(node_current, instance, extra_args):
1789
      _ShutdownInstanceDisks(instance, self.cfg)
1790
      raise errors.OpExecError, ("Could not start instance")
1791

    
1792
    self.cfg.MarkInstanceUp(instance.name)
1793

    
1794

    
1795
class LUShutdownInstance(LogicalUnit):
1796
  """Shutdown an instance.
1797

1798
  """
1799
  HPATH = "instance-stop"
1800
  HTYPE = constants.HTYPE_INSTANCE
1801
  _OP_REQP = ["instance_name"]
1802

    
1803
  def BuildHooksEnv(self):
1804
    """Build hooks env.
1805

1806
    This runs on master, primary and secondary nodes of the instance.
1807

1808
    """
1809
    env = {
1810
      "INSTANCE_NAME": self.op.instance_name,
1811
      "INSTANCE_PRIMARY": self.instance.primary_node,
1812
      "INSTANCE_SECONDARIES": " ".join(self.instance.secondary_nodes),
1813
      }
1814
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1815
          list(self.instance.secondary_nodes))
1816
    return env, nl, nl
1817

    
1818
  def CheckPrereq(self):
1819
    """Check prerequisites.
1820

1821
    This checks that the instance is in the cluster.
1822

1823
    """
1824
    instance = self.cfg.GetInstanceInfo(
1825
      self.cfg.ExpandInstanceName(self.op.instance_name))
1826
    if instance is None:
1827
      raise errors.OpPrereqError, ("Instance '%s' not known" %
1828
                                   self.op.instance_name)
1829
    self.instance = instance
1830

    
1831
  def Exec(self, feedback_fn):
1832
    """Shutdown the instance.
1833

1834
    """
1835
    instance = self.instance
1836
    node_current = instance.primary_node
1837
    if not rpc.call_instance_shutdown(node_current, instance):
1838
      logger.Error("could not shutdown instance")
1839

    
1840
    self.cfg.MarkInstanceDown(instance.name)
1841
    _ShutdownInstanceDisks(instance, self.cfg)
1842

    
1843

    
1844
class LURemoveInstance(LogicalUnit):
1845
  """Remove an instance.
1846

1847
  """
1848
  HPATH = "instance-remove"
1849
  HTYPE = constants.HTYPE_INSTANCE
1850
  _OP_REQP = ["instance_name"]
1851

    
1852
  def BuildHooksEnv(self):
1853
    """Build hooks env.
1854

1855
    This runs on master, primary and secondary nodes of the instance.
1856

1857
    """
1858
    env = {
1859
      "INSTANCE_NAME": self.op.instance_name,
1860
      "INSTANCE_PRIMARY": self.instance.primary_node,
1861
      "INSTANCE_SECONDARIES": " ".join(self.instance.secondary_nodes),
1862
      }
1863
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1864
          list(self.instance.secondary_nodes))
1865
    return env, nl, nl
1866

    
1867
  def CheckPrereq(self):
1868
    """Check prerequisites.
1869

1870
    This checks that the instance is in the cluster.
1871

1872
    """
1873
    instance = self.cfg.GetInstanceInfo(
1874
      self.cfg.ExpandInstanceName(self.op.instance_name))
1875
    if instance is None:
1876
      raise errors.OpPrereqError, ("Instance '%s' not known" %
1877
                                   self.op.instance_name)
1878
    self.instance = instance
1879

    
1880
  def Exec(self, feedback_fn):
1881
    """Remove the instance.
1882

1883
    """
1884
    instance = self.instance
1885
    logger.Info("shutting down instance %s on node %s" %
1886
                (instance.name, instance.primary_node))
1887

    
1888
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
1889
      raise errors.OpExecError, ("Could not shutdown instance %s on node %s" %
1890
                                 (instance.name, instance.primary_node))
1891

    
1892
    logger.Info("removing block devices for instance %s" % instance.name)
1893

    
1894
    _RemoveDisks(instance, self.cfg)
1895

    
1896
    logger.Info("removing instance %s out of cluster config" % instance.name)
1897

    
1898
    self.cfg.RemoveInstance(instance.name)
1899

    
1900

    
1901
class LUQueryInstances(NoHooksLU):
1902
  """Logical unit for querying instances.
1903

1904
  """
1905
  _OP_REQP = ["output_fields"]
1906

    
1907
  def CheckPrereq(self):
1908
    """Check prerequisites.
1909

1910
    This checks that the fields required are valid output fields.
1911

1912
    """
1913
    self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
1914
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
1915
                               "admin_state", "admin_ram",
1916
                               "disk_template", "ip", "mac", "bridge"],
1917
                       dynamic=self.dynamic_fields,
1918
                       selected=self.op.output_fields)
1919

    
1920
  def Exec(self, feedback_fn):
1921
    """Computes the list of nodes and their attributes.
1922

1923
    """
1924
    instance_names = utils.NiceSort(self.cfg.GetInstanceList())
1925
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
1926
                     in instance_names]
1927

    
1928
    # begin data gathering
1929

    
1930
    nodes = frozenset([inst.primary_node for inst in instance_list])
1931

    
1932
    bad_nodes = []
1933
    if self.dynamic_fields.intersection(self.op.output_fields):
1934
      live_data = {}
1935
      node_data = rpc.call_all_instances_info(nodes)
1936
      for name in nodes:
1937
        result = node_data[name]
1938
        if result:
1939
          live_data.update(result)
1940
        elif result == False:
1941
          bad_nodes.append(name)
1942
        # else no instance is alive
1943
    else:
1944
      live_data = dict([(name, {}) for name in instance_names])
1945

    
1946
    # end data gathering
1947

    
1948
    output = []
1949
    for instance in instance_list:
1950
      iout = []
1951
      for field in self.op.output_fields:
1952
        if field == "name":
1953
          val = instance.name
1954
        elif field == "os":
1955
          val = instance.os
1956
        elif field == "pnode":
1957
          val = instance.primary_node
1958
        elif field == "snodes":
1959
          val = ",".join(instance.secondary_nodes) or "-"
1960
        elif field == "admin_state":
1961
          if instance.status == "down":
1962
            val = "no"
1963
          else:
1964
            val = "yes"
1965
        elif field == "oper_state":
1966
          if instance.primary_node in bad_nodes:
1967
            val = "(node down)"
1968
          else:
1969
            if live_data.get(instance.name):
1970
              val = "running"
1971
            else:
1972
              val = "stopped"
1973
        elif field == "admin_ram":
1974
          val = instance.memory
1975
        elif field == "oper_ram":
1976
          if instance.primary_node in bad_nodes:
1977
            val = "(node down)"
1978
          elif instance.name in live_data:
1979
            val = live_data[instance.name].get("memory", "?")
1980
          else:
1981
            val = "-"
1982
        elif field == "disk_template":
1983
          val = instance.disk_template
1984
        elif field == "ip":
1985
          val = instance.nics[0].ip
1986
        elif field == "bridge":
1987
          val = instance.nics[0].bridge
1988
        elif field == "mac":
1989
          val = instance.nics[0].mac
1990
        else:
1991
          raise errors.ParameterError, field
1992
        val = str(val)
1993
        iout.append(val)
1994
      output.append(iout)
1995

    
1996
    return output
1997

    
1998

    
1999
class LUFailoverInstance(LogicalUnit):
2000
  """Failover an instance.
2001

2002
  """
2003
  HPATH = "instance-failover"
2004
  HTYPE = constants.HTYPE_INSTANCE
2005
  _OP_REQP = ["instance_name", "ignore_consistency"]
2006

    
2007
  def BuildHooksEnv(self):
2008
    """Build hooks env.
2009

2010
    This runs on master, primary and secondary nodes of the instance.
2011

2012
    """
2013
    env = {
2014
      "INSTANCE_NAME": self.op.instance_name,
2015
      "INSTANCE_PRIMARY": self.instance.primary_node,
2016
      "INSTANCE_SECONDARIES": " ".join(self.instance.secondary_nodes),
2017
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2018
      }
2019
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2020
    return env, nl, nl
2021

    
2022
  def CheckPrereq(self):
2023
    """Check prerequisites.
2024

2025
    This checks that the instance is in the cluster.
2026

2027
    """
2028
    instance = self.cfg.GetInstanceInfo(
2029
      self.cfg.ExpandInstanceName(self.op.instance_name))
2030
    if instance is None:
2031
      raise errors.OpPrereqError, ("Instance '%s' not known" %
2032
                                   self.op.instance_name)
2033

    
2034
    # check memory requirements on the secondary node
2035
    target_node = instance.secondary_nodes[0]
2036
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2037
    info = nodeinfo.get(target_node, None)
2038
    if not info:
2039
      raise errors.OpPrereqError, ("Cannot get current information"
2040
                                   " from node '%s'" % nodeinfo)
2041
    if instance.memory > info['memory_free']:
2042
      raise errors.OpPrereqError, ("Not enough memory on target node %s."
2043
                                   " %d MB available, %d MB required" %
2044
                                   (target_node, info['memory_free'],
2045
                                    instance.memory))
2046

    
2047
    # check bridge existance
2048
    brlist = [nic.bridge for nic in instance.nics]
2049
    if not rpc.call_bridges_exist(instance.primary_node, brlist):
2050
      raise errors.OpPrereqError, ("one or more target bridges %s does not"
2051
                                   " exist on destination node '%s'" %
2052
                                   (brlist, instance.primary_node))
2053

    
2054
    self.instance = instance
2055

    
2056
  def Exec(self, feedback_fn):
2057
    """Failover an instance.
2058

2059
    The failover is done by shutting it down on its present node and
2060
    starting it on the secondary.
2061

2062
    """
2063
    instance = self.instance
2064

    
2065
    source_node = instance.primary_node
2066
    target_node = instance.secondary_nodes[0]
2067

    
2068
    feedback_fn("* checking disk consistency between source and target")
2069
    for dev in instance.disks:
2070
      # for remote_raid1, these are md over drbd
2071
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2072
        if not self.op.ignore_consistency:
2073
          raise errors.OpExecError, ("Disk %s is degraded on target node,"
2074
                                     " aborting failover." % dev.iv_name)
2075

    
2076
    feedback_fn("* checking target node resource availability")
2077
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2078

    
2079
    if not nodeinfo:
2080
      raise errors.OpExecError, ("Could not contact target node %s." %
2081
                                 target_node)
2082

    
2083
    free_memory = int(nodeinfo[target_node]['memory_free'])
2084
    memory = instance.memory
2085
    if memory > free_memory:
2086
      raise errors.OpExecError, ("Not enough memory to create instance %s on"
2087
                                 " node %s. needed %s MiB, available %s MiB" %
2088
                                 (instance.name, target_node, memory,
2089
                                  free_memory))
2090

    
2091
    feedback_fn("* shutting down instance on source node")
2092
    logger.Info("Shutting down instance %s on node %s" %
2093
                (instance.name, source_node))
2094

    
2095
    if not rpc.call_instance_shutdown(source_node, instance):
2096
      logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2097
                   " anyway. Please make sure node %s is down"  %
2098
                   (instance.name, source_node, source_node))
2099

    
2100
    feedback_fn("* deactivating the instance's disks on source node")
2101
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2102
      raise errors.OpExecError, ("Can't shut down the instance's disks.")
2103

    
2104
    instance.primary_node = target_node
2105
    # distribute new instance config to the other nodes
2106
    self.cfg.AddInstance(instance)
2107

    
2108
    feedback_fn("* activating the instance's disks on target node")
2109
    logger.Info("Starting instance %s on node %s" %
2110
                (instance.name, target_node))
2111

    
2112
    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2113
                                             ignore_secondaries=True)
2114
    if not disks_ok:
2115
      _ShutdownInstanceDisks(instance, self.cfg)
2116
      raise errors.OpExecError, ("Can't activate the instance's disks")
2117

    
2118
    feedback_fn("* starting the instance on the target node")
2119
    if not rpc.call_instance_start(target_node, instance, None):
2120
      _ShutdownInstanceDisks(instance, self.cfg)
2121
      raise errors.OpExecError("Could not start instance %s on node %s." %
2122
                               (instance.name, target_node))
2123

    
2124

    
2125
def _CreateBlockDevOnPrimary(cfg, node, device):
2126
  """Create a tree of block devices on the primary node.
2127

2128
  This always creates all devices.
2129

2130
  """
2131
  if device.children:
2132
    for child in device.children:
2133
      if not _CreateBlockDevOnPrimary(cfg, node, child):
2134
        return False
2135

    
2136
  cfg.SetDiskID(device, node)
2137
  new_id = rpc.call_blockdev_create(node, device, device.size, True)
2138
  if not new_id:
2139
    return False
2140
  if device.physical_id is None:
2141
    device.physical_id = new_id
2142
  return True
2143

    
2144

    
2145
def _CreateBlockDevOnSecondary(cfg, node, device, force):
2146
  """Create a tree of block devices on a secondary node.
2147

2148
  If this device type has to be created on secondaries, create it and
2149
  all its children.
2150

2151
  If not, just recurse to children keeping the same 'force' value.
2152

2153
  """
2154
  if device.CreateOnSecondary():
2155
    force = True
2156
  if device.children:
2157
    for child in device.children:
2158
      if not _CreateBlockDevOnSecondary(cfg, node, child, force):
2159
        return False
2160

    
2161
  if not force:
2162
    return True
2163
  cfg.SetDiskID(device, node)
2164
  new_id = rpc.call_blockdev_create(node, device, device.size, False)
2165
  if not new_id:
2166
    return False
2167
  if device.physical_id is None:
2168
    device.physical_id = new_id
2169
  return True
2170

    
2171

    
2172
def _GenerateMDDRBDBranch(cfg, vgname, primary, secondary, size, base):
2173
  """Generate a drbd device complete with its children.
2174

2175
  """
2176
  port = cfg.AllocatePort()
2177
  base = "%s_%s" % (base, port)
2178
  dev_data = objects.Disk(dev_type="lvm", size=size,
2179
                          logical_id=(vgname, "%s.data" % base))
2180
  dev_meta = objects.Disk(dev_type="lvm", size=128,
2181
                          logical_id=(vgname, "%s.meta" % base))
2182
  drbd_dev = objects.Disk(dev_type="drbd", size=size,
2183
                          logical_id = (primary, secondary, port),
2184
                          children = [dev_data, dev_meta])
2185
  return drbd_dev
2186

    
2187

    
2188
def _GenerateDiskTemplate(cfg, vgname, template_name,
2189
                          instance_name, primary_node,
2190
                          secondary_nodes, disk_sz, swap_sz):
2191
  """Generate the entire disk layout for a given template type.
2192

2193
  """
2194
  #TODO: compute space requirements
2195

    
2196
  if template_name == "diskless":
2197
    disks = []
2198
  elif template_name == "plain":
2199
    if len(secondary_nodes) != 0:
2200
      raise errors.ProgrammerError("Wrong template configuration")
2201
    sda_dev = objects.Disk(dev_type="lvm", size=disk_sz,
2202
                           logical_id=(vgname, "%s.os" % instance_name),
2203
                           iv_name = "sda")
2204
    sdb_dev = objects.Disk(dev_type="lvm", size=swap_sz,
2205
                           logical_id=(vgname, "%s.swap" % instance_name),
2206
                           iv_name = "sdb")
2207
    disks = [sda_dev, sdb_dev]
2208
  elif template_name == "local_raid1":
2209
    if len(secondary_nodes) != 0:
2210
      raise errors.ProgrammerError("Wrong template configuration")
2211
    sda_dev_m1 = objects.Disk(dev_type="lvm", size=disk_sz,
2212
                              logical_id=(vgname, "%s.os_m1" % instance_name))
2213
    sda_dev_m2 = objects.Disk(dev_type="lvm", size=disk_sz,
2214
                              logical_id=(vgname, "%s.os_m2" % instance_name))
2215
    md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name = "sda",
2216
                              size=disk_sz,
2217
                              children = [sda_dev_m1, sda_dev_m2])
2218
    sdb_dev_m1 = objects.Disk(dev_type="lvm", size=swap_sz,
2219
                              logical_id=(vgname, "%s.swap_m1" %
2220
                                          instance_name))
2221
    sdb_dev_m2 = objects.Disk(dev_type="lvm", size=swap_sz,
2222
                              logical_id=(vgname, "%s.swap_m2" %
2223
                                          instance_name))
2224
    md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name = "sdb",
2225
                              size=swap_sz,
2226
                              children = [sdb_dev_m1, sdb_dev_m2])
2227
    disks = [md_sda_dev, md_sdb_dev]
2228
  elif template_name == "remote_raid1":
2229
    if len(secondary_nodes) != 1:
2230
      raise errors.ProgrammerError("Wrong template configuration")
2231
    remote_node = secondary_nodes[0]
2232
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, vgname,
2233
                                         primary_node, remote_node, disk_sz,
2234
                                         "%s-sda" % instance_name)
2235
    md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name="sda",
2236
                              children = [drbd_sda_dev], size=disk_sz)
2237
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, vgname,
2238
                                         primary_node, remote_node, swap_sz,
2239
                                         "%s-sdb" % instance_name)
2240
    md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name="sdb",
2241
                              children = [drbd_sdb_dev], size=swap_sz)
2242
    disks = [md_sda_dev, md_sdb_dev]
2243
  else:
2244
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2245
  return disks
2246

    
2247

    
2248
def _CreateDisks(cfg, instance):
2249
  """Create all disks for an instance.
2250

2251
  This abstracts away some work from AddInstance.
2252

2253
  Args:
2254
    instance: the instance object
2255

2256
  Returns:
2257
    True or False showing the success of the creation process
2258

2259
  """
2260
  for device in instance.disks:
2261
    logger.Info("creating volume %s for instance %s" %
2262
              (device.iv_name, instance.name))
2263
    #HARDCODE
2264
    for secondary_node in instance.secondary_nodes:
2265
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, device, False):
2266
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2267
                     (device.iv_name, device, secondary_node))
2268
        return False
2269
    #HARDCODE
2270
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, device):
2271
      logger.Error("failed to create volume %s on primary!" %
2272
                   device.iv_name)
2273
      return False
2274
  return True
2275

    
2276

    
2277
def _RemoveDisks(instance, cfg):
2278
  """Remove all disks for an instance.
2279

2280
  This abstracts away some work from `AddInstance()` and
2281
  `RemoveInstance()`. Note that in case some of the devices couldn't
2282
  be remove, the removal will continue with the other ones (compare
2283
  with `_CreateDisks()`).
2284

2285
  Args:
2286
    instance: the instance object
2287

2288
  Returns:
2289
    True or False showing the success of the removal proces
2290

2291
  """
2292
  logger.Info("removing block devices for instance %s" % instance.name)
2293

    
2294
  result = True
2295
  for device in instance.disks:
2296
    for node, disk in device.ComputeNodeTree(instance.primary_node):
2297
      cfg.SetDiskID(disk, node)
2298
      if not rpc.call_blockdev_remove(node, disk):
2299
        logger.Error("could not remove block device %s on node %s,"
2300
                     " continuing anyway" %
2301
                     (device.iv_name, node))
2302
        result = False
2303
  return result
2304

    
2305

    
2306
class LUCreateInstance(LogicalUnit):
2307
  """Create an instance.
2308

2309
  """
2310
  HPATH = "instance-add"
2311
  HTYPE = constants.HTYPE_INSTANCE
2312
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2313
              "disk_template", "swap_size", "mode", "start", "vcpus",
2314
              "wait_for_sync"]
2315

    
2316
  def BuildHooksEnv(self):
2317
    """Build hooks env.
2318

2319
    This runs on master, primary and secondary nodes of the instance.
2320

2321
    """
2322
    env = {
2323
      "INSTANCE_NAME": self.op.instance_name,
2324
      "INSTANCE_PRIMARY": self.op.pnode,
2325
      "INSTANCE_SECONDARIES": " ".join(self.secondaries),
2326
      "DISK_TEMPLATE": self.op.disk_template,
2327
      "MEM_SIZE": self.op.mem_size,
2328
      "DISK_SIZE": self.op.disk_size,
2329
      "SWAP_SIZE": self.op.swap_size,
2330
      "VCPUS": self.op.vcpus,
2331
      "BRIDGE": self.op.bridge,
2332
      "INSTANCE_ADD_MODE": self.op.mode,
2333
      }
2334
    if self.op.mode == constants.INSTANCE_IMPORT:
2335
      env["SRC_NODE"] = self.op.src_node
2336
      env["SRC_PATH"] = self.op.src_path
2337
      env["SRC_IMAGE"] = self.src_image
2338
    if self.inst_ip:
2339
      env["INSTANCE_IP"] = self.inst_ip
2340

    
2341
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2342
          self.secondaries)
2343
    return env, nl, nl
2344

    
2345

    
2346
  def CheckPrereq(self):
2347
    """Check prerequisites.
2348

2349
    """
2350
    if self.op.mode not in (constants.INSTANCE_CREATE,
2351
                            constants.INSTANCE_IMPORT):
2352
      raise errors.OpPrereqError, ("Invalid instance creation mode '%s'" %
2353
                                   self.op.mode)
2354

    
2355
    if self.op.mode == constants.INSTANCE_IMPORT:
2356
      src_node = getattr(self.op, "src_node", None)
2357
      src_path = getattr(self.op, "src_path", None)
2358
      if src_node is None or src_path is None:
2359
        raise errors.OpPrereqError, ("Importing an instance requires source"
2360
                                     " node and path options")
2361
      src_node_full = self.cfg.ExpandNodeName(src_node)
2362
      if src_node_full is None:
2363
        raise errors.OpPrereqError, ("Unknown source node '%s'" % src_node)
2364
      self.op.src_node = src_node = src_node_full
2365

    
2366
      if not os.path.isabs(src_path):
2367
        raise errors.OpPrereqError, ("The source path must be absolute")
2368

    
2369
      export_info = rpc.call_export_info(src_node, src_path)
2370

    
2371
      if not export_info:
2372
        raise errors.OpPrereqError, ("No export found in dir %s" % src_path)
2373

    
2374
      if not export_info.has_section(constants.INISECT_EXP):
2375
        raise errors.ProgrammerError, ("Corrupted export config")
2376

    
2377
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
2378
      if (int(ei_version) != constants.EXPORT_VERSION):
2379
        raise errors.OpPrereqError, ("Wrong export version %s (wanted %d)" %
2380
                                     (ei_version, constants.EXPORT_VERSION))
2381

    
2382
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2383
        raise errors.OpPrereqError, ("Can't import instance with more than"
2384
                                     " one data disk")
2385

    
2386
      # FIXME: are the old os-es, disk sizes, etc. useful?
2387
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2388
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2389
                                                         'disk0_dump'))
2390
      self.src_image = diskimage
2391
    else: # INSTANCE_CREATE
2392
      if getattr(self.op, "os_type", None) is None:
2393
        raise errors.OpPrereqError, ("No guest OS specified")
2394

    
2395
    # check primary node
2396
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2397
    if pnode is None:
2398
      raise errors.OpPrereqError, ("Primary node '%s' is uknown" %
2399
                                   self.op.pnode)
2400
    self.op.pnode = pnode.name
2401
    self.pnode = pnode
2402
    self.secondaries = []
2403
    # disk template and mirror node verification
2404
    if self.op.disk_template not in constants.DISK_TEMPLATES:
2405
      raise errors.OpPrereqError, ("Invalid disk template name")
2406

    
2407
    if self.op.disk_template == constants.DT_REMOTE_RAID1:
2408
      if getattr(self.op, "snode", None) is None:
2409
        raise errors.OpPrereqError, ("The 'remote_raid1' disk template needs"
2410
                                     " a mirror node")
2411

    
2412
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
2413
      if snode_name is None:
2414
        raise errors.OpPrereqError, ("Unknown secondary node '%s'" %
2415
                                     self.op.snode)
2416
      elif snode_name == pnode.name:
2417
        raise errors.OpPrereqError, ("The secondary node cannot be"
2418
                                     " the primary node.")
2419
      self.secondaries.append(snode_name)
2420

    
2421
    # Check lv size requirements
2422
    nodenames = [pnode.name] + self.secondaries
2423
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2424

    
2425
    # Required free disk space as a function of disk and swap space
2426
    req_size_dict = {
2427
      constants.DT_DISKLESS: 0,
2428
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2429
      constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2430
      # 256 MB are added for drbd metadata, 128MB for each drbd device
2431
      constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2432
    }
2433

    
2434
    if self.op.disk_template not in req_size_dict:
2435
      raise errors.ProgrammerError, ("Disk template '%s' size requirement"
2436
                                     " is unknown" %  self.op.disk_template)
2437

    
2438
    req_size = req_size_dict[self.op.disk_template]
2439

    
2440
    for node in nodenames:
2441
      info = nodeinfo.get(node, None)
2442
      if not info:
2443
        raise errors.OpPrereqError, ("Cannot get current information"
2444
                                     " from node '%s'" % nodeinfo)
2445
      if req_size > info['vg_free']:
2446
        raise errors.OpPrereqError, ("Not enough disk space on target node %s."
2447
                                     " %d MB available, %d MB required" %
2448
                                     (node, info['vg_free'], req_size))
2449

    
2450
    # os verification
2451
    os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2452
    if not isinstance(os_obj, objects.OS):
2453
      raise errors.OpPrereqError, ("OS '%s' not in supported os list for"
2454
                                   " primary node"  % self.op.os_type)
2455

    
2456
    # instance verification
2457
    hostname1 = utils.LookupHostname(self.op.instance_name)
2458
    if not hostname1:
2459
      raise errors.OpPrereqError, ("Instance name '%s' not found in dns" %
2460
                                   self.op.instance_name)
2461

    
2462
    self.op.instance_name = instance_name = hostname1['hostname']
2463
    instance_list = self.cfg.GetInstanceList()
2464
    if instance_name in instance_list:
2465
      raise errors.OpPrereqError, ("Instance '%s' is already in the cluster" %
2466
                                   instance_name)
2467

    
2468
    ip = getattr(self.op, "ip", None)
2469
    if ip is None or ip.lower() == "none":
2470
      inst_ip = None
2471
    elif ip.lower() == "auto":
2472
      inst_ip = hostname1['ip']
2473
    else:
2474
      if not utils.IsValidIP(ip):
2475
        raise errors.OpPrereqError, ("given IP address '%s' doesn't look"
2476
                                     " like a valid IP" % ip)
2477
      inst_ip = ip
2478
    self.inst_ip = inst_ip
2479

    
2480
    command = ["fping", "-q", hostname1['ip']]
2481
    result = utils.RunCmd(command)
2482
    if not result.failed:
2483
      raise errors.OpPrereqError, ("IP %s of instance %s already in use" %
2484
                                   (hostname1['ip'], instance_name))
2485

    
2486
    # bridge verification
2487
    bridge = getattr(self.op, "bridge", None)
2488
    if bridge is None:
2489
      self.op.bridge = self.cfg.GetDefBridge()
2490
    else:
2491
      self.op.bridge = bridge
2492

    
2493
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
2494
      raise errors.OpPrereqError, ("target bridge '%s' does not exist on"
2495
                                   " destination node '%s'" %
2496
                                   (self.op.bridge, pnode.name))
2497

    
2498
    if self.op.start:
2499
      self.instance_status = 'up'
2500
    else:
2501
      self.instance_status = 'down'
2502

    
2503
  def Exec(self, feedback_fn):
2504
    """Create and add the instance to the cluster.
2505

2506
    """
2507
    instance = self.op.instance_name
2508
    pnode_name = self.pnode.name
2509

    
2510
    nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
2511
    if self.inst_ip is not None:
2512
      nic.ip = self.inst_ip
2513

    
2514
    disks = _GenerateDiskTemplate(self.cfg, self.cfg.GetVGName(),
2515
                                  self.op.disk_template,
2516
                                  instance, pnode_name,
2517
                                  self.secondaries, self.op.disk_size,
2518
                                  self.op.swap_size)
2519

    
2520
    iobj = objects.Instance(name=instance, os=self.op.os_type,
2521
                            primary_node=pnode_name,
2522
                            memory=self.op.mem_size,
2523
                            vcpus=self.op.vcpus,
2524
                            nics=[nic], disks=disks,
2525
                            disk_template=self.op.disk_template,
2526
                            status=self.instance_status,
2527
                            )
2528

    
2529
    feedback_fn("* creating instance disks...")
2530
    if not _CreateDisks(self.cfg, iobj):
2531
      _RemoveDisks(iobj, self.cfg)
2532
      raise errors.OpExecError, ("Device creation failed, reverting...")
2533

    
2534
    feedback_fn("adding instance %s to cluster config" % instance)
2535

    
2536
    self.cfg.AddInstance(iobj)
2537

    
2538
    if self.op.wait_for_sync:
2539
      disk_abort = not _WaitForSync(self.cfg, iobj)
2540
    elif iobj.disk_template == "remote_raid1":
2541
      # make sure the disks are not degraded (still sync-ing is ok)
2542
      time.sleep(15)
2543
      feedback_fn("* checking mirrors status")
2544
      disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
2545
    else:
2546
      disk_abort = False
2547

    
2548
    if disk_abort:
2549
      _RemoveDisks(iobj, self.cfg)
2550
      self.cfg.RemoveInstance(iobj.name)
2551
      raise errors.OpExecError, ("There are some degraded disks for"
2552
                                      " this instance")
2553

    
2554
    feedback_fn("creating os for instance %s on node %s" %
2555
                (instance, pnode_name))
2556

    
2557
    if iobj.disk_template != constants.DT_DISKLESS:
2558
      if self.op.mode == constants.INSTANCE_CREATE:
2559
        feedback_fn("* running the instance OS create scripts...")
2560
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
2561
          raise errors.OpExecError, ("could not add os for instance %s"
2562
                                          " on node %s" %
2563
                                          (instance, pnode_name))
2564

    
2565
      elif self.op.mode == constants.INSTANCE_IMPORT:
2566
        feedback_fn("* running the instance OS import scripts...")
2567
        src_node = self.op.src_node
2568
        src_image = self.src_image
2569
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
2570
                                                src_node, src_image):
2571
          raise errors.OpExecError, ("Could not import os for instance"
2572
                                          " %s on node %s" %
2573
                                          (instance, pnode_name))
2574
      else:
2575
        # also checked in the prereq part
2576
        raise errors.ProgrammerError, ("Unknown OS initialization mode '%s'"
2577
                                       % self.op.mode)
2578

    
2579
    if self.op.start:
2580
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
2581
      feedback_fn("* starting instance...")
2582
      if not rpc.call_instance_start(pnode_name, iobj, None):
2583
        raise errors.OpExecError, ("Could not start instance")
2584

    
2585

    
2586
class LUConnectConsole(NoHooksLU):
2587
  """Connect to an instance's console.
2588

2589
  This is somewhat special in that it returns the command line that
2590
  you need to run on the master node in order to connect to the
2591
  console.
2592

2593
  """
2594
  _OP_REQP = ["instance_name"]
2595

    
2596
  def CheckPrereq(self):
2597
    """Check prerequisites.
2598

2599
    This checks that the instance is in the cluster.
2600

2601
    """
2602
    instance = self.cfg.GetInstanceInfo(
2603
      self.cfg.ExpandInstanceName(self.op.instance_name))
2604
    if instance is None:
2605
      raise errors.OpPrereqError, ("Instance '%s' not known" %
2606
                                   self.op.instance_name)
2607
    self.instance = instance
2608

    
2609
  def Exec(self, feedback_fn):
2610
    """Connect to the console of an instance
2611

2612
    """
2613
    instance = self.instance
2614
    node = instance.primary_node
2615

    
2616
    node_insts = rpc.call_instance_list([node])[node]
2617
    if node_insts is False:
2618
      raise errors.OpExecError, ("Can't connect to node %s." % node)
2619

    
2620
    if instance.name not in node_insts:
2621
      raise errors.OpExecError, ("Instance %s is not running." % instance.name)
2622

    
2623
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
2624

    
2625
    hyper = hypervisor.GetHypervisor()
2626
    console_cmd = hyper.GetShellCommandForConsole(instance.name)
2627
    return node, console_cmd
2628

    
2629

    
2630
class LUAddMDDRBDComponent(LogicalUnit):
2631
  """Adda new mirror member to an instance's disk.
2632

2633
  """
2634
  HPATH = "mirror-add"
2635
  HTYPE = constants.HTYPE_INSTANCE
2636
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]
2637

    
2638
  def BuildHooksEnv(self):
2639
    """Build hooks env.
2640

2641
    This runs on the master, the primary and all the secondaries.
2642

2643
    """
2644
    env = {
2645
      "INSTANCE_NAME": self.op.instance_name,
2646
      "NEW_SECONDARY": self.op.remote_node,
2647
      "DISK_NAME": self.op.disk_name,
2648
      }
2649
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
2650
          self.op.remote_node,] + list(self.instance.secondary_nodes)
2651
    return env, nl, nl
2652

    
2653
  def CheckPrereq(self):
2654
    """Check prerequisites.
2655

2656
    This checks that the instance is in the cluster.
2657

2658
    """
2659
    instance = self.cfg.GetInstanceInfo(
2660
      self.cfg.ExpandInstanceName(self.op.instance_name))
2661
    if instance is None:
2662
      raise errors.OpPrereqError, ("Instance '%s' not known" %
2663
                                   self.op.instance_name)
2664
    self.instance = instance
2665

    
2666
    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
2667
    if remote_node is None:
2668
      raise errors.OpPrereqError, ("Node '%s' not known" % self.op.remote_node)
2669
    self.remote_node = remote_node
2670

    
2671
    if remote_node == instance.primary_node:
2672
      raise errors.OpPrereqError, ("The specified node is the primary node of"
2673
                                   " the instance.")
2674

    
2675
    if instance.disk_template != constants.DT_REMOTE_RAID1:
2676
      raise errors.OpPrereqError, ("Instance's disk layout is not"
2677
                                   " remote_raid1.")
2678
    for disk in instance.disks:
2679
      if disk.iv_name == self.op.disk_name:
2680
        break
2681
    else:
2682
      raise errors.OpPrereqError, ("Can't find this device ('%s') in the"
2683
                                   " instance." % self.op.disk_name)
2684
    if len(disk.children) > 1:
2685
      raise errors.OpPrereqError, ("The device already has two slave"
2686
                                   " devices.\n"
2687
                                   "This would create a 3-disk raid1"
2688
                                   " which we don't allow.")
2689
    self.disk = disk
2690

    
2691
  def Exec(self, feedback_fn):
2692
    """Add the mirror component
2693

2694
    """
2695
    disk = self.disk
2696
    instance = self.instance
2697

    
2698
    remote_node = self.remote_node
2699
    new_drbd = _GenerateMDDRBDBranch(self.cfg, self.cfg.GetVGName(),
2700
                                     instance.primary_node, remote_node,
2701
                                     disk.size, "%s-%s" %
2702
                                     (instance.name, self.op.disk_name))
2703

    
2704
    logger.Info("adding new mirror component on secondary")
2705
    #HARDCODE
2706
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, new_drbd, False):
2707
      raise errors.OpExecError, ("Failed to create new component on secondary"
2708
                                 " node %s" % remote_node)
2709

    
2710
    logger.Info("adding new mirror component on primary")
2711
    #HARDCODE
2712
    if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node, new_drbd):
2713
      # remove secondary dev
2714
      self.cfg.SetDiskID(new_drbd, remote_node)
2715
      rpc.call_blockdev_remove(remote_node, new_drbd)
2716
      raise errors.OpExecError, ("Failed to create volume on primary")
2717

    
2718
    # the device exists now
2719
    # call the primary node to add the mirror to md
2720
    logger.Info("adding new mirror component to md")
2721
    if not rpc.call_blockdev_addchild(instance.primary_node,
2722
                                           disk, new_drbd):
2723
      logger.Error("Can't add mirror compoment to md!")
2724
      self.cfg.SetDiskID(new_drbd, remote_node)
2725
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
2726
        logger.Error("Can't rollback on secondary")
2727
      self.cfg.SetDiskID(new_drbd, instance.primary_node)
2728
      if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
2729
        logger.Error("Can't rollback on primary")
2730
      raise errors.OpExecError, "Can't add mirror component to md array"
2731

    
2732
    disk.children.append(new_drbd)
2733

    
2734
    self.cfg.AddInstance(instance)
2735

    
2736
    _WaitForSync(self.cfg, instance)
2737

    
2738
    return 0
2739

    
2740

    
2741
class LURemoveMDDRBDComponent(LogicalUnit):
2742
  """Remove a component from a remote_raid1 disk.
2743

2744
  """
2745
  HPATH = "mirror-remove"
2746
  HTYPE = constants.HTYPE_INSTANCE
2747
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]
2748

    
2749
  def BuildHooksEnv(self):
2750
    """Build hooks env.
2751

2752
    This runs on the master, the primary and all the secondaries.
2753

2754
    """
2755
    env = {
2756
      "INSTANCE_NAME": self.op.instance_name,
2757
      "DISK_NAME": self.op.disk_name,
2758
      "DISK_ID": self.op.disk_id,
2759
      "OLD_SECONDARY": self.old_secondary,
2760
      }
2761
    nl = [self.sstore.GetMasterNode(),
2762
          self.instance.primary_node] + list(self.instance.secondary_nodes)
2763
    return env, nl, nl
2764

    
2765
  def CheckPrereq(self):
2766
    """Check prerequisites.
2767

2768
    This checks that the instance is in the cluster.
2769

2770
    """
2771
    instance = self.cfg.GetInstanceInfo(
2772
      self.cfg.ExpandInstanceName(self.op.instance_name))
2773
    if instance is None:
2774
      raise errors.OpPrereqError, ("Instance '%s' not known" %
2775
                                   self.op.instance_name)
2776
    self.instance = instance
2777

    
2778
    if instance.disk_template != constants.DT_REMOTE_RAID1:
2779
      raise errors.OpPrereqError, ("Instance's disk layout is not"
2780
                                   " remote_raid1.")
2781
    for disk in instance.disks:
2782
      if disk.iv_name == self.op.disk_name:
2783
        break
2784
    else:
2785
      raise errors.OpPrereqError, ("Can't find this device ('%s') in the"
2786
                                   " instance." % self.op.disk_name)
2787
    for child in disk.children:
2788
      if child.dev_type == "drbd" and child.logical_id[2] == self.op.disk_id:
2789
        break
2790
    else:
2791
      raise errors.OpPrereqError, ("Can't find the device with this port.")
2792

    
2793
    if len(disk.children) < 2:
2794
      raise errors.OpPrereqError, ("Cannot remove the last component from"
2795
                                   " a mirror.")
2796
    self.disk = disk
2797
    self.child = child
2798
    if self.child.logical_id[0] == instance.primary_node:
2799
      oid = 1
2800
    else:
2801
      oid = 0
2802
    self.old_secondary = self.child.logical_id[oid]
2803

    
2804
  def Exec(self, feedback_fn):
2805
    """Remove the mirror component
2806

2807
    """
2808
    instance = self.instance
2809
    disk = self.disk
2810
    child = self.child
2811
    logger.Info("remove mirror component")
2812
    self.cfg.SetDiskID(disk, instance.primary_node)
2813
    if not rpc.call_blockdev_removechild(instance.primary_node,
2814
                                              disk, child):
2815
      raise errors.OpExecError, ("Can't remove child from mirror.")
2816

    
2817
    for node in child.logical_id[:2]:
2818
      self.cfg.SetDiskID(child, node)
2819
      if not rpc.call_blockdev_remove(node, child):
2820
        logger.Error("Warning: failed to remove device from node %s,"
2821
                     " continuing operation." % node)
2822

    
2823
    disk.children.remove(child)
2824
    self.cfg.AddInstance(instance)
2825

    
2826

    
2827
class LUReplaceDisks(LogicalUnit):
2828
  """Replace the disks of an instance.
2829

2830
  """
2831
  HPATH = "mirrors-replace"
2832
  HTYPE = constants.HTYPE_INSTANCE
2833
  _OP_REQP = ["instance_name"]
2834

    
2835
  def BuildHooksEnv(self):
2836
    """Build hooks env.
2837

2838
    This runs on the master, the primary and all the secondaries.
2839

2840
    """
2841
    env = {
2842
      "INSTANCE_NAME": self.op.instance_name,
2843
      "NEW_SECONDARY": self.op.remote_node,
2844
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
2845
      }
2846
    nl = [self.sstore.GetMasterNode(),
2847
          self.instance.primary_node] + list(self.instance.secondary_nodes)
2848
    return env, nl, nl
2849

    
2850
  def CheckPrereq(self):
2851
    """Check prerequisites.
2852

2853
    This checks that the instance is in the cluster.
2854

2855
    """
2856
    instance = self.cfg.GetInstanceInfo(
2857
      self.cfg.ExpandInstanceName(self.op.instance_name))
2858
    if instance is None:
2859
      raise errors.OpPrereqError, ("Instance '%s' not known" %
2860
                                   self.op.instance_name)
2861
    self.instance = instance
2862

    
2863
    if instance.disk_template != constants.DT_REMOTE_RAID1:
2864
      raise errors.OpPrereqError, ("Instance's disk layout is not"
2865
                                   " remote_raid1.")
2866

    
2867
    if len(instance.secondary_nodes) != 1:
2868
      raise errors.OpPrereqError, ("The instance has a strange layout,"
2869
                                   " expected one secondary but found %d" %
2870
                                   len(instance.secondary_nodes))
2871

    
2872
    remote_node = getattr(self.op, "remote_node", None)
2873
    if remote_node is None:
2874
      remote_node = instance.secondary_nodes[0]
2875
    else:
2876
      remote_node = self.cfg.ExpandNodeName(remote_node)
2877
      if remote_node is None:
2878
        raise errors.OpPrereqError, ("Node '%s' not known" %
2879
                                     self.op.remote_node)
2880
    if remote_node == instance.primary_node:
2881
      raise errors.OpPrereqError, ("The specified node is the primary node of"
2882
                                   " the instance.")
2883
    self.op.remote_node = remote_node
2884

    
2885
  def Exec(self, feedback_fn):
2886
    """Replace the disks of an instance.
2887

2888
    """
2889
    instance = self.instance
2890
    iv_names = {}
2891
    # start of work
2892
    remote_node = self.op.remote_node
2893
    cfg = self.cfg
2894
    vgname = cfg.GetVGName()
2895
    for dev in instance.disks:
2896
      size = dev.size
2897
      new_drbd = _GenerateMDDRBDBranch(cfg, vgname, instance.primary_node,
2898
                                       remote_node, size,
2899
                                       "%s-%s" % (instance.name, dev.iv_name))
2900
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
2901
      logger.Info("adding new mirror component on secondary for %s" %
2902
                  dev.iv_name)
2903
      #HARDCODE
2904
      if not _CreateBlockDevOnSecondary(cfg, remote_node, new_drbd, False):
2905
        raise errors.OpExecError, ("Failed to create new component on"
2906
                                   " secondary node %s\n"
2907
                                   "Full abort, cleanup manually!" %
2908
                                   remote_node)
2909

    
2910
      logger.Info("adding new mirror component on primary")
2911
      #HARDCODE
2912
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, new_drbd):
2913
        # remove secondary dev
2914
        cfg.SetDiskID(new_drbd, remote_node)
2915
        rpc.call_blockdev_remove(remote_node, new_drbd)
2916
        raise errors.OpExecError("Failed to create volume on primary!\n"
2917
                                 "Full abort, cleanup manually!!")
2918

    
2919
      # the device exists now
2920
      # call the primary node to add the mirror to md
2921
      logger.Info("adding new mirror component to md")
2922
      if not rpc.call_blockdev_addchild(instance.primary_node, dev,
2923
                                        new_drbd):
2924
        logger.Error("Can't add mirror compoment to md!")
2925
        cfg.SetDiskID(new_drbd, remote_node)
2926
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
2927
          logger.Error("Can't rollback on secondary")
2928
        cfg.SetDiskID(new_drbd, instance.primary_node)
2929
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
2930
          logger.Error("Can't rollback on primary")
2931
        raise errors.OpExecError, ("Full abort, cleanup manually!!")
2932

    
2933
      dev.children.append(new_drbd)
2934
      cfg.AddInstance(instance)

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    _WaitForSync(cfg, instance, unlock=True)

    # so check manually all the devices
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError, ("MD device %s is degraded!" % name)
      cfg.SetDiskID(new_drbd, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
      if is_degr:
        raise errors.OpExecError, ("New drbd device %s is degraded!" % name)

    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      logger.Info("remove mirror %s component" % name)
      cfg.SetDiskID(dev, instance.primary_node)
      if not rpc.call_blockdev_removechild(instance.primary_node,
                                           dev, child):
        logger.Error("Can't remove child from mirror, aborting"
                     " *this device cleanup*.\nYou need to cleanup manually!!")
        continue

      for node in child.logical_id[:2]:
        logger.Info("remove child device on %s" % node)
        cfg.SetDiskID(child, node)
        if not rpc.call_blockdev_remove(node, child):
          logger.Error("Warning: failed to remove device from node %s,"
                       " continuing operation." % node)

      dev.children.remove(child)

      cfg.AddInstance(instance)


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError, "Invalid argument type 'instances'"
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError, ("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]
    return

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type == "drbd":
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

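    # recurse into the child devices so the whole stack (e.g. md over
    # drbd over lvm) is reported, not only the top-level device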
    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                           instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]
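      # per-disk status as computed (recursively) by _ComputeDiskStatus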

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        }

      result[instance.name] = idict

    return result


class LUQueryNodeData(NoHooksLU):
  """Logical unit for querying node data.

  """
  _OP_REQP = ["nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional node list against the existing names.

    """
    self.wanted_nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Compute and return the list of nodes.

    """
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]
    result = []
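    # for each requested node return its name, addresses and the lists
    # of primary and secondary instances hosted on it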
    for node in self.wanted_nodes:
      result.append((node.name, node.primary_ip, node.secondary_ip,
                     [inst.name for inst in ilist
                      if inst.primary_node == node.name],
                     [inst.name for inst in ilist
                      if node.name in inst.secondary_nodes],
                     ))
    return result


class LUSetInstanceParms(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      }
    if self.mem:
      env["MEM_SIZE"] = self.mem
    if self.vcpus:
      env["VCPUS"] = self.vcpus
    if self.do_ip:
      env["INSTANCE_IP"] = self.ip
    if self.bridge:
      env["BRIDGE"] = self.bridge

    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)

    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.bridge = getattr(self.op, "bridge", None)
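    # at least one of the four parameters must have been given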
    if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
      raise errors.OpPrereqError, ("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError, ("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError, ("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError, ("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("No such instance name '%s'" %
                                   self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance
    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.
    """
    result = []
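    # result collects (parameter, new value) pairs describing the
    # changes that were applied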
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))

    self.cfg.AddInstance(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list([node.name for node in self.nodes])


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "INSTANCE_NAME": self.op.instance_name,
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance name is a valid one.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not found" %
                                   self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError, ("Destination node '%s' is unknown." %
                                   self.op.target_node)
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    # shutdown the instance, unless requested not to do so
    if self.op.shutdown:
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
      self.processor.ChainOpCode(op, feedback_fn)
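      # the instance is started again in the "finally" clause below;
      # shutting it down first presumably yields a consistent snapshot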

    vgname = self.cfg.GetVGName()

    snap_disks = []
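    # snap_disks will collect the snapshot devices created below; only
    # the "sda" disk is snapshotted (see the iv_name check)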

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type="lvm", size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown:
        op = opcodes.OpStartupInstance(instance_name=instance.name,
                                       force=False)
        self.processor.ChainOpCode(op, feedback_fn)

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                      instance):
        logger.Error("could not export block device %s from node"
                     " %s to node %s" %
                     (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from"
                     " node %s" % (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.processor.ChainOpCode(op, feedback_fn)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))