#!/usr/bin/python
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import socket
import time
import tempfile
import re
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import ssconf

class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements (REQ_CLUSTER,
      REQ_MASTER); note that all commands require root permissions

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_CLUSTER = True
  REQ_MASTER = True

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.processor = processor
    self.op = op
    self.cfg = cfg
    self.sstore = sstore
    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    if self.REQ_CLUSTER:
      if not cfg.IsCluster():
        raise errors.OpPrereqError("Cluster not initialized yet,"
                                   " use 'gnt-cluster init' first.")
      if self.REQ_MASTER:
        master = sstore.GetMasterNode()
        if master != socket.gethostname():
          raise errors.OpPrereqError("Commands must be run on the master"
                                     " node %s" % master)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a
    dict containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note that additional keys will
    be added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    As for the node lists, the master should not be included in them,
    as it will be added by the hooks runner in case this LU requires a
    cluster to run on (otherwise we don't have a node list). If there
    are no nodes, an empty list (and not None) should be returned.

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError


class NoHooksLU(LogicalUnit):
149
  """Simple LU which runs no hooks.
150

151
  This LU is intended as a parent for other LogicalUnits which will
152
  run no hooks, in order to reduce duplicate code.
153

154
  """
155
  HPATH = None
156
  HTYPE = None
157

    
158
  def BuildHooksEnv(self):
159
    """Build hooks env.
160

161
    This is a no-op, since we don't run hooks.
162

163
    """
164
    return
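# Illustrative example only (not used by the code in this module): a
# minimal LogicalUnit subclass following the rules from the LogicalUnit
# docstring -- it declares its required opcode fields, implements
# CheckPrereq/Exec/BuildHooksEnv and redefines HPATH and HTYPE. The
# class and hook path names below are made up for the example.
#
#   class LUExampleNodeOp(LogicalUnit):
#     HPATH = "node-example"
#     HTYPE = constants.HTYPE_NODE
#     _OP_REQP = ["node_name"]
#
#     def CheckPrereq(self):
#       node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
#       if node is None:
#         raise errors.OpPrereqError("Node '%s' is unknown" % self.op.node_name)
#       self.op.node_name = node.name
#
#     def BuildHooksEnv(self):
#       return {"NODE_NAME": self.op.node_name}, [], [self.op.node_name]
#
#     def Exec(self, feedback_fn):
#       feedback_fn("example LU ran on node %s" % self.op.node_name)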
165

    
166

    
167
def _GetWantedNodes(lu, nodes):
168
  """Returns list of checked and expanded nodes.
169

170
  Args:
171
    nodes: List of nodes (strings) or None for all
172

173
  """
174
  if nodes is not None and not isinstance(nodes, list):
175
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
176

    
177
  if nodes:
178
    wanted_nodes = []
179

    
180
    for name in nodes:
181
      node = lu.cfg.GetNodeInfo(lu.cfg.ExpandNodeName(name))
182
      if node is None:
183
        raise errors.OpPrereqError("No such node name '%s'" % name)
184
      wanted_nodes.append(node)
185

    
186
    return wanted_nodes
187
  else:
188
    return [lu.cfg.GetNodeInfo(name) for name in lu.cfg.GetNodeList()]
189

    
190

    
191
def _CheckOutputFields(static, dynamic, selected):
192
  """Checks whether all selected fields are valid.
193

194
  Args:
195
    static: Static fields
196
    dynamic: Dynamic fields
197

198
  """
199
  static_fields = frozenset(static)
200
  dynamic_fields = frozenset(dynamic)
201

    
202
  all_fields = static_fields | dynamic_fields
203

    
204
  if not all_fields.issuperset(selected):
205
    raise errors.OpPrereqError("Unknown output fields selected: %s"
206
                               % ",".join(frozenset(selected).
207
                                          difference(all_fields)))
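# Example use of _CheckOutputFields, as done by the query LUs further
# down (this mirrors LUQueryNodes.CheckPrereq):
#   _CheckOutputFields(static=["name", "pinst", "sinst", "pip", "sip"],
#                      dynamic=["dtotal", "dfree", "mtotal", "mnode", "mfree"],
#                      selected=self.op.output_fields)
# Any selected field outside the static and dynamic sets raises an
# OpPrereqError listing the unknown names.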
208

    
209

    
210
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
211
                          memory, vcpus, nics):
212
  """Builds instance related env variables for hooks from single variables.
213

214
  Args:
215
    secondary_nodes: List of secondary nodes as strings
216
  """
217
  env = {
218
    "INSTANCE_NAME": name,
219
    "INSTANCE_PRIMARY": primary_node,
220
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
221
    "INSTANCE_OS_TYPE": os_type,
222
    "INSTANCE_STATUS": status,
223
    "INSTANCE_MEMORY": memory,
224
    "INSTANCE_VCPUS": vcpus,
225
  }
226

    
227
  if nics:
228
    nic_count = len(nics)
229
    for idx, (ip, bridge) in enumerate(nics):
230
      if ip is None:
231
        ip = ""
232
      env["INSTANCE_NIC%d_IP" % idx] = ip
233
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
234
  else:
235
    nic_count = 0
236

    
237
  env["INSTANCE_NIC_COUNT"] = nic_count
238

    
239
  return env
240

    
241

    
242
def _BuildInstanceHookEnvByObject(instance, override=None):
243
  """Builds instance related env variables for hooks from an object.
244

245
  Args:
246
    instance: objects.Instance object of instance
247
    override: dict of values to override
248
  """
249
  args = {
250
    'name': instance.name,
251
    'primary_node': instance.primary_node,
252
    'secondary_nodes': instance.secondary_nodes,
253
    'os_type': instance.os,
254
    'status': instance.status,
255
    'memory': instance.memory,
256
    'vcpus': instance.vcpus,
257
    'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
258
  }
259
  if override:
260
    args.update(override)
261
  return _BuildInstanceHookEnv(**args)
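# For an instance with a single NIC, _BuildInstanceHookEnv (and therefore
# _BuildInstanceHookEnvByObject) produces these keys:
#   INSTANCE_NAME, INSTANCE_PRIMARY, INSTANCE_SECONDARIES,
#   INSTANCE_OS_TYPE, INSTANCE_STATUS, INSTANCE_MEMORY, INSTANCE_VCPUS,
#   INSTANCE_NIC_COUNT, INSTANCE_NIC0_IP, INSTANCE_NIC0_BRIDGE
# The GANETI_ prefix is added later by the hooks runner (see the
# LogicalUnit.BuildHooksEnv docstring).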
262

    
263

    
264
def _UpdateEtcHosts(fullnode, ip):
265
  """Ensure a node has a correct entry in /etc/hosts.
266

267
  Args:
268
    fullnode - Fully qualified domain name of host. (str)
269
    ip       - IPv4 address of host (str)
270

271
  """
272
  node = fullnode.split(".", 1)[0]
273

    
274
  f = open('/etc/hosts', 'r+')
275

    
276
  inthere = False
277

    
278
  save_lines = []
279
  add_lines = []
280
  removed = False
281

    
282
  while True:
283
    rawline = f.readline()
284

    
285
    if not rawline:
286
      # End of file
287
      break
288

    
289
    line = rawline.split('\n')[0]
290

    
291
    # Strip off comments
292
    line = line.split('#')[0]
293

    
294
    if not line:
295
      # Entire line was comment, skip
296
      save_lines.append(rawline)
297
      continue
298

    
299
    fields = line.split()
300

    
301
    haveall = True
302
    havesome = False
303
    for spec in [ ip, fullnode, node ]:
304
      if spec not in fields:
305
        haveall = False
306
      if spec in fields:
307
        havesome = True
308

    
309
    if haveall:
310
      inthere = True
311
      save_lines.append(rawline)
312
      continue
313

    
314
    if havesome and not haveall:
315
      # Line (old, or manual?) which is missing some.  Remove.
316
      removed = True
317
      continue
318

    
319
    save_lines.append(rawline)
320

    
321
  if not inthere:
322
    add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))
323

    
324
  if removed:
325
    if add_lines:
326
      save_lines = save_lines + add_lines
327

    
328
    # We removed a line, write a new file and replace old.
329
    fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
330
    newfile = os.fdopen(fd, 'w')
331
    newfile.write(''.join(save_lines))
332
    newfile.close()
333
    os.rename(tmpname, '/etc/hosts')
334

    
335
  elif add_lines:
336
    # Simply appending a new line will do the trick.
337
    f.seek(0, 2)
338
    for add in add_lines:
339
      f.write(add)
340

    
341
  f.close()
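# The /etc/hosts entry written by _UpdateEtcHosts has the form
#   "<ip>\t<fqdn> <shortname>"
# (for example "192.0.2.10\tnode1.example.com node1"; the address and
# names here are purely illustrative). Stale lines that mention only
# some of the three tokens are dropped, in which case the file is
# rewritten through a temporary file in /etc and moved into place with
# os.rename.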
342

    
343

    
344
def _UpdateKnownHosts(fullnode, ip, pubkey):
345
  """Ensure a node has a correct known_hosts entry.
346

347
  Args:
348
    fullnode - Fully qualified domain name of host. (str)
349
    ip       - IPv4 address of host (str)
350
    pubkey   - the public key of the cluster
351

352
  """
353
  if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
354
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
355
  else:
356
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
357

    
358
  inthere = False
359

    
360
  save_lines = []
361
  add_lines = []
362
  removed = False
363

    
364
  while True:
365
    rawline = f.readline()
366
    logger.Debug('read %s' % (repr(rawline),))
367

    
368
    if not rawline:
369
      # End of file
370
      break
371

    
372
    line = rawline.split('\n')[0]
373

    
374
    parts = line.split(' ')
375
    fields = parts[0].split(',')
376
    key = parts[2]
377

    
378
    haveall = True
379
    havesome = False
380
    for spec in [ ip, fullnode ]:
381
      if spec not in fields:
382
        haveall = False
383
      if spec in fields:
384
        havesome = True
385

    
386
    logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
387
    if haveall and key == pubkey:
388
      inthere = True
389
      save_lines.append(rawline)
390
      logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
391
      continue
392

    
393
    if havesome and (not haveall or key != pubkey):
394
      removed = True
395
      logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
396
      continue
397

    
398
    save_lines.append(rawline)
399

    
400
  if not inthere:
401
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
402
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
403

    
404
  if removed:
405
    save_lines = save_lines + add_lines
406

    
407
    # Write a new file and replace old.
408
    fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
409
                                   constants.DATA_DIR)
410
    newfile = os.fdopen(fd, 'w')
411
    try:
412
      newfile.write(''.join(save_lines))
413
    finally:
414
      newfile.close()
415
    logger.Debug("Wrote new known_hosts.")
416
    os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
417

    
418
  elif add_lines:
419
    # Simply appending a new line will do the trick.
420
    f.seek(0, 2)
421
    for add in add_lines:
422
      f.write(add)
423

    
424
  f.close()
425

    
426

    
427
def _HasValidVG(vglist, vgname):
428
  """Checks if the volume group list is valid.
429

430
  A non-None return value means there's an error, and the return value
431
  is the error message.
432

433
  """
434
  vgsize = vglist.get(vgname, None)
435
  if vgsize is None:
436
    return "volume group '%s' missing" % vgname
437
  elif vgsize < 20480:
438
    return ("volume group '%s' too small (20480MiB required, %dMib found)" %
439
            (vgname, vgsize))
440
  return None
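# Example: with vglist = {"xenvg": 102400} (sizes in MiB, matching the
# 20480 MiB check above), _HasValidVG(vglist, "xenvg") returns None,
# while a missing or too small volume group yields the error string.
# The name "xenvg" is only an illustrative value.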
441

    
442

    
443
def _InitSSHSetup(node):
444
  """Setup the SSH configuration for the cluster.
445

446

447
  This generates a dsa keypair for root, adds the pub key to the
448
  permitted hosts and adds the hostkey to its own known hosts.
449

450
  Args:
451
    node: the name of this host as a fqdn
452

453
  """
454
  if os.path.exists('/root/.ssh/id_dsa'):
455
    utils.CreateBackup('/root/.ssh/id_dsa')
456
  if os.path.exists('/root/.ssh/id_dsa.pub'):
457
    utils.CreateBackup('/root/.ssh/id_dsa.pub')
458

    
459
  utils.RemoveFile('/root/.ssh/id_dsa')
460
  utils.RemoveFile('/root/.ssh/id_dsa.pub')
461

    
462
  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
463
                         "-f", "/root/.ssh/id_dsa",
464
                         "-q", "-N", ""])
465
  if result.failed:
466
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
467
                             result.output)
468

    
469
  f = open('/root/.ssh/id_dsa.pub', 'r')
470
  try:
471
    utils.AddAuthorizedKey('/root/.ssh/authorized_keys', f.read(8192))
472
  finally:
473
    f.close()
474

    
475

    
476
def _InitGanetiServerSetup(ss):
477
  """Setup the necessary configuration for the initial node daemon.
478

479
  This creates the nodepass file containing the shared password for
480
  the cluster and also generates the SSL certificate.
481

482
  """
483
  # Create pseudo random password
484
  randpass = sha.new(os.urandom(64)).hexdigest()
485
  # and write it into sstore
486
  ss.SetKey(ss.SS_NODED_PASS, randpass)
487

    
488
  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
489
                         "-days", str(365*5), "-nodes", "-x509",
490
                         "-keyout", constants.SSL_CERT_FILE,
491
                         "-out", constants.SSL_CERT_FILE, "-batch"])
492
  if result.failed:
493
    raise errors.OpExecError("could not generate server ssl cert, command"
494
                             " %s had exitcode %s and error message %s" %
495
                             (result.cmd, result.exit_code, result.output))
496

    
497
  os.chmod(constants.SSL_CERT_FILE, 0400)
498

    
499
  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
500

    
501
  if result.failed:
502
    raise errors.OpExecError("Could not start the node daemon, command %s"
503
                             " had exitcode %s and error %s" %
504
                             (result.cmd, result.exit_code, result.output))
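# To summarise the bootstrap above: a random node daemon password is
# stored in the simple store, a self-signed RSA-1024 certificate valid
# for five years is generated (key and certificate both end up in
# constants.SSL_CERT_FILE, mode 0400), and the node daemon is restarted
# through its init script so that it picks up the new credentials.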
505

    
506

    
507
class LUInitCluster(LogicalUnit):
508
  """Initialise the cluster.
509

510
  """
511
  HPATH = "cluster-init"
512
  HTYPE = constants.HTYPE_CLUSTER
513
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
514
              "def_bridge", "master_netdev"]
515
  REQ_CLUSTER = False
516

    
517
  def BuildHooksEnv(self):
518
    """Build hooks env.
519

520
    Notes: Since we don't require a cluster, we must manually add
521
    ourselves in the post-run node list.
522

523
    """
524
    env = {
525
      "CLUSTER": self.op.cluster_name,
526
      "MASTER": self.hostname['hostname_full'],
527
      }
528
    return env, [], [self.hostname['hostname_full']]
529

    
530
  def CheckPrereq(self):
531
    """Verify that the passed name is a valid one.
532

533
    """
534
    if config.ConfigWriter.IsCluster():
535
      raise errors.OpPrereqError("Cluster is already initialised")
536

    
537
    hostname_local = socket.gethostname()
538
    self.hostname = hostname = utils.LookupHostname(hostname_local)
539
    if not hostname:
540
      raise errors.OpPrereqError("Cannot resolve my own hostname ('%s')" %
541
                                 hostname_local)
542

    
543
    self.clustername = clustername = utils.LookupHostname(self.op.cluster_name)
544
    if not clustername:
545
      raise errors.OpPrereqError("Cannot resolve given cluster name ('%s')"
546
                                 % self.op.cluster_name)
547

    
548
    result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", hostname['ip']])
549
    if result.failed:
550
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
551
                                 " to %s,\nbut this ip address does not"
552
                                 " belong to this host."
553
                                 " Aborting." % hostname['ip'])
554

    
555
    secondary_ip = getattr(self.op, "secondary_ip", None)
556
    if secondary_ip and not utils.IsValidIP(secondary_ip):
557
      raise errors.OpPrereqError("Invalid secondary ip given")
558
    if secondary_ip and secondary_ip != hostname['ip']:
559
      result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", secondary_ip])
560
      if result.failed:
561
        raise errors.OpPrereqError("You gave %s as secondary IP,\n"
562
                                   "but it does not belong to this host." %
563
                                   secondary_ip)
564
    self.secondary_ip = secondary_ip
565

    
566
    # checks presence of the volume group given
567
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
568

    
569
    if vgstatus:
570
      raise errors.OpPrereqError("Error: %s" % vgstatus)
571

    
572
    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
573
                    self.op.mac_prefix):
574
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
575
                                 self.op.mac_prefix)
576

    
577
    if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
578
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
579
                                 self.op.hypervisor_type)
580

    
581
    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
582
    if result.failed:
583
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
584
                                 (self.op.master_netdev,
585
                                  result.output.strip()))
586

    
587
  def Exec(self, feedback_fn):
588
    """Initialize the cluster.
589

590
    """
591
    clustername = self.clustername
592
    hostname = self.hostname
593

    
594
    # set up the simple store
595
    ss = ssconf.SimpleStore()
596
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
597
    ss.SetKey(ss.SS_MASTER_NODE, hostname['hostname_full'])
598
    ss.SetKey(ss.SS_MASTER_IP, clustername['ip'])
599
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
600
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername['hostname'])
601

    
602
    # set up the inter-node password and certificate
603
    _InitGanetiServerSetup(ss)
604

    
605
    # start the master ip
606
    rpc.call_node_start_master(hostname['hostname_full'])
607

    
608
    # set up ssh config and /etc/hosts
609
    f = open('/etc/ssh/ssh_host_rsa_key.pub', 'r')
610
    try:
611
      sshline = f.read()
612
    finally:
613
      f.close()
614
    sshkey = sshline.split(" ")[1]
615

    
616
    _UpdateEtcHosts(hostname['hostname_full'],
617
                    hostname['ip'],
618
                    )
619

    
620
    _UpdateKnownHosts(hostname['hostname_full'],
621
                      hostname['ip'],
622
                      sshkey,
623
                      )
624

    
625
    _InitSSHSetup(hostname['hostname'])
626

    
627
    # init of cluster config file
628
    cfgw = config.ConfigWriter()
629
    cfgw.InitConfig(hostname['hostname'], hostname['ip'], self.secondary_ip,
630
                    sshkey, self.op.mac_prefix,
631
                    self.op.vg_name, self.op.def_bridge)
632

    
633

    
634
class LUDestroyCluster(NoHooksLU):
635
  """Logical unit for destroying the cluster.
636

637
  """
638
  _OP_REQP = []
639

    
640
  def CheckPrereq(self):
641
    """Check prerequisites.
642

643
    This checks whether the cluster is empty.
644

645
    Any errors are signalled by raising errors.OpPrereqError.
646

647
    """
648
    master = self.sstore.GetMasterNode()
649

    
650
    nodelist = self.cfg.GetNodeList()
651
    if len(nodelist) != 1 or nodelist[0] != master:
652
      raise errors.OpPrereqError("There are still %d node(s) in"
653
                                 " this cluster." % (len(nodelist) - 1))
654
    instancelist = self.cfg.GetInstanceList()
655
    if instancelist:
656
      raise errors.OpPrereqError("There are still %d instance(s) in"
657
                                 " this cluster." % len(instancelist))
658

    
659
  def Exec(self, feedback_fn):
660
    """Destroys the cluster.
661

662
    """
663
    utils.CreateBackup('/root/.ssh/id_dsa')
664
    utils.CreateBackup('/root/.ssh/id_dsa.pub')
665
    rpc.call_node_leave_cluster(self.sstore.GetMasterNode())
666

    
667

    
668
class LUVerifyCluster(NoHooksLU):
669
  """Verifies the cluster status.
670

671
  """
672
  _OP_REQP = []
673

    
674
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
675
                  remote_version, feedback_fn):
676
    """Run multiple tests against a node.
677

678
    Test list:
679
      - compares ganeti version
680
      - checks vg existence and size > 20G
681
      - checks config file checksum
682
      - checks ssh to other nodes
683

684
    Args:
685
      node: name of the node to check
686
      file_list: required list of files
687
      local_cksum: dictionary of local files and their checksums
688

689
    """
690
    # compares ganeti version
691
    local_version = constants.PROTOCOL_VERSION
692
    if not remote_version:
693
      feedback_fn(" - ERROR: connection to %s failed" % (node))
694
      return True
695

    
696
    if local_version != remote_version:
697
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
698
                      (local_version, node, remote_version))
699
      return True
700

    
701
    # checks vg existence and size > 20G
702

    
703
    bad = False
704
    if not vglist:
705
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
706
                      (node,))
707
      bad = True
708
    else:
709
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
710
      if vgstatus:
711
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
712
        bad = True
713

    
714
    # checks config file checksum
715
    # checks ssh to any
716

    
717
    if 'filelist' not in node_result:
718
      bad = True
719
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
720
    else:
721
      remote_cksum = node_result['filelist']
722
      for file_name in file_list:
723
        if file_name not in remote_cksum:
724
          bad = True
725
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
726
        elif remote_cksum[file_name] != local_cksum[file_name]:
727
          bad = True
728
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
729

    
730
    if 'nodelist' not in node_result:
731
      bad = True
732
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
733
    else:
734
      if node_result['nodelist']:
735
        bad = True
736
        for node in node_result['nodelist']:
737
          feedback_fn("  - ERROR: communication with node '%s': %s" %
738
                          (node, node_result['nodelist'][node]))
739
    hyp_result = node_result.get('hypervisor', None)
740
    if hyp_result is not None:
741
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
742
    return bad
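  # Note on the return convention: _VerifyNode, like the other _Verify*
  # helpers in this class, returns True when a problem was detected and
  # False when the node checked out clean; Exec accumulates the results
  # with "bad = bad or result".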
743

    
744
  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
745
    """Verify an instance.
746

747
    This function checks to see if the required block devices are
748
    available on the instance's node.
749

750
    """
751
    bad = False
752

    
753
    instancelist = self.cfg.GetInstanceList()
754
    if not instance in instancelist:
755
      feedback_fn("  - ERROR: instance %s not in instance list %s" %
756
                      (instance, instancelist))
757
      bad = True
758

    
759
    instanceconfig = self.cfg.GetInstanceInfo(instance)
760
    node_current = instanceconfig.primary_node
761

    
762
    node_vol_should = {}
763
    instanceconfig.MapLVsByNode(node_vol_should)
764

    
765
    for node in node_vol_should:
766
      for volume in node_vol_should[node]:
767
        if node not in node_vol_is or volume not in node_vol_is[node]:
768
          feedback_fn("  - ERROR: volume %s missing on node %s" %
769
                          (volume, node))
770
          bad = True
771

    
772
    if not instanceconfig.status == 'down':
773
      if not instance in node_instance[node_current]:
774
        feedback_fn("  - ERROR: instance %s not running on node %s" %
775
                        (instance, node_current))
776
        bad = True
777

    
778
    for node in node_instance:
779
      if (not node == node_current):
780
        if instance in node_instance[node]:
781
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
782
                          (instance, node))
783
          bad = True
784

    
785
    return bad
786

    
787
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
788
    """Verify if there are any unknown volumes in the cluster.
789

790
    The .os, .swap and backup volumes are ignored. All other volumes are
791
    reported as unknown.
792

793
    """
794
    bad = False
795

    
796
    for node in node_vol_is:
797
      for volume in node_vol_is[node]:
798
        if node not in node_vol_should or volume not in node_vol_should[node]:
799
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
800
                      (volume, node))
801
          bad = True
802
    return bad
803

    
804
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
805
    """Verify the list of running instances.
806

807
    This checks what instances are running but unknown to the cluster.
808

809
    """
810
    bad = False
811
    for node in node_instance:
812
      for runninginstance in node_instance[node]:
813
        if runninginstance not in instancelist:
814
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
815
                          (runninginstance, node))
816
          bad = True
817
    return bad
818

    
819
  def CheckPrereq(self):
820
    """Check prerequisites.
821

822
    This has no prerequisites.
823

824
    """
825
    pass
826

    
827
  def Exec(self, feedback_fn):
828
    """Verify integrity of cluster, performing various test on nodes.
829

830
    """
831
    bad = False
832
    feedback_fn("* Verifying global settings")
833
    self.cfg.VerifyConfig()
834

    
835
    master = self.sstore.GetMasterNode()
836
    vg_name = self.cfg.GetVGName()
837
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
838
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
839
    node_volume = {}
840
    node_instance = {}
841

    
842
    # FIXME: verify OS list
843
    # do local checksums
844
    file_names = list(self.sstore.GetFileList())
845
    file_names.append(constants.SSL_CERT_FILE)
846
    file_names.append(constants.CLUSTER_CONF_FILE)
847
    local_checksums = utils.FingerprintFiles(file_names)
848

    
849
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
850
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
851
    all_instanceinfo = rpc.call_instance_list(nodelist)
852
    all_vglist = rpc.call_vg_list(nodelist)
853
    node_verify_param = {
854
      'filelist': file_names,
855
      'nodelist': nodelist,
856
      'hypervisor': None,
857
      }
858
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
859
    all_rversion = rpc.call_version(nodelist)
860

    
861
    for node in nodelist:
862
      feedback_fn("* Verifying node %s" % node)
863
      result = self._VerifyNode(node, file_names, local_checksums,
864
                                all_vglist[node], all_nvinfo[node],
865
                                all_rversion[node], feedback_fn)
866
      bad = bad or result
867

    
868
      # node_volume
869
      volumeinfo = all_volumeinfo[node]
870

    
871
      if type(volumeinfo) != dict:
872
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
873
        bad = True
874
        continue
875

    
876
      node_volume[node] = volumeinfo
877

    
878
      # node_instance
879
      nodeinstance = all_instanceinfo[node]
880
      if type(nodeinstance) != list:
881
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
882
        bad = True
883
        continue
884

    
885
      node_instance[node] = nodeinstance
886

    
887
    node_vol_should = {}
888

    
889
    for instance in instancelist:
890
      feedback_fn("* Verifying instance %s" % instance)
891
      result =  self._VerifyInstance(instance, node_volume, node_instance,
892
                                     feedback_fn)
893
      bad = bad or result
894

    
895
      inst_config = self.cfg.GetInstanceInfo(instance)
896

    
897
      inst_config.MapLVsByNode(node_vol_should)
898

    
899
    feedback_fn("* Verifying orphan volumes")
900
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
901
                                       feedback_fn)
902
    bad = bad or result
903

    
904
    feedback_fn("* Verifying remaining instances")
905
    result = self._VerifyOrphanInstances(instancelist, node_instance,
906
                                         feedback_fn)
907
    bad = bad or result
908

    
909
    return int(bad)
910

    
911

    
912
def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
913
  """Sleep and poll for an instance's disk to sync.
914

915
  """
916
  if not instance.disks:
917
    return True
918

    
919
  if not oneshot:
920
    logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)
921

    
922
  node = instance.primary_node
923

    
924
  for dev in instance.disks:
925
    cfgw.SetDiskID(dev, node)
926

    
927
  retries = 0
928
  while True:
929
    max_time = 0
930
    done = True
931
    cumul_degraded = False
932
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
933
    if not rstats:
934
      logger.ToStderr("Can't get any data from node %s" % node)
935
      retries += 1
936
      if retries >= 10:
937
        raise errors.RemoteError("Can't contact node %s for mirror data,"
938
                                 " aborting." % node)
939
      time.sleep(6)
940
      continue
941
    retries = 0
942
    for i in range(len(rstats)):
943
      mstat = rstats[i]
944
      if mstat is None:
945
        logger.ToStderr("Can't compute data for node %s/%s" %
946
                        (node, instance.disks[i].iv_name))
947
        continue
948
      perc_done, est_time, is_degraded = mstat
949
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
950
      if perc_done is not None:
951
        done = False
952
        if est_time is not None:
953
          rem_time = "%d estimated seconds remaining" % est_time
954
          max_time = est_time
955
        else:
956
          rem_time = "no time estimate"
957
        logger.ToStdout("- device %s: %5.2f%% done, %s" %
958
                        (instance.disks[i].iv_name, perc_done, rem_time))
959
    if done or oneshot:
960
      break
961

    
962
    if unlock:
963
      utils.Unlock('cmd')
964
    try:
965
      time.sleep(min(60, max_time))
966
    finally:
967
      if unlock:
968
        utils.Lock('cmd')
969

    
970
  if done:
971
    logger.ToStdout("Instance %s's disks are in sync." % instance.name)
972
  return not cumul_degraded
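# Polling behaviour of _WaitForSync: up to ten consecutive failed RPC
# rounds are tolerated (six seconds apart) before a RemoteError is
# raised; after a successful round it sleeps min(60, max_time) seconds,
# max_time being the largest per-device time estimate seen. The function
# returns False if any device was reported degraded without sync
# progress information, True otherwise.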
973

    
974

    
975
def _CheckDiskConsistency(cfgw, dev, node, on_primary):
976
  """Check that mirrors are not degraded.
977

978
  """
979
  cfgw.SetDiskID(dev, node)
980

    
981
  result = True
982
  if on_primary or dev.AssembleOnSecondary():
983
    rstats = rpc.call_blockdev_find(node, dev)
984
    if not rstats:
985
      logger.ToStderr("Can't get any data from node %s" % node)
986
      result = False
987
    else:
988
      result = result and (not rstats[5])
989
  if dev.children:
990
    for child in dev.children:
991
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
992

    
993
  return result
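# _CheckDiskConsistency queries the device via blockdev_find on the given
# node (for the primary, or for a secondary when the device assembles
# there) and then recurses into every child device; any unreachable node
# or degraded component turns the overall result to False. rstats[5] is
# treated as the "degraded" flag of the remote blockdev status; the exact
# layout of that tuple is an assumption not documented in this module.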
994

    
995

    
996
class LUDiagnoseOS(NoHooksLU):
997
  """Logical unit for OS diagnose/query.
998

999
  """
1000
  _OP_REQP = []
1001

    
1002
  def CheckPrereq(self):
1003
    """Check prerequisites.
1004

1005
    This always succeeds, since this is a pure query LU.
1006

1007
    """
1008
    return
1009

    
1010
  def Exec(self, feedback_fn):
1011
    """Compute the list of OSes.
1012

1013
    """
1014
    node_list = self.cfg.GetNodeList()
1015
    node_data = rpc.call_os_diagnose(node_list)
1016
    if node_data == False:
1017
      raise errors.OpExecError("Can't gather the list of OSes")
1018
    return node_data
1019

    
1020

    
1021
class LURemoveNode(LogicalUnit):
1022
  """Logical unit for removing a node.
1023

1024
  """
1025
  HPATH = "node-remove"
1026
  HTYPE = constants.HTYPE_NODE
1027
  _OP_REQP = ["node_name"]
1028

    
1029
  def BuildHooksEnv(self):
1030
    """Build hooks env.
1031

1032
    This doesn't run on the target node in the pre phase as a failed
1033
    node would not allow itself to run.
1034

1035
    """
1036
    env = {
1037
      "NODE_NAME": self.op.node_name,
1038
      }
1039
    all_nodes = self.cfg.GetNodeList()
1040
    all_nodes.remove(self.op.node_name)
1041
    return env, all_nodes, all_nodes
1042

    
1043
  def CheckPrereq(self):
1044
    """Check prerequisites.
1045

1046
    This checks:
1047
     - the node exists in the configuration
1048
     - it does not have primary or secondary instances
1049
     - it's not the master
1050

1051
    Any errors are signalled by raising errors.OpPrereqError.
1052

1053
    """
1054
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1055
    if node is None:
1056
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1057

    
1058
    instance_list = self.cfg.GetInstanceList()
1059

    
1060
    masternode = self.sstore.GetMasterNode()
1061
    if node.name == masternode:
1062
      raise errors.OpPrereqError("Node is the master node,"
1063
                                 " you need to failover first.")
1064

    
1065
    for instance_name in instance_list:
1066
      instance = self.cfg.GetInstanceInfo(instance_name)
1067
      if node.name == instance.primary_node:
1068
        raise errors.OpPrereqError("Instance %s still running on the node,"
1069
                                   " please remove first." % instance_name)
1070
      if node.name in instance.secondary_nodes:
1071
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1072
                                   " please remove first." % instance_name)
1073
    self.op.node_name = node.name
1074
    self.node = node
1075

    
1076
  def Exec(self, feedback_fn):
1077
    """Removes the node from the cluster.
1078

1079
    """
1080
    node = self.node
1081
    logger.Info("stopping the node daemon and removing configs from node %s" %
1082
                node.name)
1083

    
1084
    rpc.call_node_leave_cluster(node.name)
1085

    
1086
    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1087

    
1088
    logger.Info("Removing node %s from config" % node.name)
1089

    
1090
    self.cfg.RemoveNode(node.name)
1091

    
1092

    
1093
class LUQueryNodes(NoHooksLU):
1094
  """Logical unit for querying nodes.
1095

1096
  """
1097
  _OP_REQP = ["output_fields"]
1098

    
1099
  def CheckPrereq(self):
1100
    """Check prerequisites.
1101

1102
    This checks that the fields required are valid output fields.
1103

1104
    """
1105
    self.dynamic_fields = frozenset(["dtotal", "dfree",
1106
                                     "mtotal", "mnode", "mfree"])
1107

    
1108
    _CheckOutputFields(static=["name", "pinst", "sinst", "pip", "sip"],
1109
                       dynamic=self.dynamic_fields,
1110
                       selected=self.op.output_fields)
1111

    
1112

    
1113
  def Exec(self, feedback_fn):
1114
    """Computes the list of nodes and their attributes.
1115

1116
    """
1117
    nodenames = utils.NiceSort(self.cfg.GetNodeList())
1118
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1119

    
1120

    
1121
    # begin data gathering
1122

    
1123
    if self.dynamic_fields.intersection(self.op.output_fields):
1124
      live_data = {}
1125
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1126
      for name in nodenames:
1127
        nodeinfo = node_data.get(name, None)
1128
        if nodeinfo:
1129
          live_data[name] = {
1130
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1131
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1132
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1133
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1134
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1135
            }
1136
        else:
1137
          live_data[name] = {}
1138
    else:
1139
      live_data = dict.fromkeys(nodenames, {})
1140

    
1141
    node_to_primary = dict.fromkeys(nodenames, 0)
1142
    node_to_secondary = dict.fromkeys(nodenames, 0)
1143

    
1144
    if "pinst" in self.op.output_fields or "sinst" in self.op.output_fields:
1145
      instancelist = self.cfg.GetInstanceList()
1146

    
1147
      for instance in instancelist:
1148
        instanceinfo = self.cfg.GetInstanceInfo(instance)
1149
        node_to_primary[instanceinfo.primary_node] += 1
1150
        for secnode in instanceinfo.secondary_nodes:
1151
          node_to_secondary[secnode] += 1
1152

    
1153
    # end data gathering
1154

    
1155
    output = []
1156
    for node in nodelist:
1157
      node_output = []
1158
      for field in self.op.output_fields:
1159
        if field == "name":
1160
          val = node.name
1161
        elif field == "pinst":
1162
          val = node_to_primary[node.name]
1163
        elif field == "sinst":
1164
          val = node_to_secondary[node.name]
1165
        elif field == "pip":
1166
          val = node.primary_ip
1167
        elif field == "sip":
1168
          val = node.secondary_ip
1169
        elif field in self.dynamic_fields:
1170
          val = live_data[node.name].get(field, "?")
1171
        else:
1172
          raise errors.ParameterError(field)
1173
        val = str(val)
1174
        node_output.append(val)
1175
      output.append(node_output)
1176

    
1177
    return output
1178

    
1179

    
1180
class LUQueryNodeVolumes(NoHooksLU):
1181
  """Logical unit for getting volumes on node(s).
1182

1183
  """
1184
  _OP_REQP = ["nodes", "output_fields"]
1185

    
1186
  def CheckPrereq(self):
1187
    """Check prerequisites.
1188

1189
    This checks that the fields required are valid output fields.
1190

1191
    """
1192
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1193

    
1194
    _CheckOutputFields(static=["node"],
1195
                       dynamic=["phys", "vg", "name", "size", "instance"],
1196
                       selected=self.op.output_fields)
1197

    
1198

    
1199
  def Exec(self, feedback_fn):
1200
    """Computes the list of nodes and their attributes.
1201

1202
    """
1203
    nodenames = utils.NiceSort([node.name for node in self.nodes])
1204
    volumes = rpc.call_node_volumes(nodenames)
1205

    
1206
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1207
             in self.cfg.GetInstanceList()]
1208

    
1209
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1210

    
1211
    output = []
1212
    for node in nodenames:
1213
      if node not in volumes or not volumes[node]:
1214
        continue
1215

    
1216
      node_vols = volumes[node][:]
1217
      node_vols.sort(key=lambda vol: vol['dev'])
1218

    
1219
      for vol in node_vols:
1220
        node_output = []
1221
        for field in self.op.output_fields:
1222
          if field == "node":
1223
            val = node
1224
          elif field == "phys":
1225
            val = vol['dev']
1226
          elif field == "vg":
1227
            val = vol['vg']
1228
          elif field == "name":
1229
            val = vol['name']
1230
          elif field == "size":
1231
            val = int(float(vol['size']))
1232
          elif field == "instance":
1233
            for inst in ilist:
1234
              if node not in lv_by_node[inst]:
1235
                continue
1236
              if vol['name'] in lv_by_node[inst][node]:
1237
                val = inst.name
1238
                break
1239
            else:
1240
              val = '-'
1241
          else:
1242
            raise errors.ParameterError(field)
1243
          node_output.append(str(val))
1244

    
1245
        output.append(node_output)
1246

    
1247
    return output
1248

    
1249

    
1250
class LUAddNode(LogicalUnit):
1251
  """Logical unit for adding node to the cluster.
1252

1253
  """
1254
  HPATH = "node-add"
1255
  HTYPE = constants.HTYPE_NODE
1256
  _OP_REQP = ["node_name"]
1257

    
1258
  def BuildHooksEnv(self):
1259
    """Build hooks env.
1260

1261
    This will run on all nodes before, and on all nodes + the new node after.
1262

1263
    """
1264
    env = {
1265
      "NODE_NAME": self.op.node_name,
1266
      "NODE_PIP": self.op.primary_ip,
1267
      "NODE_SIP": self.op.secondary_ip,
1268
      }
1269
    nodes_0 = self.cfg.GetNodeList()
1270
    nodes_1 = nodes_0 + [self.op.node_name, ]
1271
    return env, nodes_0, nodes_1
1272

    
1273
  def CheckPrereq(self):
1274
    """Check prerequisites.
1275

1276
    This checks:
1277
     - the new node is not already in the config
1278
     - it is resolvable
1279
     - its parameters (single/dual homed) matches the cluster
1280

1281
    Any errors are signalled by raising errors.OpPrereqError.
1282

1283
    """
1284
    node_name = self.op.node_name
1285
    cfg = self.cfg
1286

    
1287
    dns_data = utils.LookupHostname(node_name)
1288
    if not dns_data:
1289
      raise errors.OpPrereqError("Node %s is not resolvable" % node_name)
1290

    
1291
    node = dns_data['hostname']
1292
    primary_ip = self.op.primary_ip = dns_data['ip']
1293
    secondary_ip = getattr(self.op, "secondary_ip", None)
1294
    if secondary_ip is None:
1295
      secondary_ip = primary_ip
1296
    if not utils.IsValidIP(secondary_ip):
1297
      raise errors.OpPrereqError("Invalid secondary IP given")
1298
    self.op.secondary_ip = secondary_ip
1299
    node_list = cfg.GetNodeList()
1300
    if node in node_list:
1301
      raise errors.OpPrereqError("Node %s is already in the configuration"
1302
                                 % node)
1303

    
1304
    for existing_node_name in node_list:
1305
      existing_node = cfg.GetNodeInfo(existing_node_name)
1306
      if (existing_node.primary_ip == primary_ip or
1307
          existing_node.secondary_ip == primary_ip or
1308
          existing_node.primary_ip == secondary_ip or
1309
          existing_node.secondary_ip == secondary_ip):
1310
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1311
                                   " existing node %s" % existing_node.name)
1312

    
1313
    # check that the type of the node (single versus dual homed) is the
1314
    # same as for the master
1315
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1316
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1317
    newbie_singlehomed = secondary_ip == primary_ip
1318
    if master_singlehomed != newbie_singlehomed:
1319
      if master_singlehomed:
1320
        raise errors.OpPrereqError("The master has no private ip but the"
1321
                                   " new node has one")
1322
      else:
1323
        raise errors.OpPrereqError("The master has a private ip but the"
1324
                                   " new node doesn't have one")
1325

    
1326
    # checks reachability
1327
    command = ["fping", "-q", primary_ip]
1328
    result = utils.RunCmd(command)
1329
    if result.failed:
1330
      raise errors.OpPrereqError("Node not reachable by ping")
1331

    
1332
    if not newbie_singlehomed:
1333
      # check reachability from my secondary ip to newbie's secondary ip
1334
      command = ["fping", "-S%s" % myself.secondary_ip, "-q", secondary_ip]
1335
      result = utils.RunCmd(command)
1336
      if result.failed:
1337
        raise errors.OpPrereqError("Node secondary ip not reachable by ping")
1338

    
1339
    self.new_node = objects.Node(name=node,
1340
                                 primary_ip=primary_ip,
1341
                                 secondary_ip=secondary_ip)
1342

    
1343
  def Exec(self, feedback_fn):
1344
    """Adds the new node to the cluster.
1345

1346
    """
1347
    new_node = self.new_node
1348
    node = new_node.name
1349

    
1350
    # set up inter-node password and certificate and restarts the node daemon
1351
    gntpass = self.sstore.GetNodeDaemonPassword()
1352
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1353
      raise errors.OpExecError("ganeti password corruption detected")
1354
    f = open(constants.SSL_CERT_FILE)
1355
    try:
1356
      gntpem = f.read(8192)
1357
    finally:
1358
      f.close()
1359
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1360
    # so we use this to detect an invalid certificate; as long as the
1361
    # cert doesn't contain this, the here-document will be correctly
1362
    # parsed by the shell sequence below
1363
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
1364
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1365
    if not gntpem.endswith("\n"):
1366
      raise errors.OpExecError("PEM must end with newline")
1367
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1368

    
1369
    # and then connect with ssh to set password and start ganeti-noded
1370
    # note that all the below variables are sanitized at this point,
1371
    # either by being constants or by the checks above
1372
    ss = self.sstore
1373
    mycommand = ("umask 077 && "
1374
                 "echo '%s' > '%s' && "
1375
                 "cat > '%s' << '!EOF.' && \n"
1376
                 "%s!EOF.\n%s restart" %
1377
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1378
                  constants.SSL_CERT_FILE, gntpem,
1379
                  constants.NODE_INITD_SCRIPT))
1380

    
1381
    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1382
    if result.failed:
1383
      raise errors.OpExecError("Remote command on node %s, error: %s,"
1384
                               " output: %s" %
1385
                               (node, result.fail_reason, result.output))
1386

    
1387
    # check connectivity
1388
    time.sleep(4)
1389

    
1390
    result = rpc.call_version([node])[node]
1391
    if result:
1392
      if constants.PROTOCOL_VERSION == result:
1393
        logger.Info("communication to node %s fine, sw version %s match" %
1394
                    (node, result))
1395
      else:
1396
        raise errors.OpExecError("Version mismatch master version %s,"
1397
                                 " node version %s" %
1398
                                 (constants.PROTOCOL_VERSION, result))
1399
    else:
1400
      raise errors.OpExecError("Cannot get version from the new node")
1401

    
1402
    # setup ssh on node
1403
    logger.Info("copy ssh key to node %s" % node)
1404
    keyarray = []
1405
    keyfiles = ["/etc/ssh/ssh_host_dsa_key", "/etc/ssh/ssh_host_dsa_key.pub",
1406
                "/etc/ssh/ssh_host_rsa_key", "/etc/ssh/ssh_host_rsa_key.pub",
1407
                "/root/.ssh/id_dsa", "/root/.ssh/id_dsa.pub"]
1408

    
1409
    for i in keyfiles:
1410
      f = open(i, 'r')
1411
      try:
1412
        keyarray.append(f.read())
1413
      finally:
1414
        f.close()
1415

    
1416
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1417
                               keyarray[3], keyarray[4], keyarray[5])
1418

    
1419
    if not result:
1420
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1421

    
1422
    # Add node to our /etc/hosts, and add key to known_hosts
1423
    _UpdateEtcHosts(new_node.name, new_node.primary_ip)
1424
    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1425
                      self.cfg.GetHostKey())
1426

    
1427
    if new_node.secondary_ip != new_node.primary_ip:
1428
      result = ssh.SSHCall(node, "root",
1429
                           "fping -S 127.0.0.1 -q %s" % new_node.secondary_ip)
1430
      if result.failed:
1431
        raise errors.OpExecError("Node claims it doesn't have the"
1432
                                 " secondary ip you gave (%s).\n"
1433
                                 "Please fix and re-run this command." %
1434
                                 new_node.secondary_ip)
1435

    
1436
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1437
    # including the node just added
1438
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1439
    dist_nodes = self.cfg.GetNodeList() + [node]
1440
    if myself.name in dist_nodes:
1441
      dist_nodes.remove(myself.name)
1442

    
1443
    logger.Debug("Copying hosts and known_hosts to all nodes")
1444
    for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
1445
      result = rpc.call_upload_file(dist_nodes, fname)
1446
      for to_node in dist_nodes:
1447
        if not result[to_node]:
1448
          logger.Error("copy of file %s to node %s failed" %
1449
                       (fname, to_node))
1450

    
1451
    to_copy = ss.GetFileList()
1452
    for fname in to_copy:
1453
      if not ssh.CopyFileToNode(node, fname):
1454
        logger.Error("could not copy file %s to node %s" % (fname, node))
1455

    
1456
    logger.Info("adding node %s to cluster.conf" % node)
1457
    self.cfg.AddNode(new_node)
1458

    
1459

    
1460
class LUMasterFailover(LogicalUnit):
1461
  """Failover the master node to the current node.
1462

1463
  This is a special LU in that it must run on a non-master node.
1464

1465
  """
1466
  HPATH = "master-failover"
1467
  HTYPE = constants.HTYPE_CLUSTER
1468
  REQ_MASTER = False
1469
  _OP_REQP = []
1470

    
1471
  def BuildHooksEnv(self):
1472
    """Build hooks env.
1473

1474
    This will run on the new master only in the pre phase, and on all
1475
    the nodes in the post phase.
1476

1477
    """
1478
    env = {
1479
      "NEW_MASTER": self.new_master,
1480
      "OLD_MASTER": self.old_master,
1481
      }
1482
    return env, [self.new_master], self.cfg.GetNodeList()
1483

    
1484
  def CheckPrereq(self):
1485
    """Check prerequisites.
1486

1487
    This checks that we are not already the master.
1488

1489
    """
1490
    self.new_master = socket.gethostname()
1491

    
1492
    self.old_master = self.sstore.GetMasterNode()
1493

    
1494
    if self.old_master == self.new_master:
1495
      raise errors.OpPrereqError("This commands must be run on the node"
1496
                                 " where you want the new master to be.\n"
1497
                                 "%s is already the master" %
1498
                                 self.old_master)
1499

    
1500
  def Exec(self, feedback_fn):
1501
    """Failover the master node.
1502

1503
    This command, when run on a non-master node, will cause the current
1504
    master to cease being master, and the non-master to become new
1505
    master.
1506

1507
    """
1508
    #TODO: do not rely on gethostname returning the FQDN
1509
    logger.Info("setting master to %s, old master: %s" %
1510
                (self.new_master, self.old_master))
1511

    
1512
    if not rpc.call_node_stop_master(self.old_master):
1513
      logger.Error("could disable the master role on the old master"
1514
                   " %s, please disable manually" % self.old_master)
1515

    
1516
    ss = self.sstore
1517
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1518
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
1519
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
1520
      logger.Error("could not distribute the new simple store master file"
1521
                   " to the other nodes, please check.")
1522

    
1523
    if not rpc.call_node_start_master(self.new_master):
1524
      logger.Error("could not start the master role on the new master"
1525
                   " %s, please check" % self.new_master)
1526
      feedback_fn("Error in activating the master IP on the new master,\n"
1527
                  "please fix manually.")
1528

    
1529

    
1530

    
1531
class LUQueryClusterInfo(NoHooksLU):
1532
  """Query cluster configuration.
1533

1534
  """
1535
  _OP_REQP = []
1536
  REQ_MASTER = False
1537

    
1538
  def CheckPrereq(self):
1539
    """No prerequsites needed for this LU.
1540

1541
    """
1542
    pass
1543

    
1544
  def Exec(self, feedback_fn):
1545
    """Return cluster config.
1546

1547
    """
1548
    result = {
1549
      "name": self.sstore.GetClusterName(),
1550
      "software_version": constants.RELEASE_VERSION,
1551
      "protocol_version": constants.PROTOCOL_VERSION,
1552
      "config_version": constants.CONFIG_VERSION,
1553
      "os_api_version": constants.OS_API_VERSION,
1554
      "export_version": constants.EXPORT_VERSION,
1555
      "master": self.sstore.GetMasterNode(),
1556
      "architecture": (platform.architecture()[0], platform.machine()),
1557
      }
1558

    
1559
    return result


class LUClusterCopyFile(NoHooksLU):
  """Copy file to cluster.

  """
  _OP_REQP = ["nodes", "filename"]

  def CheckPrereq(self):
    """Check prerequisites.

    It should check that the named file exists and that the given list
    of nodes is valid.

    """
    if not os.path.exists(self.op.filename):
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)

    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Copy a file from the master to some nodes.

    The file named by self.op.filename is copied via ssh to each node
    in self.nodes (computed in CheckPrereq), skipping the master node
    itself.

    """
    filename = self.op.filename

    myname = socket.gethostname()

    for node in [node.name for node in self.nodes]:
      if node == myname:
        continue
      if not ssh.CopyFileToNode(node, filename):
        logger.Error("Copy of file %s to node %s failed" % (filename, node))


class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Return a text dump of the cluster config.

    """
    return self.cfg.DumpConfig()


class LURunClusterCommand(NoHooksLU):
  """Run a command on some nodes.

  """
  _OP_REQP = ["command", "nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    It checks that the given list of nodes is valid.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Run a command on some nodes.

    """
    data = []
    for node in self.nodes:
      result = ssh.SSHCall(node.name, "root", self.op.command)
      data.append((node.name, result.output, result.exit_code))

    return data


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    a (disks_ok, device_info) tuple, where disks_ok is False if the
    operation failed, and device_info is a list of
    (host, instance_visible_name, node_visible_name) tuples with the
    mapping from node devices to instance devices
  """
  device_info = []
  disks_ok = True
  for inst_disk in instance.disks:
    master_result = None
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      is_primary = node == instance.primary_node
      result = rpc.call_blockdev_assemble(node, node_disk, is_primary)
      if not result:
        logger.Error("could not prepare block device %s on node %s (is_pri"
                     "mary=%s)" % (inst_disk.iv_name, node, is_primary))
        if is_primary or not ignore_secondaries:
          disks_ok = False
      if is_primary:
        master_result = result
    device_info.append((instance.primary_node, inst_disk.iv_name,
                        master_result))

  return disks_ok, device_info
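  # Illustrative note (not from the original source): for a two-disk
  # instance the device_info list built above would look roughly like
  #   [("node1.example.com", "sda", <result>),
  #    ("node1.example.com", "sdb", <result>)]
  # where <result> is whatever rpc.call_blockdev_assemble returned for
  # the primary node and the node name is hypothetical.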


def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    if not type(ins_l) is list:
      raise errors.OpExecError("Can't contact node '%s'" %
                               instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError("Instance is running, can't shutdown"
                               " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)


def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is false, errors on the primary node make the
  operation fail, just like errors on any other node; if it is true,
  errors on the primary node are ignored (errors on the other nodes
  still cause a failure result).

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result


class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(instance.primary_node, brlist):
      raise errors.OpPrereqError("one or more target bridges %s do not"
                                 " exist on destination node '%s'" %
                                 (brlist, instance.primary_node))

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
    if not nodeinfo:
      raise errors.OpExecError("Could not contact node %s for info" %
                               (node_current))

    freememory = nodeinfo[node_current]['memory_free']
    memory = instance.memory
    if memory > freememory:
      raise errors.OpExecError("Not enough memory to start instance"
                               " %s on node %s:"
                               " needed %s MiB, available %s MiB" %
                               (instance.name, node_current, memory,
                                freememory))

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")

    self.cfg.MarkInstanceUp(instance.name)


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    if not rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    self.cfg.MarkInstanceDown(instance.name)
    _ShutdownInstanceDisks(instance, self.cfg)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   instance.primary_node)
      os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
      if not isinstance(os_obj, objects.OS):
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node" % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.AddInstance(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s "
                                 "on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                               (instance.name, instance.primary_node))

    logger.Info("removing block devices for instance %s" % instance.name)

    _RemoveDisks(instance, self.cfg)

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)


class LUQueryInstances(NoHooksLU):
2040
  """Logical unit for querying instances.
2041

2042
  """
2043
  _OP_REQP = ["output_fields"]
2044

    
2045
  def CheckPrereq(self):
2046
    """Check prerequisites.
2047

2048
    This checks that the fields required are valid output fields.
2049

2050
    """
2051
    self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2052
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2053
                               "admin_state", "admin_ram",
2054
                               "disk_template", "ip", "mac", "bridge",
2055
                               "sda_size", "sdb_size"],
2056
                       dynamic=self.dynamic_fields,
2057
                       selected=self.op.output_fields)
2058

    
2059
  def Exec(self, feedback_fn):
2060
    """Computes the list of nodes and their attributes.
2061

2062
    """
2063
    instance_names = utils.NiceSort(self.cfg.GetInstanceList())
2064
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2065
                     in instance_names]
2066

    
2067
    # begin data gathering
2068

    
2069
    nodes = frozenset([inst.primary_node for inst in instance_list])
2070

    
2071
    bad_nodes = []
2072
    if self.dynamic_fields.intersection(self.op.output_fields):
2073
      live_data = {}
2074
      node_data = rpc.call_all_instances_info(nodes)
2075
      for name in nodes:
2076
        result = node_data[name]
2077
        if result:
2078
          live_data.update(result)
2079
        elif result == False:
2080
          bad_nodes.append(name)
2081
        # else no instance is alive
2082
    else:
2083
      live_data = dict([(name, {}) for name in instance_names])
2084

    
2085
    # end data gathering
2086

    
2087
    output = []
2088
    for instance in instance_list:
2089
      iout = []
2090
      for field in self.op.output_fields:
2091
        if field == "name":
2092
          val = instance.name
2093
        elif field == "os":
2094
          val = instance.os
2095
        elif field == "pnode":
2096
          val = instance.primary_node
2097
        elif field == "snodes":
2098
          val = ",".join(instance.secondary_nodes) or "-"
2099
        elif field == "admin_state":
2100
          if instance.status == "down":
2101
            val = "no"
2102
          else:
2103
            val = "yes"
2104
        elif field == "oper_state":
2105
          if instance.primary_node in bad_nodes:
2106
            val = "(node down)"
2107
          else:
2108
            if live_data.get(instance.name):
2109
              val = "running"
2110
            else:
2111
              val = "stopped"
2112
        elif field == "admin_ram":
2113
          val = instance.memory
2114
        elif field == "oper_ram":
2115
          if instance.primary_node in bad_nodes:
2116
            val = "(node down)"
2117
          elif instance.name in live_data:
2118
            val = live_data[instance.name].get("memory", "?")
2119
          else:
2120
            val = "-"
2121
        elif field == "disk_template":
2122
          val = instance.disk_template
2123
        elif field == "ip":
2124
          val = instance.nics[0].ip
2125
        elif field == "bridge":
2126
          val = instance.nics[0].bridge
2127
        elif field == "mac":
2128
          val = instance.nics[0].mac
2129
        elif field == "sda_size" or field == "sdb_size":
2130
          disk = instance.FindDisk(field[:3])
2131
          if disk is None:
2132
            val = "N/A"
2133
          else:
2134
            val = disk.size
2135
        else:
2136
          raise errors.ParameterError(field)
2137
        val = str(val)
2138
        iout.append(val)
2139
      output.append(iout)
2140

    
2141
    return output
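    # Illustrative note (not from the original source): with
    # output_fields ["name", "pnode", "oper_state"] each row appended
    # above would look roughly like
    #   ["instance1.example.com", "node1.example.com", "running"]
    # (all values are stringified); the names shown are hypothetical.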
2142

    
2143

    
2144
class LUFailoverInstance(LogicalUnit):
2145
  """Failover an instance.
2146

2147
  """
2148
  HPATH = "instance-failover"
2149
  HTYPE = constants.HTYPE_INSTANCE
2150
  _OP_REQP = ["instance_name", "ignore_consistency"]
2151

    
2152
  def BuildHooksEnv(self):
2153
    """Build hooks env.
2154

2155
    This runs on master, primary and secondary nodes of the instance.
2156

2157
    """
2158
    env = {
2159
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2160
      }
2161
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2162
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2163
    return env, nl, nl
2164

    
2165
  def CheckPrereq(self):
2166
    """Check prerequisites.
2167

2168
    This checks that the instance is in the cluster.
2169

2170
    """
2171
    instance = self.cfg.GetInstanceInfo(
2172
      self.cfg.ExpandInstanceName(self.op.instance_name))
2173
    if instance is None:
2174
      raise errors.OpPrereqError("Instance '%s' not known" %
2175
                                 self.op.instance_name)
2176

    
2177
    # check memory requirements on the secondary node
2178
    target_node = instance.secondary_nodes[0]
2179
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2180
    info = nodeinfo.get(target_node, None)
2181
    if not info:
2182
      raise errors.OpPrereqError("Cannot get current information"
2183
                                 " from node '%s'" % nodeinfo)
2184
    if instance.memory > info['memory_free']:
2185
      raise errors.OpPrereqError("Not enough memory on target node %s."
2186
                                 " %d MB available, %d MB required" %
2187
                                 (target_node, info['memory_free'],
2188
                                  instance.memory))
2189

    
2190
    # check bridge existance
2191
    brlist = [nic.bridge for nic in instance.nics]
2192
    if not rpc.call_bridges_exist(instance.primary_node, brlist):
2193
      raise errors.OpPrereqError("One or more target bridges %s does not"
2194
                                 " exist on destination node '%s'" %
2195
                                 (brlist, instance.primary_node))
2196

    
2197
    self.instance = instance
2198

    
2199
  def Exec(self, feedback_fn):
2200
    """Failover an instance.
2201

2202
    The failover is done by shutting it down on its present node and
2203
    starting it on the secondary.
2204

2205
    """
2206
    instance = self.instance
2207

    
2208
    source_node = instance.primary_node
2209
    target_node = instance.secondary_nodes[0]
2210

    
2211
    feedback_fn("* checking disk consistency between source and target")
2212
    for dev in instance.disks:
2213
      # for remote_raid1, these are md over drbd
2214
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2215
        if not self.op.ignore_consistency:
2216
          raise errors.OpExecError("Disk %s is degraded on target node,"
2217
                                   " aborting failover." % dev.iv_name)
2218

    
2219
    feedback_fn("* checking target node resource availability")
2220
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2221

    
2222
    if not nodeinfo:
2223
      raise errors.OpExecError("Could not contact target node %s." %
2224
                               target_node)
2225

    
2226
    free_memory = int(nodeinfo[target_node]['memory_free'])
2227
    memory = instance.memory
2228
    if memory > free_memory:
2229
      raise errors.OpExecError("Not enough memory to create instance %s on"
2230
                               " node %s. needed %s MiB, available %s MiB" %
2231
                               (instance.name, target_node, memory,
2232
                                free_memory))
2233

    
2234
    feedback_fn("* shutting down instance on source node")
2235
    logger.Info("Shutting down instance %s on node %s" %
2236
                (instance.name, source_node))
2237

    
2238
    if not rpc.call_instance_shutdown(source_node, instance):
2239
      logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2240
                   " anyway. Please make sure node %s is down"  %
2241
                   (instance.name, source_node, source_node))
2242

    
2243
    feedback_fn("* deactivating the instance's disks on source node")
2244
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2245
      raise errors.OpExecError("Can't shut down the instance's disks.")
2246

    
2247
    instance.primary_node = target_node
2248
    # distribute new instance config to the other nodes
2249
    self.cfg.AddInstance(instance)
2250

    
2251
    feedback_fn("* activating the instance's disks on target node")
2252
    logger.Info("Starting instance %s on node %s" %
2253
                (instance.name, target_node))
2254

    
2255
    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2256
                                             ignore_secondaries=True)
2257
    if not disks_ok:
2258
      _ShutdownInstanceDisks(instance, self.cfg)
2259
      raise errors.OpExecError("Can't activate the instance's disks")
2260

    
2261
    feedback_fn("* starting the instance on the target node")
2262
    if not rpc.call_instance_start(target_node, instance, None):
2263
      _ShutdownInstanceDisks(instance, self.cfg)
2264
      raise errors.OpExecError("Could not start instance %s on node %s." %
2265
                               (instance.name, target_node))
2266

    
2267

    
2268
def _CreateBlockDevOnPrimary(cfg, node, device, info):
2269
  """Create a tree of block devices on the primary node.
2270

2271
  This always creates all devices.
2272

2273
  """
2274
  if device.children:
2275
    for child in device.children:
2276
      if not _CreateBlockDevOnPrimary(cfg, node, child, info):
2277
        return False
2278

    
2279
  cfg.SetDiskID(device, node)
2280
  new_id = rpc.call_blockdev_create(node, device, device.size, True, info)
2281
  if not new_id:
2282
    return False
2283
  if device.physical_id is None:
2284
    device.physical_id = new_id
2285
  return True
2286

    
2287

    
2288
def _CreateBlockDevOnSecondary(cfg, node, device, force, info):
2289
  """Create a tree of block devices on a secondary node.
2290

2291
  If this device type has to be created on secondaries, create it and
2292
  all its children.
2293

2294
  If not, just recurse to children keeping the same 'force' value.
2295

2296
  """
2297
  if device.CreateOnSecondary():
2298
    force = True
2299
  if device.children:
2300
    for child in device.children:
2301
      if not _CreateBlockDevOnSecondary(cfg, node, child, force, info):
2302
        return False
2303

    
2304
  if not force:
2305
    return True
2306
  cfg.SetDiskID(device, node)
2307
  new_id = rpc.call_blockdev_create(node, device, device.size, False, info)
2308
  if not new_id:
2309
    return False
2310
  if device.physical_id is None:
2311
    device.physical_id = new_id
2312
  return True
2313

    
2314

    
2315
def _GenerateUniqueNames(cfg, exts):
2316
  """Generate a suitable LV name.
2317

2318
  This will generate a logical volume name for the given instance.
2319

2320
  """
2321
  results = []
2322
  for val in exts:
2323
    new_id = cfg.GenerateUniqueID()
2324
    results.append("%s%s" % (new_id, val))
2325
  return results
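  # Illustrative note (not from the original source): a call such as
  #   _GenerateUniqueNames(cfg, [".sda", ".sdb"])
  # returns something like ["<unique-id>.sda", "<unique-id>.sdb"], each
  # prefix coming from cfg.GenerateUniqueID().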
2326

    
2327

    
2328
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2329
  """Generate a drbd device complete with its children.
2330

2331
  """
2332
  port = cfg.AllocatePort()
2333
  vgname = cfg.GetVGName()
2334
  dev_data = objects.Disk(dev_type="lvm", size=size,
2335
                          logical_id=(vgname, names[0]))
2336
  dev_meta = objects.Disk(dev_type="lvm", size=128,
2337
                          logical_id=(vgname, names[1]))
2338
  drbd_dev = objects.Disk(dev_type="drbd", size=size,
2339
                          logical_id = (primary, secondary, port),
2340
                          children = [dev_data, dev_meta])
2341
  return drbd_dev
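  # Illustrative note (not from the original source): the returned disk
  # is a "drbd" device whose logical_id is (primary, secondary, port)
  # and whose children are the data LV of the requested size and a
  # 128 MB metadata LV, as built above.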
2342

    
2343

    
2344
def _GenerateDiskTemplate(cfg, template_name,
2345
                          instance_name, primary_node,
2346
                          secondary_nodes, disk_sz, swap_sz):
2347
  """Generate the entire disk layout for a given template type.
2348

2349
  """
2350
  #TODO: compute space requirements
2351

    
2352
  vgname = cfg.GetVGName()
2353
  if template_name == "diskless":
2354
    disks = []
2355
  elif template_name == "plain":
2356
    if len(secondary_nodes) != 0:
2357
      raise errors.ProgrammerError("Wrong template configuration")
2358

    
2359
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2360
    sda_dev = objects.Disk(dev_type="lvm", size=disk_sz,
2361
                           logical_id=(vgname, names[0]),
2362
                           iv_name = "sda")
2363
    sdb_dev = objects.Disk(dev_type="lvm", size=swap_sz,
2364
                           logical_id=(vgname, names[1]),
2365
                           iv_name = "sdb")
2366
    disks = [sda_dev, sdb_dev]
2367
  elif template_name == "local_raid1":
2368
    if len(secondary_nodes) != 0:
2369
      raise errors.ProgrammerError("Wrong template configuration")
2370

    
2371

    
2372
    names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2373
                                       ".sdb_m1", ".sdb_m2"])
2374
    sda_dev_m1 = objects.Disk(dev_type="lvm", size=disk_sz,
2375
                              logical_id=(vgname, names[0]))
2376
    sda_dev_m2 = objects.Disk(dev_type="lvm", size=disk_sz,
2377
                              logical_id=(vgname, names[1]))
2378
    md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name = "sda",
2379
                              size=disk_sz,
2380
                              children = [sda_dev_m1, sda_dev_m2])
2381
    sdb_dev_m1 = objects.Disk(dev_type="lvm", size=swap_sz,
2382
                              logical_id=(vgname, names[2]))
2383
    sdb_dev_m2 = objects.Disk(dev_type="lvm", size=swap_sz,
2384
                              logical_id=(vgname, names[3]))
2385
    md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name = "sdb",
2386
                              size=swap_sz,
2387
                              children = [sdb_dev_m1, sdb_dev_m2])
2388
    disks = [md_sda_dev, md_sdb_dev]
2389
  elif template_name == "remote_raid1":
2390
    if len(secondary_nodes) != 1:
2391
      raise errors.ProgrammerError("Wrong template configuration")
2392
    remote_node = secondary_nodes[0]
2393
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2394
                                       ".sdb_data", ".sdb_meta"])
2395
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2396
                                         disk_sz, names[0:2])
2397
    md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name="sda",
2398
                              children = [drbd_sda_dev], size=disk_sz)
2399
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2400
                                         swap_sz, names[2:4])
2401
    md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name="sdb",
2402
                              children = [drbd_sdb_dev], size=swap_sz)
2403
    disks = [md_sda_dev, md_sdb_dev]
2404
  else:
2405
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2406
  return disks
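  # Illustrative note (not from the original source): for the "plain"
  # template this returns two LVM-backed disks named "sda" (data) and
  # "sdb" (swap); for "remote_raid1" each of them is an md_raid1 device
  # whose single child is a DRBD branch built by _GenerateMDDRBDBranch.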
2407

    
2408

    
2409
def _GetInstanceInfoText(instance):
2410
  """Compute that text that should be added to the disk's metadata.
2411

2412
  """
2413
  return "originstname+%s" % instance.name
2414

    
2415

    
2416
def _CreateDisks(cfg, instance):
2417
  """Create all disks for an instance.
2418

2419
  This abstracts away some work from AddInstance.
2420

2421
  Args:
2422
    instance: the instance object
2423

2424
  Returns:
2425
    True or False showing the success of the creation process
2426

2427
  """
2428
  info = _GetInstanceInfoText(instance)
2429

    
2430
  for device in instance.disks:
2431
    logger.Info("creating volume %s for instance %s" %
2432
              (device.iv_name, instance.name))
2433
    #HARDCODE
2434
    for secondary_node in instance.secondary_nodes:
2435
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, device, False,
2436
                                        info):
2437
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2438
                     (device.iv_name, device, secondary_node))
2439
        return False
2440
    #HARDCODE
2441
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, device, info):
2442
      logger.Error("failed to create volume %s on primary!" %
2443
                   device.iv_name)
2444
      return False
2445
  return True
2446

    
2447

    
2448
def _RemoveDisks(instance, cfg):
2449
  """Remove all disks for an instance.
2450

2451
  This abstracts away some work from `AddInstance()` and
2452
  `RemoveInstance()`. Note that in case some of the devices couldn't
2453
  be removed, the removal will continue with the other ones (compare
2454
  with `_CreateDisks()`).
2455

2456
  Args:
2457
    instance: the instance object
2458

2459
  Returns:
2460
    True or False showing the success of the removal process
2461

2462
  """
2463
  logger.Info("removing block devices for instance %s" % instance.name)
2464

    
2465
  result = True
2466
  for device in instance.disks:
2467
    for node, disk in device.ComputeNodeTree(instance.primary_node):
2468
      cfg.SetDiskID(disk, node)
2469
      if not rpc.call_blockdev_remove(node, disk):
2470
        logger.Error("could not remove block device %s on node %s,"
2471
                     " continuing anyway" %
2472
                     (device.iv_name, node))
2473
        result = False
2474
  return result
2475

    
2476

    
2477
class LUCreateInstance(LogicalUnit):
2478
  """Create an instance.
2479

2480
  """
2481
  HPATH = "instance-add"
2482
  HTYPE = constants.HTYPE_INSTANCE
2483
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2484
              "disk_template", "swap_size", "mode", "start", "vcpus",
2485
              "wait_for_sync"]
2486

    
2487
  def BuildHooksEnv(self):
2488
    """Build hooks env.
2489

2490
    This runs on master, primary and secondary nodes of the instance.
2491

2492
    """
2493
    env = {
2494
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2495
      "INSTANCE_DISK_SIZE": self.op.disk_size,
2496
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
2497
      "INSTANCE_ADD_MODE": self.op.mode,
2498
      }
2499
    if self.op.mode == constants.INSTANCE_IMPORT:
2500
      env["INSTANCE_SRC_NODE"] = self.op.src_node
2501
      env["INSTANCE_SRC_PATH"] = self.op.src_path
2502
      env["INSTANCE_SRC_IMAGE"] = self.src_image
2503

    
2504
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2505
      primary_node=self.op.pnode,
2506
      secondary_nodes=self.secondaries,
2507
      status=self.instance_status,
2508
      os_type=self.op.os_type,
2509
      memory=self.op.mem_size,
2510
      vcpus=self.op.vcpus,
2511
      nics=[(self.inst_ip, self.op.bridge)],
2512
    ))
2513

    
2514
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2515
          self.secondaries)
2516
    return env, nl, nl
2517

    
2518

    
2519
  def CheckPrereq(self):
2520
    """Check prerequisites.
2521

2522
    """
2523
    if self.op.mode not in (constants.INSTANCE_CREATE,
2524
                            constants.INSTANCE_IMPORT):
2525
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2526
                                 self.op.mode)
2527

    
2528
    if self.op.mode == constants.INSTANCE_IMPORT:
2529
      src_node = getattr(self.op, "src_node", None)
2530
      src_path = getattr(self.op, "src_path", None)
2531
      if src_node is None or src_path is None:
2532
        raise errors.OpPrereqError("Importing an instance requires source"
2533
                                   " node and path options")
2534
      src_node_full = self.cfg.ExpandNodeName(src_node)
2535
      if src_node_full is None:
2536
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2537
      self.op.src_node = src_node = src_node_full
2538

    
2539
      if not os.path.isabs(src_path):
2540
        raise errors.OpPrereqError("The source path must be absolute")
2541

    
2542
      export_info = rpc.call_export_info(src_node, src_path)
2543

    
2544
      if not export_info:
2545
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
2546

    
2547
      if not export_info.has_section(constants.INISECT_EXP):
2548
        raise errors.ProgrammerError("Corrupted export config")
2549

    
2550
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
2551
      if (int(ei_version) != constants.EXPORT_VERSION):
2552
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2553
                                   (ei_version, constants.EXPORT_VERSION))
2554

    
2555
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2556
        raise errors.OpPrereqError("Can't import instance with more than"
2557
                                   " one data disk")
2558

    
2559
      # FIXME: are the old os-es, disk sizes, etc. useful?
2560
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2561
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2562
                                                         'disk0_dump'))
2563
      self.src_image = diskimage
2564
    else: # INSTANCE_CREATE
2565
      if getattr(self.op, "os_type", None) is None:
2566
        raise errors.OpPrereqError("No guest OS specified")
2567

    
2568
    # check primary node
2569
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2570
    if pnode is None:
2571
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
2572
                                 self.op.pnode)
2573
    self.op.pnode = pnode.name
2574
    self.pnode = pnode
2575
    self.secondaries = []
2576
    # disk template and mirror node verification
2577
    if self.op.disk_template not in constants.DISK_TEMPLATES:
2578
      raise errors.OpPrereqError("Invalid disk template name")
2579

    
2580
    if self.op.disk_template == constants.DT_REMOTE_RAID1:
2581
      if getattr(self.op, "snode", None) is None:
2582
        raise errors.OpPrereqError("The 'remote_raid1' disk template needs"
2583
                                   " a mirror node")
2584

    
2585
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
2586
      if snode_name is None:
2587
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
2588
                                   self.op.snode)
2589
      elif snode_name == pnode.name:
2590
        raise errors.OpPrereqError("The secondary node cannot be"
2591
                                   " the primary node.")
2592
      self.secondaries.append(snode_name)
2593

    
2594
    # Check lv size requirements
2595
    nodenames = [pnode.name] + self.secondaries
2596
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2597

    
2598
    # Required free disk space as a function of disk and swap space
2599
    req_size_dict = {
2600
      constants.DT_DISKLESS: 0,
2601
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2602
      constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2603
      # 256 MB are added for drbd metadata, 128MB for each drbd device
2604
      constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2605
    }
2606

    
2607
    if self.op.disk_template not in req_size_dict:
2608
      raise errors.ProgrammerError("Disk template '%s' size requirement"
2609
                                   " is unknown" %  self.op.disk_template)
2610

    
2611
    req_size = req_size_dict[self.op.disk_template]
2612

    
2613
    for node in nodenames:
2614
      info = nodeinfo.get(node, None)
2615
      if not info:
2616
        raise errors.OpPrereqError("Cannot get current information"
2617
                                   " from node '%s'" % nodeinfo)
2618
      if req_size > info['vg_free']:
2619
        raise errors.OpPrereqError("Not enough disk space on target node %s."
2620
                                   " %d MB available, %d MB required" %
2621
                                   (node, info['vg_free'], req_size))
2622

    
2623
    # os verification
2624
    os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2625
    if not isinstance(os_obj, objects.OS):
2626
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
2627
                                 " primary node"  % self.op.os_type)
2628

    
2629
    # instance verification
2630
    hostname1 = utils.LookupHostname(self.op.instance_name)
2631
    if not hostname1:
2632
      raise errors.OpPrereqError("Instance name '%s' not found in dns" %
2633
                                 self.op.instance_name)
2634

    
2635
    self.op.instance_name = instance_name = hostname1['hostname']
2636
    instance_list = self.cfg.GetInstanceList()
2637
    if instance_name in instance_list:
2638
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2639
                                 instance_name)
2640

    
2641
    ip = getattr(self.op, "ip", None)
2642
    if ip is None or ip.lower() == "none":
2643
      inst_ip = None
2644
    elif ip.lower() == "auto":
2645
      inst_ip = hostname1['ip']
2646
    else:
2647
      if not utils.IsValidIP(ip):
2648
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
2649
                                   " like a valid IP" % ip)
2650
      inst_ip = ip
2651
    self.inst_ip = inst_ip
2652

    
2653
    command = ["fping", "-q", hostname1['ip']]
2654
    result = utils.RunCmd(command)
2655
    if not result.failed:
2656
      raise errors.OpPrereqError("IP %s of instance %s already in use" %
2657
                                 (hostname1['ip'], instance_name))
2658

    
2659
    # bridge verification
2660
    bridge = getattr(self.op, "bridge", None)
2661
    if bridge is None:
2662
      self.op.bridge = self.cfg.GetDefBridge()
2663
    else:
2664
      self.op.bridge = bridge
2665

    
2666
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
2667
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
2668
                                 " destination node '%s'" %
2669
                                 (self.op.bridge, pnode.name))
2670

    
2671
    if self.op.start:
2672
      self.instance_status = 'up'
2673
    else:
2674
      self.instance_status = 'down'
2675

    
2676
  def Exec(self, feedback_fn):
2677
    """Create and add the instance to the cluster.
2678

2679
    """
2680
    instance = self.op.instance_name
2681
    pnode_name = self.pnode.name
2682

    
2683
    nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
2684
    if self.inst_ip is not None:
2685
      nic.ip = self.inst_ip
2686

    
2687
    disks = _GenerateDiskTemplate(self.cfg,
2688
                                  self.op.disk_template,
2689
                                  instance, pnode_name,
2690
                                  self.secondaries, self.op.disk_size,
2691
                                  self.op.swap_size)
2692

    
2693
    iobj = objects.Instance(name=instance, os=self.op.os_type,
2694
                            primary_node=pnode_name,
2695
                            memory=self.op.mem_size,
2696
                            vcpus=self.op.vcpus,
2697
                            nics=[nic], disks=disks,
2698
                            disk_template=self.op.disk_template,
2699
                            status=self.instance_status,
2700
                            )
2701

    
2702
    feedback_fn("* creating instance disks...")
2703
    if not _CreateDisks(self.cfg, iobj):
2704
      _RemoveDisks(iobj, self.cfg)
2705
      raise errors.OpExecError("Device creation failed, reverting...")
2706

    
2707
    feedback_fn("adding instance %s to cluster config" % instance)
2708

    
2709
    self.cfg.AddInstance(iobj)
2710

    
2711
    if self.op.wait_for_sync:
2712
      disk_abort = not _WaitForSync(self.cfg, iobj)
2713
    elif iobj.disk_template == "remote_raid1":
2714
      # make sure the disks are not degraded (still sync-ing is ok)
2715
      time.sleep(15)
2716
      feedback_fn("* checking mirrors status")
2717
      disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
2718
    else:
2719
      disk_abort = False
2720

    
2721
    if disk_abort:
2722
      _RemoveDisks(iobj, self.cfg)
2723
      self.cfg.RemoveInstance(iobj.name)
2724
      raise errors.OpExecError("There are some degraded disks for"
2725
                               " this instance")
2726

    
2727
    feedback_fn("creating os for instance %s on node %s" %
2728
                (instance, pnode_name))
2729

    
2730
    if iobj.disk_template != constants.DT_DISKLESS:
2731
      if self.op.mode == constants.INSTANCE_CREATE:
2732
        feedback_fn("* running the instance OS create scripts...")
2733
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
2734
          raise errors.OpExecError("could not add os for instance %s"
2735
                                   " on node %s" %
2736
                                   (instance, pnode_name))
2737

    
2738
      elif self.op.mode == constants.INSTANCE_IMPORT:
2739
        feedback_fn("* running the instance OS import scripts...")
2740
        src_node = self.op.src_node
2741
        src_image = self.src_image
2742
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
2743
                                                src_node, src_image):
2744
          raise errors.OpExecError("Could not import os for instance"
2745
                                   " %s on node %s" %
2746
                                   (instance, pnode_name))
2747
      else:
2748
        # also checked in the prereq part
2749
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
2750
                                     % self.op.mode)
2751

    
2752
    if self.op.start:
2753
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
2754
      feedback_fn("* starting instance...")
2755
      if not rpc.call_instance_start(pnode_name, iobj, None):
2756
        raise errors.OpExecError("Could not start instance")
2757

    
2758

    
2759
class LUConnectConsole(NoHooksLU):
2760
  """Connect to an instance's console.
2761

2762
  This is somewhat special in that it returns the command line that
2763
  you need to run on the master node in order to connect to the
2764
  console.
2765

2766
  """
2767
  _OP_REQP = ["instance_name"]
2768

    
2769
  def CheckPrereq(self):
2770
    """Check prerequisites.
2771

2772
    This checks that the instance is in the cluster.
2773

2774
    """
2775
    instance = self.cfg.GetInstanceInfo(
2776
      self.cfg.ExpandInstanceName(self.op.instance_name))
2777
    if instance is None:
2778
      raise errors.OpPrereqError("Instance '%s' not known" %
2779
                                 self.op.instance_name)
2780
    self.instance = instance
2781

    
2782
  def Exec(self, feedback_fn):
2783
    """Connect to the console of an instance
2784

2785
    """
2786
    instance = self.instance
2787
    node = instance.primary_node
2788

    
2789
    node_insts = rpc.call_instance_list([node])[node]
2790
    if node_insts is False:
2791
      raise errors.OpExecError("Can't connect to node %s." % node)
2792

    
2793
    if instance.name not in node_insts:
2794
      raise errors.OpExecError("Instance %s is not running." % instance.name)
2795

    
2796
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
2797

    
2798
    hyper = hypervisor.GetHypervisor()
2799
    console_cmd = hyper.GetShellCommandForConsole(instance.name)
2800
    # build ssh cmdline
2801
    argv = ["ssh", "-q", "-t"]
2802
    argv.extend(ssh.KNOWN_HOSTS_OPTS)
2803
    argv.extend(ssh.BATCH_MODE_OPTS)
2804
    argv.append(node)
2805
    argv.append(console_cmd)
2806
    return "ssh", argv
2807

    
2808

    
2809
class LUAddMDDRBDComponent(LogicalUnit):
2810
  """Adda new mirror member to an instance's disk.
2811

2812
  """
2813
  HPATH = "mirror-add"
2814
  HTYPE = constants.HTYPE_INSTANCE
2815
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]
2816

    
2817
  def BuildHooksEnv(self):
2818
    """Build hooks env.
2819

2820
    This runs on the master, the primary and all the secondaries.
2821

2822
    """
2823
    env = {
2824
      "NEW_SECONDARY": self.op.remote_node,
2825
      "DISK_NAME": self.op.disk_name,
2826
      }
2827
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2828
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
2829
          self.op.remote_node,] + list(self.instance.secondary_nodes)
2830
    return env, nl, nl
2831

    
2832
  def CheckPrereq(self):
2833
    """Check prerequisites.
2834

2835
    This checks that the instance is in the cluster.
2836

2837
    """
2838
    instance = self.cfg.GetInstanceInfo(
2839
      self.cfg.ExpandInstanceName(self.op.instance_name))
2840
    if instance is None:
2841
      raise errors.OpPrereqError("Instance '%s' not known" %
2842
                                 self.op.instance_name)
2843
    self.instance = instance
2844

    
2845
    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
2846
    if remote_node is None:
2847
      raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
2848
    self.remote_node = remote_node
2849

    
2850
    if remote_node == instance.primary_node:
2851
      raise errors.OpPrereqError("The specified node is the primary node of"
2852
                                 " the instance.")
2853

    
2854
    if instance.disk_template != constants.DT_REMOTE_RAID1:
2855
      raise errors.OpPrereqError("Instance's disk layout is not"
2856
                                 " remote_raid1.")
2857
    for disk in instance.disks:
2858
      if disk.iv_name == self.op.disk_name:
2859
        break
2860
    else:
2861
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
2862
                                 " instance." % self.op.disk_name)
2863
    if len(disk.children) > 1:
2864
      raise errors.OpPrereqError("The device already has two slave"
2865
                                 " devices.\n"
2866
                                 "This would create a 3-disk raid1"
2867
                                 " which we don't allow.")
2868
    self.disk = disk
2869

    
2870
  def Exec(self, feedback_fn):
2871
    """Add the mirror component
2872

2873
    """
2874
    disk = self.disk
2875
    instance = self.instance
2876

    
2877
    remote_node = self.remote_node
2878
    lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
2879
    names = _GenerateUniqueNames(self.cfg, lv_names)
2880
    new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
2881
                                     remote_node, disk.size, names)
2882

    
2883
    logger.Info("adding new mirror component on secondary")
2884
    #HARDCODE
2885
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, new_drbd, False,
2886
                                      _GetInstanceInfoText(instance)):
2887
      raise errors.OpExecError("Failed to create new component on secondary"
2888
                               " node %s" % remote_node)
2889

    
2890
    logger.Info("adding new mirror component on primary")
2891
    #HARDCODE
2892
    if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node, new_drbd,
2893
                                    _GetInstanceInfoText(instance)):
2894
      # remove secondary dev
2895
      self.cfg.SetDiskID(new_drbd, remote_node)
2896
      rpc.call_blockdev_remove(remote_node, new_drbd)
2897
      raise errors.OpExecError("Failed to create volume on primary")
2898

    
2899
    # the device exists now
2900
    # call the primary node to add the mirror to md
2901
    logger.Info("adding new mirror component to md")
2902
    if not rpc.call_blockdev_addchild(instance.primary_node,
2903
                                           disk, new_drbd):
2904
      logger.Error("Can't add mirror compoment to md!")
2905
      self.cfg.SetDiskID(new_drbd, remote_node)
2906
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
2907
        logger.Error("Can't rollback on secondary")
2908
      self.cfg.SetDiskID(new_drbd, instance.primary_node)
2909
      if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
2910
        logger.Error("Can't rollback on primary")
2911
      raise errors.OpExecError("Can't add mirror component to md array")
2912

    
2913
    disk.children.append(new_drbd)
2914

    
2915
    self.cfg.AddInstance(instance)
2916

    
2917
    _WaitForSync(self.cfg, instance)
2918

    
2919
    return 0
2920

    
2921

    
2922
class LURemoveMDDRBDComponent(LogicalUnit):
2923
  """Remove a component from a remote_raid1 disk.
2924

2925
  """
2926
  HPATH = "mirror-remove"
2927
  HTYPE = constants.HTYPE_INSTANCE
2928
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]
2929

    
2930
  def BuildHooksEnv(self):
2931
    """Build hooks env.
2932

2933
    This runs on the master, the primary and all the secondaries.
2934

2935
    """
2936
    env = {
2937
      "DISK_NAME": self.op.disk_name,
2938
      "DISK_ID": self.op.disk_id,
2939
      "OLD_SECONDARY": self.old_secondary,
2940
      }
2941
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2942
    nl = [self.sstore.GetMasterNode(),
2943
          self.instance.primary_node] + list(self.instance.secondary_nodes)
2944
    return env, nl, nl
2945

    
2946
  def CheckPrereq(self):
2947
    """Check prerequisites.
2948

2949
    This checks that the instance is in the cluster.
2950

2951
    """
2952
    instance = self.cfg.GetInstanceInfo(
2953
      self.cfg.ExpandInstanceName(self.op.instance_name))
2954
    if instance is None:
2955
      raise errors.OpPrereqError("Instance '%s' not known" %
2956
                                 self.op.instance_name)
2957
    self.instance = instance
2958

    
2959
    if instance.disk_template != constants.DT_REMOTE_RAID1:
2960
      raise errors.OpPrereqError("Instance's disk layout is not"
2961
                                 " remote_raid1.")
2962
    for disk in instance.disks:
2963
      if disk.iv_name == self.op.disk_name:
2964
        break
2965
    else:
2966
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
2967
                                 " instance." % self.op.disk_name)
2968
    for child in disk.children:
2969
      if child.dev_type == "drbd" and child.logical_id[2] == self.op.disk_id:
2970
        break
2971
    else:
2972
      raise errors.OpPrereqError("Can't find the device with this port.")
2973

    
2974
    if len(disk.children) < 2:
2975
      raise errors.OpPrereqError("Cannot remove the last component from"
2976
                                 " a mirror.")
2977
    self.disk = disk
2978
    self.child = child
2979
    if self.child.logical_id[0] == instance.primary_node:
2980
      oid = 1
2981
    else:
2982
      oid = 0
2983
    self.old_secondary = self.child.logical_id[oid]
2984

    
2985
  def Exec(self, feedback_fn):
2986
    """Remove the mirror component
2987

2988
    """
2989
    instance = self.instance
2990
    disk = self.disk
2991
    child = self.child
2992
    logger.Info("remove mirror component")
2993
    self.cfg.SetDiskID(disk, instance.primary_node)
2994
    if not rpc.call_blockdev_removechild(instance.primary_node,
2995
                                              disk, child):
2996
      raise errors.OpExecError("Can't remove child from mirror.")
2997

    
2998
    for node in child.logical_id[:2]:
2999
      self.cfg.SetDiskID(child, node)
3000
      if not rpc.call_blockdev_remove(node, child):
3001
        logger.Error("Warning: failed to remove device from node %s,"
3002
                     " continuing operation." % node)
3003

    
3004
    disk.children.remove(child)
3005
    self.cfg.AddInstance(instance)
3006

    
3007

    
3008
class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    remote_node = getattr(self.op, "remote_node", None)
    if remote_node is None:
      remote_node = instance.secondary_nodes[0]
    else:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    self.op.remote_node = remote_node

  def Exec(self, feedback_fn):
    """Replace the disks of an instance.

    """
    instance = self.instance
    iv_names = {}
    # start of work
    remote_node = self.op.remote_node
    cfg = self.cfg
    vgname = cfg.GetVGName()
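    # For every disk of the instance, build a fresh DRBD mirror branch
    # between the primary and remote_node, attach it to the MD device, wait
    # for the new component to sync, and only afterwards detach and delete
    # the old child.  iv_names remembers, per disk,
    # (md_device, old_drbd_child, new_drbd_child) for the later passes.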
    for dev in instance.disks:
      size = dev.size
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
                                       remote_node, size, names)
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
      logger.Info("adding new mirror component on secondary for %s" %
                  dev.iv_name)
      #HARDCODE
      if not _CreateBlockDevOnSecondary(cfg, remote_node, new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new component on"
                                 " secondary node %s\n"
                                 "Full abort, cleanup manually!" %
                                 remote_node)

      logger.Info("adding new mirror component on primary")
      #HARDCODE
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, new_drbd,
                                      _GetInstanceInfoText(instance)):
        # remove secondary dev
        cfg.SetDiskID(new_drbd, remote_node)
        rpc.call_blockdev_remove(remote_node, new_drbd)
        raise errors.OpExecError("Failed to create volume on primary!\n"
                                 "Full abort, cleanup manually!!")

      # the device exists now
      # call the primary node to add the mirror to md
      logger.Info("adding new mirror component to md")
      if not rpc.call_blockdev_addchild(instance.primary_node, dev,
                                        new_drbd):
        logger.Error("Can't add mirror component to md!")
        cfg.SetDiskID(new_drbd, remote_node)
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
          logger.Error("Can't rollback on secondary")
        cfg.SetDiskID(new_drbd, instance.primary_node)
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
          logger.Error("Can't rollback on primary")
        raise errors.OpExecError("Full abort, cleanup manually!!")

      dev.children.append(new_drbd)
      cfg.AddInstance(instance)

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    _WaitForSync(cfg, instance, unlock=True)

    # so check manually all the devices
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("MD device %s is degraded!" % name)
      cfg.SetDiskID(new_drbd, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
      if is_degr:
        raise errors.OpExecError("New drbd device %s is degraded!" % name)

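    # every MD device and every new DRBD child is in sync and healthy, so it
    # is now safe to drop the old children recorded in iv_names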
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      logger.Info("remove mirror %s component" % name)
      cfg.SetDiskID(dev, instance.primary_node)
      if not rpc.call_blockdev_removechild(instance.primary_node,
                                                dev, child):
        logger.Error("Can't remove child from mirror, aborting"
                     " *this device cleanup*.\nYou need to cleanup manually!!")
        continue

      for node in child.logical_id[:2]:
        logger.Info("remove child device on %s" % node)
        cfg.SetDiskID(child, node)
        if not rpc.call_blockdev_remove(node, child):
          logger.Error("Warning: failed to remove device from node %s,"
                       " continuing operation." % node)

      dev.children.remove(child)

      cfg.AddInstance(instance)


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]
    return


  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type == "drbd":
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

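    # the per-device description is recursive: "children" holds the same
    # structure for each child device, while "pstatus"/"sstatus" carry the
    # raw call_blockdev_find results from the primary and secondary nodes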
    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                                instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        }

      result[instance.name] = idict

    return result


class LUQueryNodeData(NoHooksLU):
  """Logical unit for querying node data.

  """
  _OP_REQP = ["nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional node list against the existing names.

    """
    self.wanted_nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Compute and return the list of nodes.

    """
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]
    result = []
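    # each entry is a tuple: (name, primary_ip, secondary_ip,
    # [instances with this node as primary], [instances with it as secondary])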
    for node in self.wanted_nodes:
      result.append((node.name, node.primary_ip, node.secondary_ip,
                     [inst.name for inst in ilist
                      if inst.primary_node == node.name],
                     [inst.name for inst in ilist
                      if node.name in inst.secondary_nodes],
                     ))
    return result


class LUSetInstanceParms(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      args['nics'] = [(ip, bridge)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.bridge = getattr(self.op, "bridge", None)
    if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
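    # the literal string "none" (any capitalisation) is accepted as a request
    # to clear the IP; do_ip stays True so the change is still applied in Exec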
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("No such instance name '%s'" %
                                 self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance
    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
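    # result collects (parameter, new value) pairs describing what was
    # changed; the updated object is only written back to the configuration,
    # so the new settings apply at the next instance restart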
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))

    self.cfg.AddInstance(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
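    # when no nodes are given, _GetWantedNodes is expected to return the
    # full cluster node list, so the query then covers every node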
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list([node.name for node in self.nodes])


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance name is a valid one.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    # shutdown the instance, unless requested not to do so
    if self.op.shutdown:
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
      self.processor.ChainOpCode(op, feedback_fn)

    vgname = self.cfg.GetVGName()

    snap_disks = []

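    # only the "sda" disk is snapshotted and exported; the snapshot is taken
    # while the instance is (optionally) shut down, and the instance is
    # restarted in the finally clause even if snapshotting fails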
    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type="lvm", size=disk.size,
                                      logical_id=(vgname, new_dev_name),
                                      physical_id=(vgname, new_dev_name),
                                      iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown:
        op = opcodes.OpStartupInstance(instance_name=instance.name,
                                       force=False)
        self.processor.ChainOpCode(op, feedback_fn)

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                           instance):
        logger.Error("could not export block device %s from node"
                     " %s to node %s" %
                     (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from"
                     " node %s" % (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

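    # finally, drop any stale exports of this instance on the other nodes so
    # only the export just written to the target node remains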
    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.processor.ChainOpCode(op, feedback_fn)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
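    # resolve (kind, name) into self.target: the cluster object, a node or
    # an instance; subclasses call this before operating on self.target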
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()


class LUAddTag(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tag"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    objects.TaggableObject.ValidateTag(self.op.tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      self.target.AddTag(self.op.tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTag(TagsLU):
  """Delete a tag from a given object.

  """
  _OP_REQP = ["kind", "name", "tag"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    objects.TaggableObject.ValidateTag(self.op.tag)
    if self.op.tag not in self.target.GetTags():
      raise errors.OpPrereqError("Tag not found")

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    self.target.RemoveTag(self.op.tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")