#!/usr/bin/python
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import socket
import time
import tempfile
import re
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import ssconf

class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements (REQ_CLUSTER,
      REQ_MASTER); note that all commands require root permissions

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_CLUSTER = True
  REQ_MASTER = True

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.processor = processor
    self.op = op
    self.cfg = cfg
    self.sstore = sstore
    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError, ("Required parameter '%s' missing" %
                                     attr_name)
    if self.REQ_CLUSTER:
      if not cfg.IsCluster():
        raise errors.OpPrereqError, ("Cluster not initialized yet,"
                                     " use 'gnt-cluster init' first.")
      if self.REQ_MASTER:
        master = sstore.GetMasterNode()
        if master != socket.gethostname():
          raise errors.OpPrereqError, ("Commands must be run on the master"
                                       " node %s" % master)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not be prefixed with 'GANETI_', as that
    will be handled by the hooks runner. Also note that additional keys
    will be added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    As for the node lists, the master should not be included in
    them, as it will be added by the hooks runner in case this LU
    requires a cluster to run on (otherwise we don't have a node
    list). An empty node list should be returned as an empty list (and
    not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

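# Illustrative sketch, not part of the original module: a minimal LogicalUnit
# subclass following the rules listed in the class docstring above. The hook
# path "node-example" and the "node_name" opcode field are made-up values for
# demonstration only; the real logical units are defined further down.
class _LUExample(LogicalUnit):
  """Example logical unit (illustration only)."""
  HPATH = "node-example"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def CheckPrereq(self):
    """Canonicalise the node name given in the opcode."""
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError, ("Node '%s' is unknown" % self.op.node_name)
    self.op.node_name = node.name

  def BuildHooksEnv(self):
    """Return the env dict and the pre/post hook node lists."""
    return {"NODE_NAME": self.op.node_name}, [], [self.op.node_name]

  def Exec(self, feedback_fn):
    """Do the actual work (nothing, in this sketch)."""
    feedback_fn("* doing nothing on %s" % self.op.node_name)
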
class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Build hooks env.

    This is a no-op, since we don't run hooks.

    """
    return

def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded nodes.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if nodes is not None and not isinstance(nodes, list):
    raise errors.OpPrereqError, "Invalid argument type 'nodes'"

  if nodes:
    wanted_nodes = []

    for name in nodes:
      node = lu.cfg.GetNodeInfo(lu.cfg.ExpandNodeName(name))
      if node is None:
        raise errors.OpPrereqError, ("No such node name '%s'" % name)
      wanted_nodes.append(node)

    return wanted_nodes
  else:
    return [lu.cfg.GetNodeInfo(name) for name in lu.cfg.GetNodeList()]

def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError, ("Unknown output fields selected: %s"
                                 % ",".join(frozenset(selected).
                                            difference(all_fields)))

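# Illustrative sketch, not part of the original module: how the two helpers
# above are typically combined in a query LU's CheckPrereq. The field names
# used here are examples only; see LUQueryNodes and LUQueryNodeVolumes below
# for the real users of these helpers.
def _ExampleQueryCheckPrereq(lu):
  """Validate a hypothetical query opcode with 'nodes' and 'output_fields'."""
  # expand and validate the node list (an empty or None list means all nodes)
  lu.nodes = _GetWantedNodes(lu, lu.op.nodes)
  # reject any requested output field that is neither static nor dynamic
  _CheckOutputFields(static=["name"],
                     dynamic=["dfree", "mfree"],
                     selected=lu.op.output_fields)
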
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os, status,
                          memory, vcpus, nics):
  """Builds the environment dictionary for instance related hooks.

  """
  env = {
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS": os,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env

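# Illustrative sketch, not part of the original module: the environment
# produced by _BuildInstanceHookEnv for a one-NIC instance. All values are
# invented for demonstration; the GANETI_ prefix is added later by the hooks
# runner, not here.
def _ExampleInstanceHookEnv():
  """Return a sample hooks environment (illustration only)."""
  env = _BuildInstanceHookEnv("instance1.example.com", "node1.example.com",
                              ["node2.example.com"], "debian-etch", "up",
                              128, 1, [("198.51.100.10", "xen-br0")])
  # env now contains, among others:
  #   INSTANCE_NAME=instance1.example.com, INSTANCE_PRIMARY=node1.example.com,
  #   INSTANCE_SECONDARIES=node2.example.com, INSTANCE_NIC_COUNT=1,
  #   INSTANCE_NIC0_IP=198.51.100.10, INSTANCE_NIC0_BRIDGE=xen-br0
  return env
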
def _BuildInstanceHookEnvByObject(instance, override=None):
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)

def _UpdateEtcHosts(fullnode, ip):
  """Ensure a node has a correct entry in /etc/hosts.

  Args:
    fullnode - Fully qualified domain name of host. (str)
    ip       - IPv4 address of host (str)

  """
  node = fullnode.split(".", 1)[0]

  f = open('/etc/hosts', 'r+')

  inthere = False

  save_lines = []
  add_lines = []
  removed = False

  while True:
    rawline = f.readline()

    if not rawline:
      # End of file
      break

    line = rawline.split('\n')[0]

    # Strip off comments
    line = line.split('#')[0]

    if not line:
      # Entire line was comment, skip
      save_lines.append(rawline)
      continue

    fields = line.split()

    haveall = True
    havesome = False
    for spec in [ ip, fullnode, node ]:
      if spec not in fields:
        haveall = False
      if spec in fields:
        havesome = True

    if haveall:
      inthere = True
      save_lines.append(rawline)
      continue

    if havesome and not haveall:
      # Line (old, or manual?) which is missing some.  Remove.
      removed = True
      continue

    save_lines.append(rawline)

  if not inthere:
    add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))

  if removed:
    if add_lines:
      save_lines = save_lines + add_lines

    # We removed a line, write a new file and replace old.
    fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
    newfile = os.fdopen(fd, 'w')
    newfile.write(''.join(save_lines))
    newfile.close()
    os.rename(tmpname, '/etc/hosts')

  elif add_lines:
    # Simply appending a new line will do the trick.
    f.seek(0, 2)
    for add in add_lines:
      f.write(add)

  f.close()

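# Illustrative sketch, not part of the original module: the /etc/hosts entry
# maintained above has the form "<ip>\t<fqdn> <shortname>"; the values used
# here are placeholders.
def _ExampleEtcHostsLine():
  """Return the /etc/hosts line _UpdateEtcHosts would add (illustration)."""
  ip, fullnode = "198.51.100.11", "node1.example.com"
  return '%s\t%s %s\n' % (ip, fullnode, fullnode.split('.', 1)[0])
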
def _UpdateKnownHosts(fullnode, ip, pubkey):
  """Ensure a node has a correct known_hosts entry.

  Args:
    fullnode - Fully qualified domain name of host. (str)
    ip       - IPv4 address of host (str)
    pubkey   - the public key of the cluster

  """
  if os.path.exists('/etc/ssh/ssh_known_hosts'):
    f = open('/etc/ssh/ssh_known_hosts', 'r+')
  else:
    f = open('/etc/ssh/ssh_known_hosts', 'w+')

  inthere = False

  save_lines = []
  add_lines = []
  removed = False

  while True:
    rawline = f.readline()
    logger.Debug('read %s' % (repr(rawline),))

    if not rawline:
      # End of file
      break

    line = rawline.split('\n')[0]

    parts = line.split(' ')
    fields = parts[0].split(',')
    key = parts[2]

    haveall = True
    havesome = False
    for spec in [ ip, fullnode ]:
      if spec not in fields:
        haveall = False
      if spec in fields:
        havesome = True

    logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
    if haveall and key == pubkey:
      inthere = True
      save_lines.append(rawline)
      logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
      continue

    if havesome and (not haveall or key != pubkey):
      removed = True
      logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
      continue

    save_lines.append(rawline)

  if not inthere:
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))

  if removed:
    save_lines = save_lines + add_lines

    # Write a new file and replace old.
    fd, tmpname = tempfile.mkstemp('tmp', 'ssh_known_hosts_', '/etc/ssh')
    newfile = os.fdopen(fd, 'w')
    newfile.write(''.join(save_lines))
    newfile.close()
    logger.Debug("Wrote new known_hosts.")
    os.rename(tmpname, '/etc/ssh/ssh_known_hosts')

  elif add_lines:
    # Simply appending a new line will do the trick.
    f.seek(0, 2)
    for add in add_lines:
      f.write(add)

  f.close()

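# Illustrative sketch, not part of the original module: _UpdateKnownHosts
# keeps a single "<fqdn>,<ip> ssh-rsa <cluster key>" line per node; the values
# used here are placeholders.
def _ExampleKnownHostsLine():
  """Return the known_hosts line _UpdateKnownHosts would add (illustration)."""
  return '%s,%s ssh-rsa %s\n' % ("node1.example.com", "198.51.100.11",
                                 "AAAAB3NzaC1yc2E...")
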
def _HasValidVG(vglist, vgname):
  """Checks if the volume group list is valid.

  A non-None return value means there's an error, and the return value
  is the error message.

  """
  vgsize = vglist.get(vgname, None)
  if vgsize is None:
    return "volume group '%s' missing" % vgname
  elif vgsize < 20480:
    return ("volume group '%s' too small (20480MiB required, %dMiB found)" %
            (vgname, vgsize))
  return None

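# Illustrative sketch, not part of the original module: _HasValidVG expects a
# mapping of volume group name to size in MiB, as obtained for example from
# utils.ListVolumeGroups(); the names and sizes below are invented.
def _ExampleVGCheck():
  """Show the two outcomes of _HasValidVG (illustration only)."""
  assert _HasValidVG({"xenvg": 409600}, "xenvg") is None      # big enough
  assert _HasValidVG({"xenvg": 10240}, "xenvg") is not None   # too small
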
def _InitSSHSetup(node):
  """Setup the SSH configuration for the cluster.

  This generates a dsa keypair for root, adds the pub key to the
  permitted hosts and adds the hostkey to its own known hosts.

  Args:
    node: the name of this host as a fqdn

  """
  utils.RemoveFile('/root/.ssh/known_hosts')

  if os.path.exists('/root/.ssh/id_dsa'):
    utils.CreateBackup('/root/.ssh/id_dsa')
  if os.path.exists('/root/.ssh/id_dsa.pub'):
    utils.CreateBackup('/root/.ssh/id_dsa.pub')

  utils.RemoveFile('/root/.ssh/id_dsa')
  utils.RemoveFile('/root/.ssh/id_dsa.pub')

  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
                         "-f", "/root/.ssh/id_dsa",
                         "-q", "-N", ""])
  if result.failed:
    raise errors.OpExecError, ("could not generate ssh keypair, error %s" %
                               result.output)

  f = open('/root/.ssh/id_dsa.pub', 'r')
  try:
    utils.AddAuthorizedKey('/root/.ssh/authorized_keys', f.read(8192))
  finally:
    f.close()

def _InitGanetiServerSetup(ss):
  """Setup the necessary configuration for the initial node daemon.

  This creates the nodepass file containing the shared password for
  the cluster and also generates the SSL certificate.

  """
  # Create pseudo random password
  randpass = sha.new(os.urandom(64)).hexdigest()
  # and write it into sstore
  ss.SetKey(ss.SS_NODED_PASS, randpass)

  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
                         "-days", str(365*5), "-nodes", "-x509",
                         "-keyout", constants.SSL_CERT_FILE,
                         "-out", constants.SSL_CERT_FILE, "-batch"])
  if result.failed:
    raise errors.OpExecError, ("could not generate server ssl cert, command"
                               " %s had exitcode %s and error message %s" %
                               (result.cmd, result.exit_code, result.output))

  os.chmod(constants.SSL_CERT_FILE, 0400)

  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])

  if result.failed:
    raise errors.OpExecError, ("could not start the node daemon, command %s"
                               " had exitcode %s and error %s" %
                               (result.cmd, result.exit_code, result.output))

class LUInitCluster(LogicalUnit):
  """Initialise the cluster.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
              "def_bridge", "master_netdev"]
  REQ_CLUSTER = False

  def BuildHooksEnv(self):
    """Build hooks env.

    Notes: Since we don't require a cluster, we must manually add
    ourselves in the post-run node list.

    """
    env = {
      "CLUSTER": self.op.cluster_name,
      "MASTER": self.hostname['hostname_full'],
      }
    return env, [], [self.hostname['hostname_full']]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    if config.ConfigWriter.IsCluster():
      raise errors.OpPrereqError, ("Cluster is already initialised")

    hostname_local = socket.gethostname()
    self.hostname = hostname = utils.LookupHostname(hostname_local)
    if not hostname:
      raise errors.OpPrereqError, ("Cannot resolve my own hostname ('%s')" %
                                   hostname_local)

    self.clustername = clustername = utils.LookupHostname(self.op.cluster_name)
    if not clustername:
      raise errors.OpPrereqError, ("Cannot resolve given cluster name ('%s')"
                                   % self.op.cluster_name)

    result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", hostname['ip']])
    if result.failed:
      raise errors.OpPrereqError, ("Inconsistency: this host's name resolves"
                                   " to %s,\nbut this ip address does not"
                                   " belong to this host."
                                   " Aborting." % hostname['ip'])

    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip and not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError, ("Invalid secondary ip given")
    if secondary_ip and secondary_ip != hostname['ip']:
      result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", secondary_ip])
      if result.failed:
        raise errors.OpPrereqError, ("You gave %s as secondary IP,\n"
                                     "but it does not belong to this host." %
                                     secondary_ip)
    self.secondary_ip = secondary_ip

    # checks presence of the volume group given
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)

    if vgstatus:
      raise errors.OpPrereqError, ("Error: %s" % vgstatus)

    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
                    self.op.mac_prefix):
      raise errors.OpPrereqError, ("Invalid mac prefix given '%s'" %
                                   self.op.mac_prefix)

    if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
      raise errors.OpPrereqError, ("Invalid hypervisor type given '%s'" %
                                   self.op.hypervisor_type)

    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
    if result.failed:
      raise errors.OpPrereqError, ("Invalid master netdev given (%s): '%s'" %
                                   (self.op.master_netdev, result.output))

  def Exec(self, feedback_fn):
    """Initialize the cluster.

    """
    clustername = self.clustername
    hostname = self.hostname

    # set up the simple store
    ss = ssconf.SimpleStore()
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
    ss.SetKey(ss.SS_MASTER_NODE, hostname['hostname_full'])
    ss.SetKey(ss.SS_MASTER_IP, clustername['ip'])
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername['hostname'])

    # set up the inter-node password and certificate
    _InitGanetiServerSetup(ss)

    # start the master ip
    rpc.call_node_start_master(hostname['hostname_full'])

    # set up ssh config and /etc/hosts
    f = open('/etc/ssh/ssh_host_rsa_key.pub', 'r')
    try:
      sshline = f.read()
    finally:
      f.close()
    sshkey = sshline.split(" ")[1]

    _UpdateEtcHosts(hostname['hostname_full'],
                    hostname['ip'],
                    )

    _UpdateKnownHosts(hostname['hostname_full'],
                      hostname['ip'],
                      sshkey,
                      )

    _InitSSHSetup(hostname['hostname'])

    # init of cluster config file
    cfgw = config.ConfigWriter()
    cfgw.InitConfig(hostname['hostname'], hostname['ip'], self.secondary_ip,
                    sshkey, self.op.mac_prefix,
                    self.op.vg_name, self.op.def_bridge)

class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError, ("There are still %d node(s) in "
                                   "this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError, ("There are still %d instance(s) in "
                                   "this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    utils.CreateBackup('/root/.ssh/id_dsa')
    utils.CreateBackup('/root/.ssh/id_dsa.pub')
    rpc.call_node_leave_cluster(self.sstore.GetMasterNode())

class LUVerifyCluster(NoHooksLU):
  """Verifies the cluster status.

  """
  _OP_REQP = []

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn(" - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    instancelist = self.cfg.GetInstanceList()
    if not instance in instancelist:
      feedback_fn("  - ERROR: instance %s not in instance list %s" %
                      (instance, instancelist))
      bad = True

    instanceconfig = self.cfg.GetInstanceInfo(instance)
    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if not instanceconfig.status == 'down':
      if not instance in node_instance[node_current]:
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return not bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    self.cfg.VerifyConfig()

    master = self.sstore.GetMasterNode()
    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    node_volume = {}
    node_instance = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if type(volumeinfo) != dict:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      result = self._VerifyInstance(instance, node_volume, node_instance,
                                    feedback_fn)
      bad = bad or result

      inst_config = self.cfg.GetInstanceInfo(instance)

      inst_config.MapLVsByNode(node_vol_should)

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    return int(bad)

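# Illustrative sketch, not part of the original module: Exec methods such as
# LUVerifyCluster.Exec above only require feedback_fn to be a callable taking
# a single message string; a minimal implementation could simply log it.
def _ExampleFeedbackFn(msg):
  """Minimal feedback function (illustration only)."""
  logger.ToStdout(msg)
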
def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      logger.ToStderr("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError, ("Can't contact node %s for mirror data,"
                                   " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        logger.ToStderr("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      perc_done, est_time, is_degraded = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        logger.ToStdout("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    if unlock:
      utils.Unlock('cmd')
    try:
      time.sleep(min(60, max_time))
    finally:
      if unlock:
        utils.Lock('cmd')

  if done:
    logger.ToStdout("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded

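# Illustrative sketch, not part of the original module: each entry returned by
# rpc.call_blockdev_getmirrorstatus and consumed in the loop above is expected
# to be a (sync_percent, estimated_time, is_degraded) tuple, with None for the
# percentage once the device is fully synced. The values below are invented.
def _ExampleMirrorStatus():
  """Return sample mirror status tuples (illustration only)."""
  return [(87.5, 130, True),    # still syncing, ~130s left, degraded for now
          (None, None, False)]  # fully in sync
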
def _CheckDiskConsistency(cfgw, dev, node, on_primary):
  """Check that mirrors are not degraded.

  """
  cfgw.SetDiskID(dev, node)

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Can't get any data from node %s" % node)
      result = False
    else:
      result = result and (not rstats[5])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result

class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    return

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError, "Can't gather the list of OSes"
    return node_data

class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would not allow itself to run.

    """
    env = {
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      logger.Error("Error: Node '%s' is unknown." % self.op.node_name)
      return 1

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError, ("Node is the master node,"
                                   " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError, ("Instance %s still running on the node,"
                                     " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError, ("Instance %s has node as a secondary,"
                                     " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    rpc.call_node_leave_cluster(node.name)

    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)

class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["dtotal", "dfree",
                                     "mtotal", "mnode", "mfree"])

    _CheckOutputFields(static=["name", "pinst", "sinst", "pip", "sip"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = utils.NiceSort(self.cfg.GetNodeList())
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict.fromkeys(nodenames, 0)
    node_to_secondary = dict.fromkeys(nodenames, 0)

    if "pinst" in self.op.output_fields or "sinst" in self.op.output_fields:
      instancelist = self.cfg.GetInstanceList()

      for instance in instancelist:
        instanceinfo = self.cfg.GetInstanceInfo(instance)
        node_to_primary[instanceinfo.primary_node] += 1
        for secnode in instanceinfo.secondary_nodes:
          node_to_secondary[secnode] += 1

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst":
          val = node_to_primary[node.name]
        elif field == "sinst":
          val = node_to_secondary[node.name]
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, "?")
        else:
          raise errors.ParameterError, field
        val = str(val)
        node_output.append(val)
      output.append(node_output)

    return output

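# Illustrative sketch, not part of the original module: LUQueryNodes.Exec
# returns one row per node with the requested fields stringified, in the order
# they were requested. The node names and counts below are invented.
def _ExampleQueryNodesResult():
  """Return a result shaped like LUQueryNodes.Exec output (illustration)."""
  # for output_fields = ["name", "pinst", "sinst"]
  return [["node1.example.com", "2", "1"],
          ["node2.example.com", "0", "3"]]
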
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = utils.NiceSort([node.name for node in self.nodes])
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError, field
          node_output.append(str(val))

        output.append(node_output)

    return output

class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.LookupHostname(node_name)
    if not dns_data:
      raise errors.OpPrereqError, ("Node %s is not resolvable" % node_name)

    node = dns_data['hostname']
    primary_ip = self.op.primary_ip = dns_data['ip']
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError, ("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip
    node_list = cfg.GetNodeList()
    if node in node_list:
      raise errors.OpPrereqError, ("Node %s is already in the configuration"
                                   % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)
      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError, ("New node ip address(es) conflict with"
                                     " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError, ("The master has no private ip but the"
                                     " new node has one")
      else:
        raise errors.OpPrereqError, ("The master has a private ip but the"
                                     " new node doesn't have one")

    # checks reachability
    command = ["fping", "-q", primary_ip]
    result = utils.RunCmd(command)
    if result.failed:
      raise errors.OpPrereqError, ("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      command = ["fping", "-S%s" % myself.secondary_ip, "-q", secondary_ip]
      result = utils.RunCmd(command)
      if result.failed:
        raise errors.OpPrereqError, ("Node secondary ip not reachable by ping")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # set up inter-node password and certificate and restarts the node daemon
    gntpass = self.sstore.GetNodeDaemonPassword()
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
      raise errors.OpExecError, ("ganeti password corruption detected")
    f = open(constants.SSL_CERT_FILE)
    try:
      gntpem = f.read(8192)
    finally:
      f.close()
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
    # so we use this to detect an invalid certificate; as long as the
    # cert doesn't contain this, the here-document will be correctly
    # parsed by the shell sequence below
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
      raise errors.OpExecError, ("invalid PEM encoding in the SSL certificate")
    if not gntpem.endswith("\n"):
      raise errors.OpExecError, ("PEM must end with newline")
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)

    # remove first the root's known_hosts file
    utils.RemoveFile("/root/.ssh/known_hosts")
    # and then connect with ssh to set password and start ganeti-noded
    # note that all the below variables are sanitized at this point,
    # either by being constants or by the checks above
    ss = self.sstore
    mycommand = ("umask 077 && "
                 "echo '%s' > '%s' && "
                 "cat > '%s' << '!EOF.' && \n"
                 "%s!EOF.\n%s restart" %
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
                  constants.SSL_CERT_FILE, gntpem,
                  constants.NODE_INITD_SCRIPT))

    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
    if result.failed:
      raise errors.OpExecError, ("Remote command on node %s, error: %s,"
                                 " output: %s" %
                                 (node, result.fail_reason, result.output))

    # check connectivity
    time.sleep(4)

    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError, ("Version mismatch master version %s,"
                                   " node version %s" %
                                   (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError, ("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    keyarray = []
    keyfiles = ["/etc/ssh/ssh_host_dsa_key", "/etc/ssh/ssh_host_dsa_key.pub",
                "/etc/ssh/ssh_host_rsa_key", "/etc/ssh/ssh_host_rsa_key.pub",
                "/root/.ssh/id_dsa", "/root/.ssh/id_dsa.pub"]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError, ("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    _UpdateEtcHosts(new_node.name, new_node.primary_ip)
    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
                      self.cfg.GetHostKey())

    if new_node.secondary_ip != new_node.primary_ip:
      result = ssh.SSHCall(node, "root",
                           "fping -S 127.0.0.1 -q %s" % new_node.secondary_ip)
      if result.failed:
        raise errors.OpExecError, ("Node claims it doesn't have the"
                                   " secondary ip you gave (%s).\n"
                                   "Please fix and re-run this command." %
                                   new_node.secondary_ip)

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList() + [node]
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in ("/etc/hosts", "/etc/ssh/ssh_known_hosts"):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = ss.GetFileList()
    for fname in to_copy:
      if not ssh.CopyFileToNode(node, fname):
        logger.Error("could not copy file %s to node %s" % (fname, node))

    logger.Info("adding node %s to cluster.conf" % node)
    self.cfg.AddNode(new_node)

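# Illustrative sketch, not part of the original module: the remote command
# assembled in LUAddNode.Exec above installs the shared node password and the
# SSL certificate on the new node and then restarts the node daemon. All the
# values below are placeholders; the real command is built from the simple
# store and constants at runtime.
def _ExampleNodeSetupCommand():
  """Return the shell snippet LUAddNode sends to a new node (illustration)."""
  return ("umask 077 && "
          "echo '%s' > '%s' && "
          "cat > '%s' << '!EOF.' && \n"
          "%s!EOF.\n%s restart" %
          ("<node password>", "<node-pass file>",
           "<ssl cert file>", "<PEM certificate data>\n",
           "<node init script>"))
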
class LUMasterFailover(LogicalUnit):
  """Failover the master node to the current node.

  This is a special LU in that it must run on a non-master node.

  """
  HPATH = "master-failover"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_MASTER = False
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the new master only in the pre phase, and on all
    the nodes in the post phase.

    """
    env = {
      "NEW_MASTER": self.new_master,
      "OLD_MASTER": self.old_master,
      }
    return env, [self.new_master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we are not already the master.

    """
    self.new_master = socket.gethostname()

    self.old_master = self.sstore.GetMasterNode()

    if self.old_master == self.new_master:
      raise errors.OpPrereqError, ("This command must be run on the node"
                                   " where you want the new master to be.\n"
                                   "%s is already the master" %
                                   self.old_master)

  def Exec(self, feedback_fn):
    """Failover the master node.

    This command, when run on a non-master node, will cause the current
    master to cease being master, and the non-master to become new
    master.

    """
    #TODO: do not rely on gethostname returning the FQDN
    logger.Info("setting master to %s, old master: %s" %
                (self.new_master, self.old_master))

    if not rpc.call_node_stop_master(self.old_master):
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)

    ss = self.sstore
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
      logger.Error("could not distribute the new simple store master file"
                   " to the other nodes, please check.")

    if not rpc.call_node_start_master(self.new_master):
      logger.Error("could not start the master role on the new master"
                   " %s, please check" % self.new_master)
      feedback_fn("Error in activating the master IP on the new master,\n"
                  "please fix manually.")

class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      }

    return result

class LUClusterCopyFile(NoHooksLU):
  """Copy file to cluster.

  """
  _OP_REQP = ["nodes", "filename"]

  def CheckPrereq(self):
    """Check prerequisites.

    It should check that the named file exists and that the given list
    of nodes is valid.

    """
    if not os.path.exists(self.op.filename):
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)

    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Copy a file from master to some nodes.

    Args:
      opts - class with options as members
      args - list containing a single element, the file name
    Opts used:
      nodes - list containing the name of target nodes; if empty, all nodes

    """
    filename = self.op.filename

    myname = socket.gethostname()

    for node in self.nodes:
      if node == myname:
        continue
      if not ssh.CopyFileToNode(node, filename):
        logger.Error("Copy of file %s to node %s failed" % (filename, node))

class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Return a text representation of the cluster config.

    """
    return self.cfg.DumpConfig()


class LURunClusterCommand(NoHooksLU):
  """Run a command on some nodes.

  """
  _OP_REQP = ["command", "nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    It checks that the given list of nodes is valid.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Run a command on some nodes.

    """
    data = []
    for node in self.nodes:
      result = utils.RunCmd(["ssh", node.name, self.op.command])
      data.append((node.name, result.cmd, result.output, result.exit_code))

    return data


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError, ("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    a tuple (disks_ok, device_info); device_info is a list of
    (node, instance_visible_name, assemble_result) tuples describing
    the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  for inst_disk in instance.disks:
    master_result = None
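    # Walk the whole device tree of this disk on every node it lives on;
    # only the assemble result from the primary node is kept, since that
    # is the device the instance will actually use.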
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      is_primary = node == instance.primary_node
      result = rpc.call_blockdev_assemble(node, node_disk, is_primary)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=%s)" %
                     (inst_disk.iv_name, node, is_primary))
        if is_primary or not ignore_secondaries:
          disks_ok = False
      if is_primary:
        master_result = result
    device_info.append((instance.primary_node, inst_disk.iv_name,
                        master_result))

  return disks_ok, device_info


def _StartInstanceDisks(cfg, instance, force):
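  """Start the disks of an instance.

  This is a thin wrapper over _AssembleInstanceDisks: errors on
  secondary nodes are ignored when 'force' is true, and any remaining
  failure shuts the disks down again and raises OpExecError.

  """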
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError, ("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    if not isinstance(ins_l, list):
      raise errors.OpExecError, ("Can't contact node '%s'" %
                                 instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError, ("Instance is running, can't shutdown"
                                 " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)


def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  Errors on the primary node are ignored only if ignore_primary is
  true; errors on any other node always mark the shutdown as failed.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result


class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)

    # check bridges existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(instance.primary_node, brlist):
      raise errors.OpPrereqError, ("one or more target bridges %s do not"
                                   " exist on destination node '%s'" %
                                   (brlist, instance.primary_node))

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
    if not nodeinfo:
      raise errors.OpExecError, ("Could not contact node %s for info" %
                                 (node_current))

    freememory = nodeinfo[node_current]['memory_free']
    memory = instance.memory
    if memory > freememory:
      raise errors.OpExecError, ("Not enough memory to start instance"
                                 " %s on node %s:"
                                 " needed %s MiB, available %s MiB" %
                                 (instance.name, node_current, memory,
                                  freememory))

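    # Bring the instance's disks up first; if the hypervisor then fails
    # to start the instance, the disks are shut down again below.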
    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError, ("Could not start instance")

    self.cfg.MarkInstanceUp(instance.name)


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    if not rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

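    # The instance is marked down and its disks are shut down even if the
    # hypervisor-level shutdown above reported a failure.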
    self.cfg.MarkInstanceDown(instance.name)
    _ShutdownInstanceDisks(instance, self.cfg)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)
    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError, ("Instance '%s' has no disks" %
                                   self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError, ("Instance '%s' is marked to be up" %
                                   self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError, ("Instance '%s' is running on the node %s" %
                                   (self.op.instance_name,
                                    instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError, ("Primary node '%s' is unknown" %
                                     self.op.pnode)
      os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
      if not isinstance(os_obj, objects.OS):
        raise errors.OpPrereqError, ("OS '%s' not in supported OS list for"
                                     " primary node" % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.AddInstance(inst)

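    # The disks only need to be up while the OS create scripts run on the
    # primary node; they are shut down again in the finally clause below.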
    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError, ("Could not install OS for instance %s "
                                   "on node %s" %
                                   (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      raise errors.OpExecError, ("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logger.Info("removing block devices for instance %s" % instance.name)

    _RemoveDisks(instance, self.cfg)

    logger.Info("removing instance %s from cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
                               "admin_state", "admin_ram",
                               "disk_template", "ip", "mac", "bridge"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    instance_names = utils.NiceSort(self.cfg.GetInstanceList())
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
                     in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])

    bad_nodes = []
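    # Live (hypervisor) data is only collected when a dynamic field was
    # requested; nodes that cannot be contacted end up in bad_nodes and
    # their instances are reported as "(node down)" below.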
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_all_instances_info(nodes)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = []
    for instance in instance_list:
      iout = []
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = ",".join(instance.secondary_nodes) or "-"
        elif field == "admin_state":
          if instance.status == "down":
            val = "no"
          else:
            val = "yes"
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = "(node down)"
          else:
            if live_data.get(instance.name):
              val = "running"
            else:
              val = "stopped"
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = "(node down)"
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        else:
          raise errors.ParameterError, field
        val = str(val)
        iout.append(val)
      output.append(iout)

    return output


class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)

    # check memory requirements on the secondary node
    target_node = instance.secondary_nodes[0]
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
    info = nodeinfo.get(target_node, None)
    if not info:
      raise errors.OpPrereqError, ("Cannot get current information"
                                   " from node '%s'" % target_node)
    if instance.memory > info['memory_free']:
      raise errors.OpPrereqError, ("Not enough memory on target node %s."
                                   " %d MB available, %d MB required" %
                                   (target_node, info['memory_free'],
                                    instance.memory))

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(instance.primary_node, brlist):
      raise errors.OpPrereqError, ("one or more target bridges %s do not"
                                   " exist on destination node '%s'" %
                                   (brlist, instance.primary_node))

    self.instance = instance

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

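    # Failover sequence: verify disk consistency and memory on the target
    # node, shut the instance and its disks down on the current primary,
    # flip the primary/secondary roles in the configuration, then bring
    # the disks up and start the instance on the new primary.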
    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for remote_raid1, these are md over drbd
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        if not self.op.ignore_consistency:
          raise errors.OpExecError, ("Disk %s is degraded on target node,"
                                     " aborting failover." % dev.iv_name)

    feedback_fn("* checking target node resource availability")
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())

    if not nodeinfo:
      raise errors.OpExecError, ("Could not contact target node %s." %
                                 target_node)

    free_memory = int(nodeinfo[target_node]['memory_free'])
    memory = instance.memory
    if memory > free_memory:
      raise errors.OpExecError, ("Not enough memory to create instance %s on"
                                 " node %s: needed %s MiB, available %s MiB" %
                                 (instance.name, target_node, memory,
                                  free_memory))

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not rpc.call_instance_shutdown(source_node, instance):
      logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                   " anyway. Please make sure node %s is down" %
                   (instance.name, source_node, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError, ("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.AddInstance(instance)

    feedback_fn("* activating the instance's disks on target node")
    logger.Info("Starting instance %s on node %s" %
                (instance.name, target_node))

    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                             ignore_secondaries=True)
    if not disks_ok:
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError, ("Can't activate the instance's disks")

    feedback_fn("* starting the instance on the target node")
    if not rpc.call_instance_start(target_node, instance, None):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance %s on node %s." %
                               (instance.name, target_node))


def _CreateBlockDevOnPrimary(cfg, node, device):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
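  # Children are created first (depth-first), so the device itself can be
  # assembled on top of them.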
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(cfg, node, child):
        return False

  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size, True)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _CreateBlockDevOnSecondary(cfg, node, device, force):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(cfg, node, child, force):
        return False

  if not force:
    return True
  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size, False)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _GenerateMDDRBDBranch(cfg, vgname, primary, secondary, size, base):
  """Generate a drbd device complete with its children.

  """
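  # The branch is a DRBD device between 'primary' and 'secondary' on a
  # cluster-allocated port, backed by two LVs: one of the requested size
  # for data and a fixed 128 MiB one for the DRBD metadata.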
  port = cfg.AllocatePort()
  base = "%s_%s" % (base, port)
  dev_data = objects.Disk(dev_type="lvm", size=size,
                          logical_id=(vgname, "%s.data" % base))
  dev_meta = objects.Disk(dev_type="lvm", size=128,
                          logical_id=(vgname, "%s.meta" % base))
  drbd_dev = objects.Disk(dev_type="drbd", size=size,
                          logical_id=(primary, secondary, port),
                          children=[dev_data, dev_meta])
  return drbd_dev


def _GenerateDiskTemplate(cfg, vgname, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz):
  """Generate the entire disk layout for a given template type.

  """
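  # Each template maps to a list of top-level Disk objects ('sda' for the
  # OS and 'sdb' for swap, none for 'diskless'): plain LVs, local md_raid1
  # over two LVs, or remote_raid1 as md_raid1 over a DRBD branch mirrored
  # to the single secondary node.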
  #TODO: compute space requirements

  if template_name == "diskless":
    disks = []
  elif template_name == "plain":
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")
    sda_dev = objects.Disk(dev_type="lvm", size=disk_sz,
                           logical_id=(vgname, "%s.os" % instance_name),
                           iv_name="sda")
    sdb_dev = objects.Disk(dev_type="lvm", size=swap_sz,
                           logical_id=(vgname, "%s.swap" % instance_name),
                           iv_name="sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == "local_raid1":
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")
    sda_dev_m1 = objects.Disk(dev_type="lvm", size=disk_sz,
                              logical_id=(vgname, "%s.os_m1" % instance_name))
    sda_dev_m2 = objects.Disk(dev_type="lvm", size=disk_sz,
                              logical_id=(vgname, "%s.os_m2" % instance_name))
    md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name="sda",
                              size=disk_sz,
                              children=[sda_dev_m1, sda_dev_m2])
    sdb_dev_m1 = objects.Disk(dev_type="lvm", size=swap_sz,
                              logical_id=(vgname, "%s.swap_m1" %
                                          instance_name))
    sdb_dev_m2 = objects.Disk(dev_type="lvm", size=swap_sz,
                              logical_id=(vgname, "%s.swap_m2" %
                                          instance_name))
    md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name="sdb",
                              size=swap_sz,
                              children=[sdb_dev_m1, sdb_dev_m2])
    disks = [md_sda_dev, md_sdb_dev]
  elif template_name == "remote_raid1":
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, vgname,
                                         primary_node, remote_node, disk_sz,
                                         "%s-sda" % instance_name)
    md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name="sda",
                              children=[drbd_sda_dev], size=disk_sz)
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, vgname,
                                         primary_node, remote_node, swap_sz,
                                         "%s-sdb" % instance_name)
    md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name="sdb",
                              children=[drbd_sdb_dev], size=swap_sz)
    disks = [md_sda_dev, md_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks


def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
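  # For each disk the backing devices are created on the secondary nodes
  # first and then on the primary; a failure anywhere aborts the whole
  # creation and the caller is expected to clean up via _RemoveDisks.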
  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, device, False):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, device):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False
  return True


def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False
  return result


class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
              "disk_template", "swap_size", "mode", "start", "vcpus",
              "wait_for_sync"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os=self.op.os_type,
      memory=self.op.mem_size,
      vcpus=self.op.vcpus,
      nics=[(self.inst_ip, self.op.bridge)],
    ))

    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError, ("Invalid instance creation mode '%s'" %
                                   self.op.mode)

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)
      if src_node is None or src_path is None:
        raise errors.OpPrereqError, ("Importing an instance requires source"
                                     " node and path options")
      src_node_full = self.cfg.ExpandNodeName(src_node)
      if src_node_full is None:
        raise errors.OpPrereqError, ("Unknown source node '%s'" % src_node)
      self.op.src_node = src_node = src_node_full

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError, ("The source path must be absolute")

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError, ("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError, ("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if int(ei_version) != constants.EXPORT_VERSION:
        raise errors.OpPrereqError, ("Wrong export version %s (wanted %d)" %
                                     (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError, ("Can't import instance with more than"
                                     " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage
    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError, ("No guest OS specified")

    # check primary node
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
    if pnode is None:
      raise errors.OpPrereqError, ("Primary node '%s' is unknown" %
                                   self.op.pnode)
    self.op.pnode = pnode.name
    self.pnode = pnode
    self.secondaries = []
    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError, ("Invalid disk template name")

    if self.op.disk_template == constants.DT_REMOTE_RAID1:
      if getattr(self.op, "snode", None) is None:
        raise errors.OpPrereqError, ("The 'remote_raid1' disk template needs"
                                     " a mirror node")

      snode_name = self.cfg.ExpandNodeName(self.op.snode)
      if snode_name is None:
        raise errors.OpPrereqError, ("Unknown secondary node '%s'" %
                                     self.op.snode)
      elif snode_name == pnode.name:
        raise errors.OpPrereqError, ("The secondary node cannot be"
                                     " the primary node.")
      self.secondaries.append(snode_name)

    # Check lv size requirements
    nodenames = [pnode.name] + self.secondaries
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())

    # Required free disk space as a function of disk and swap space
    req_size_dict = {
      constants.DT_DISKLESS: 0,
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
      constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
      # 256 MB are added for drbd metadata, 128MB for each drbd device
      constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
    }

    if self.op.disk_template not in req_size_dict:
      raise errors.ProgrammerError, ("Disk template '%s' size requirement"
                                     " is unknown" % self.op.disk_template)

    req_size = req_size_dict[self.op.disk_template]

    for node in nodenames:
      info = nodeinfo.get(node, None)
      if not info:
        raise errors.OpPrereqError, ("Cannot get current information"
                                     " from node '%s'" % node)
      if req_size > info['vg_free']:
        raise errors.OpPrereqError, ("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
    if not isinstance(os_obj, objects.OS):
      raise errors.OpPrereqError, ("OS '%s' not in supported OS list for"
                                   " primary node" % self.op.os_type)

    # instance verification
    hostname1 = utils.LookupHostname(self.op.instance_name)
    if not hostname1:
      raise errors.OpPrereqError, ("Instance name '%s' not found in dns" %
                                   self.op.instance_name)

    self.op.instance_name = instance_name = hostname1['hostname']
    instance_list = self.cfg.GetInstanceList()
    if instance_name in instance_list:
      raise errors.OpPrereqError, ("Instance '%s' is already in the cluster" %
                                   instance_name)

    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1['ip']
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError, ("given IP address '%s' doesn't look"
                                     " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = inst_ip

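    # fping exits successfully when the target answers, so a successful
    # run here means the instance's IP address is already live somewhere.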
    command = ["fping", "-q", hostname1['ip']]
2605
    result = utils.RunCmd(command)
2606
    if not result.failed:
2607
      raise errors.OpPrereqError, ("IP %s of instance %s already in use" %
2608
                                   (hostname1['ip'], instance_name))
2609

    
2610
    # bridge verification
2611
    bridge = getattr(self.op, "bridge", None)
2612
    if bridge is None:
2613
      self.op.bridge = self.cfg.GetDefBridge()
2614
    else:
2615
      self.op.bridge = bridge
2616

    
2617
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
2618
      raise errors.OpPrereqError, ("target bridge '%s' does not exist on"
2619
                                   " destination node '%s'" %
2620
                                   (self.op.bridge, pnode.name))
2621

    
2622
    if self.op.start:
2623
      self.instance_status = 'up'
2624
    else:
2625
      self.instance_status = 'down'
2626

    
2627
  def Exec(self, feedback_fn):
2628
    """Create and add the instance to the cluster.
2629

2630
    """
2631
    instance = self.op.instance_name
2632
    pnode_name = self.pnode.name
2633

    
2634
    nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
2635
    if self.inst_ip is not None:
2636
      nic.ip = self.inst_ip
2637

    
2638
    disks = _GenerateDiskTemplate(self.cfg, self.cfg.GetVGName(),
2639
                                  self.op.disk_template,
2640
                                  instance, pnode_name,
2641
                                  self.secondaries, self.op.disk_size,
2642
                                  self.op.swap_size)
2643

    
2644
    iobj = objects.Instance(name=instance, os=self.op.os_type,
2645
                            primary_node=pnode_name,
2646
                            memory=self.op.mem_size,
2647
                            vcpus=self.op.vcpus,
2648
                            nics=[nic], disks=disks,
2649
                            disk_template=self.op.disk_template,
2650
                            status=self.instance_status,
2651
                            )
2652

    
2653
    feedback_fn("* creating instance disks...")
2654
    if not _CreateDisks(self.cfg, iobj):
2655
      _RemoveDisks(iobj, self.cfg)
2656
      raise errors.OpExecError, ("Device creation failed, reverting...")
2657

    
2658
    feedback_fn("adding instance %s to cluster config" % instance)
2659

    
2660
    self.cfg.AddInstance(iobj)
2661

    
2662
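    # Unless the caller asked to wait for a full sync, remote_raid1
    # instances still get a quick one-shot check that no mirror is
    # degraded before we continue.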
    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj)
    elif iobj.disk_template == "remote_raid1":
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      raise errors.OpExecError, ("There are some degraded disks for"
                                 " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError, ("could not add os for instance %s"
                                     " on node %s" %
                                     (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                           src_node, src_image):
          raise errors.OpExecError, ("Could not import os for instance"
                                     " %s on node %s" %
                                     (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError, ("Unknown OS initialization mode '%s'"
                                       % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError, ("Could not start instance")


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Connect to the console of an instance.

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError, ("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError, ("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance.name)
    return node, console_cmd


class LUAddMDDRBDComponent(LogicalUnit):
  """Add a new mirror member to an instance's disk.

  """
  HPATH = "mirror-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "NEW_SECONDARY": self.op.remote_node,
      "DISK_NAME": self.op.disk_name,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.remote_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)
    self.instance = instance

    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
    if remote_node is None:
      raise errors.OpPrereqError, ("Node '%s' not known" % self.op.remote_node)
    self.remote_node = remote_node

    if remote_node == instance.primary_node:
      raise errors.OpPrereqError, ("The specified node is the primary node of"
                                   " the instance.")

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError, ("Instance's disk layout is not"
                                   " remote_raid1.")
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError, ("Can't find this device ('%s') in the"
                                   " instance." % self.op.disk_name)
    if len(disk.children) > 1:
      raise errors.OpPrereqError, ("The device already has two slave"
                                   " devices.\n"
                                   "This would create a 3-disk raid1"
                                   " which we don't allow.")
    self.disk = disk

  def Exec(self, feedback_fn):
    """Add the mirror component.

    """
    disk = self.disk
    instance = self.instance

    remote_node = self.remote_node
    new_drbd = _GenerateMDDRBDBranch(self.cfg, self.cfg.GetVGName(),
                                     instance.primary_node, remote_node,
                                     disk.size, "%s-%s" %
                                     (instance.name, self.op.disk_name))

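    # The new DRBD branch is created on the secondary first, then on the
    # primary, and only then attached to the MD array; every step rolls
    # back the devices created so far if it fails.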
    logger.Info("adding new mirror component on secondary")
2829
    #HARDCODE
2830
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, new_drbd, False):
2831
      raise errors.OpExecError, ("Failed to create new component on secondary"
2832
                                 " node %s" % remote_node)
2833

    
2834
    logger.Info("adding new mirror component on primary")
2835
    #HARDCODE
2836
    if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node, new_drbd):
2837
      # remove secondary dev
2838
      self.cfg.SetDiskID(new_drbd, remote_node)
2839
      rpc.call_blockdev_remove(remote_node, new_drbd)
2840
      raise errors.OpExecError, ("Failed to create volume on primary")
2841

    
2842
    # the device exists now
2843
    # call the primary node to add the mirror to md
2844
    logger.Info("adding new mirror component to md")
2845
    if not rpc.call_blockdev_addchild(instance.primary_node,
2846
                                           disk, new_drbd):
2847
      logger.Error("Can't add mirror compoment to md!")
2848
      self.cfg.SetDiskID(new_drbd, remote_node)
2849
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
2850
        logger.Error("Can't rollback on secondary")
2851
      self.cfg.SetDiskID(new_drbd, instance.primary_node)
2852
      if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
2853
        logger.Error("Can't rollback on primary")
2854
      raise errors.OpExecError, "Can't add mirror component to md array"
2855

    
2856
    disk.children.append(new_drbd)
2857

    
2858
    self.cfg.AddInstance(instance)
2859

    
2860
    _WaitForSync(self.cfg, instance)
2861

    
2862
    return 0
2863

    
2864

    
2865
class LURemoveMDDRBDComponent(LogicalUnit):
  """Remove a component from a remote_raid1 disk.

  """
  HPATH = "mirror-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK_NAME": self.op.disk_name,
      "DISK_ID": self.op.disk_id,
      "OLD_SECONDARY": self.old_secondary,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)
    self.instance = instance

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError, ("Instance's disk layout is not"
                                   " remote_raid1.")
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError, ("Can't find this device ('%s') in the"
                                   " instance." % self.op.disk_name)
    for child in disk.children:
      if child.dev_type == "drbd" and child.logical_id[2] == self.op.disk_id:
        break
    else:
      raise errors.OpPrereqError, ("Can't find the device with this port.")

    if len(disk.children) < 2:
      raise errors.OpPrereqError, ("Cannot remove the last component from"
                                   " a mirror.")
    self.disk = disk
    self.child = child
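    # A drbd child's logical_id is (node_a, node_b, port); whichever end
    # is not the instance's primary node is the secondary being removed.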
    if self.child.logical_id[0] == instance.primary_node:
      oid = 1
    else:
      oid = 0
    self.old_secondary = self.child.logical_id[oid]

  def Exec(self, feedback_fn):
    """Remove the mirror component.

    """
    instance = self.instance
    disk = self.disk
    child = self.child
    logger.Info("remove mirror component")
    self.cfg.SetDiskID(disk, instance.primary_node)
    if not rpc.call_blockdev_removechild(instance.primary_node,
                                         disk, child):
      raise errors.OpExecError, ("Can't remove child from mirror.")

    for node in child.logical_id[:2]:
      self.cfg.SetDiskID(child, node)
      if not rpc.call_blockdev_remove(node, child):
        logger.Error("Warning: failed to remove device from node %s,"
                     " continuing operation." % node)

    disk.children.remove(child)
    self.cfg.AddInstance(instance)


class LUReplaceDisks(LogicalUnit):
2952
  """Replace the disks of an instance.
2953

2954
  """
2955
  HPATH = "mirrors-replace"
2956
  HTYPE = constants.HTYPE_INSTANCE
2957
  _OP_REQP = ["instance_name"]
2958

    
2959
  def BuildHooksEnv(self):
2960
    """Build hooks env.
2961

2962
    This runs on the master, the primary and all the secondaries.
2963

2964
    """
2965
    env = {
2966
      "NEW_SECONDARY": self.op.remote_node,
2967
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
2968
      }
2969
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2970
    nl = [self.sstore.GetMasterNode(),
2971
          self.instance.primary_node] + list(self.instance.secondary_nodes)
2972
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not known" %
                                   self.op.instance_name)
    self.instance = instance

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError, ("Instance's disk layout is not"
                                   " remote_raid1.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError, ("The instance has a strange layout,"
                                   " expected one secondary but found %d" %
                                   len(instance.secondary_nodes))

    remote_node = getattr(self.op, "remote_node", None)
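    # a missing remote_node means the mirrors are rebuilt in place, on the
    # instance's current secondary node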
    if remote_node is None:
      remote_node = instance.secondary_nodes[0]
    else:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError, ("Node '%s' not known" %
                                     self.op.remote_node)
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError, ("The specified node is the primary node of"
                                   " the instance.")
    self.op.remote_node = remote_node

  def Exec(self, feedback_fn):
    """Replace the disks of an instance.

    """
    instance = self.instance
    iv_names = {}
    # start of work
    remote_node = self.op.remote_node
    cfg = self.cfg
    vgname = cfg.GetVGName()
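    # Overall strategy: for each disk, build a new drbd pair between the
    # primary and the (possibly new) secondary, attach it as an extra child
    # of the md mirror, wait for the mirror to resync, and only then detach
    # and delete the old drbd child.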
    for dev in instance.disks:
      size = dev.size
      new_drbd = _GenerateMDDRBDBranch(cfg, vgname, instance.primary_node,
                                       remote_node, size,
                                       "%s-%s" % (instance.name, dev.iv_name))
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
      logger.Info("adding new mirror component on secondary for %s" %
                  dev.iv_name)
      #HARDCODE
      if not _CreateBlockDevOnSecondary(cfg, remote_node, new_drbd, False):
        raise errors.OpExecError, ("Failed to create new component on"
                                   " secondary node %s\n"
                                   "Full abort, cleanup manually!" %
                                   remote_node)

      logger.Info("adding new mirror component on primary")
      #HARDCODE
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, new_drbd):
        # remove secondary dev
        cfg.SetDiskID(new_drbd, remote_node)
        rpc.call_blockdev_remove(remote_node, new_drbd)
        raise errors.OpExecError("Failed to create volume on primary!\n"
                                 "Full abort, cleanup manually!!")

      # the device exists now
      # call the primary node to add the mirror to md
      logger.Info("adding new mirror component to md")
      if not rpc.call_blockdev_addchild(instance.primary_node, dev,
                                        new_drbd):
        logger.Error("Can't add mirror component to md!")
        cfg.SetDiskID(new_drbd, remote_node)
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
          logger.Error("Can't rollback on secondary")
        cfg.SetDiskID(new_drbd, instance.primary_node)
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
          logger.Error("Can't rollback on primary")
        raise errors.OpExecError, ("Full abort, cleanup manually!!")

      dev.children.append(new_drbd)
      cfg.AddInstance(instance)

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    _WaitForSync(cfg, instance, unlock=True)

    # so check manually all the devices
    for name in iv_names:
3067
      dev, child, new_drbd = iv_names[name]
3068
      cfg.SetDiskID(dev, instance.primary_node)
3069
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3070
      if is_degr:
3071
        raise errors.OpExecError, ("MD device %s is degraded!" % name)
3072
      cfg.SetDiskID(new_drbd, instance.primary_node)
3073
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3074
      if is_degr:
3075
        raise errors.OpExecError, ("New drbd device %s is degraded!" % name)
3076

    
3077
    for name in iv_names:
3078
      dev, child, new_drbd = iv_names[name]
3079
      logger.Info("remove mirror %s component" % name)
3080
      cfg.SetDiskID(dev, instance.primary_node)
3081
      if not rpc.call_blockdev_removechild(instance.primary_node,
3082
                                                dev, child):
3083
        logger.Error("Can't remove child from mirror, aborting"
3084
                     " *this device cleanup*.\nYou need to cleanup manually!!")
3085
        continue
3086

    
3087
      for node in child.logical_id[:2]:
3088
        logger.Info("remove child device on %s" % node)
3089
        cfg.SetDiskID(child, node)
3090
        if not rpc.call_blockdev_remove(node, child):
3091
          logger.Error("Warning: failed to remove device from node %s,"
3092
                       " continuing operation." % node)
3093

    
3094
      dev.children.remove(child)
3095

    
3096
      cfg.AddInstance(instance)
3097

    
3098

    
class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError, "Invalid argument type 'instances'"
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError, ("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]
    return


  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
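    # pstatus is the device status as seen from the primary node; for drbd
    # devices sstatus is looked up on the other node of the pair, and child
    # devices are processed recursively below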
    if dev.dev_type == "drbd":
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
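      # an instance is reported as running ("up") if the hypervisor on its
      # primary node returns state information for it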
      remote_info = rpc.call_instance_info(instance.primary_node,
                                                instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        }

      result[instance.name] = idict

    return result


class LUQueryNodeData(NoHooksLU):
  """Logical unit for querying node data.

  """
  _OP_REQP = ["nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional node list against the existing names.

    """
    self.wanted_nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Compute and return the list of nodes.

    """
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]
    result = []
    for node in self.wanted_nodes:
3221
      result.append((node.name, node.primary_ip, node.secondary_ip,
3222
                     [inst.name for inst in ilist
3223
                      if inst.primary_node == node.name],
3224
                     [inst.name for inst in ilist
3225
                      if node.name in inst.secondary_nodes],
3226
                     ))
3227
    return result
3228

    
3229

    
class LUSetInstanceParms(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      args['nics'] = [(ip, bridge)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.bridge = getattr(self.op, "bridge", None)
    if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
      raise errors.OpPrereqError, ("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError, ("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError, ("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError, ("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    # do_bridge mirrors do_ip and is checked by BuildHooksEnv
    self.do_bridge = (self.bridge is not None)

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError, ("No such instance name '%s'" %
                                   self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance
    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.
    """
    result = []
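    # result collects (parameter, new value) pairs describing what was changed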
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus",  self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))

    self.cfg.AddInstance(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list([node.name for node in self.nodes])


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance name is a valid one.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError, ("Instance '%s' not found" %
                                   self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError, ("Destination node '%s' is unknown." %
                                   self.op.target_node)
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    # shutdown the instance, unless requested not to do so
    if self.op.shutdown:
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
      self.processor.ChainOpCode(op, feedback_fn)

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
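        # only the "sda" disk is snapshotted and exported; any other disks of
        # the instance are skipped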
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type="lvm", size=disk.size,
                                      logical_id=(vgname, new_dev_name),
                                      physical_id=(vgname, new_dev_name),
                                      iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown:
        op = opcodes.OpStartupInstance(instance_name=instance.name,
                                       force=False)
        self.processor.ChainOpCode(op, feedback_fn)

    # TODO: check for size

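    # copy each snapshot to the target node and then always delete it from
    # the source node; failures are logged but do not abort the export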
    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                           instance):
        logger.Error("could not export block device %s from node"
                     " %s to node %s" %
                     (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from"
                     " node %s" % (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal; if we
    # proceeded, the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.processor.ChainOpCode(op, feedback_fn)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))