root / lib / cmdlib.py @ ff98055b


#!/usr/bin/python
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import socket
import time
import tempfile
import re
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import ssconf

class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements (REQ_CLUSTER,
      REQ_MASTER); note that all commands require root permissions

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_CLUSTER = True
  REQ_MASTER = True

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.processor = processor
    self.op = op
    self.cfg = cfg
    self.sstore = sstore
    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    if self.REQ_CLUSTER:
      if not cfg.IsCluster():
        raise errors.OpPrereqError("Cluster not initialized yet,"
                                   " use 'gnt-cluster init' first.")
      if self.REQ_MASTER:
        master = sstore.GetMasterNode()
        if master != socket.gethostname():
          raise errors.OpPrereqError("Commands must be run on the master"
                                     " node %s" % master)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have the 'GANETI_' prefix, as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    As for the node lists, the master should not be included in
    them, as it will be added by the hooks runner in case this LU
    requires a cluster to run on (otherwise we don't have a node
    list). If there are no nodes, an empty list (and not None) should
    be returned.

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError


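# For illustration: a minimal LU subclass following the rules documented in
# LogicalUnit above might look like the sketch below. The class name and the
# "node_name" opcode field are hypothetical examples only; see the real LUs
# later in this module (e.g. LUQueryNodes, LURemoveNode) for actual uses of
# the CheckPrereq/Exec contract.
#
#   class LUExampleQueryNode(NoHooksLU):
#     """Hypothetical query-style LU, shown only as a sketch."""
#     _OP_REQP = ["node_name"]      # presence checked by LogicalUnit.__init__
#
#     def CheckPrereq(self):
#       # canonicalise opcode parameters; raise OpPrereqError on problems
#       name = self.cfg.ExpandNodeName(self.op.node_name)
#       if name is None:
#         raise errors.OpPrereqError("Node '%s' is unknown" %
#                                    self.op.node_name)
#       self.op.node_name = name
#
#     def Exec(self, feedback_fn):
#       # do the actual work; raise OpExecError for expected failures
#       return self.cfg.GetNodeInfo(self.op.node_name)
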
class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Build hooks env.

    This is a no-op, since we don't run hooks.

    """
    return


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded nodes.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if nodes is not None and not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if nodes:
    wanted_nodes = []

    for name in nodes:
      node = lu.cfg.GetNodeInfo(lu.cfg.ExpandNodeName(name))
      if node is None:
        raise errors.OpPrereqError("No such node name '%s'" % name)
      wanted_nodes.append(node)

    return wanted_nodes
  else:
    return [lu.cfg.GetNodeInfo(name) for name in lu.cfg.GetNodeList()]


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields
    selected: Fields selected by the user

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings
  """
  env = {
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env


def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override
  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _UpdateEtcHosts(fullnode, ip):
  """Ensure a node has a correct entry in /etc/hosts.

  Args:
    fullnode - Fully qualified domain name of host. (str)
    ip       - IPv4 address of host (str)

  """
  node = fullnode.split(".", 1)[0]

  f = open('/etc/hosts', 'r+')

  inthere = False

  save_lines = []
  add_lines = []
  removed = False

  while True:
    rawline = f.readline()

    if not rawline:
      # End of file
      break

    line = rawline.split('\n')[0]

    # Strip off comments
    line = line.split('#')[0]

    if not line:
      # Entire line was comment, skip
      save_lines.append(rawline)
      continue

    fields = line.split()

    haveall = True
    havesome = False
    for spec in [ ip, fullnode, node ]:
      if spec not in fields:
        haveall = False
      if spec in fields:
        havesome = True

    if haveall:
      inthere = True
      save_lines.append(rawline)
      continue

    if havesome and not haveall:
      # Line (old, or manual?) which is missing some.  Remove.
      removed = True
      continue

    save_lines.append(rawline)

  if not inthere:
    add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))

  if removed:
    if add_lines:
      save_lines = save_lines + add_lines

    # We removed a line, write a new file and replace old.
    fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
    newfile = os.fdopen(fd, 'w')
    newfile.write(''.join(save_lines))
    newfile.close()
    os.rename(tmpname, '/etc/hosts')

  elif add_lines:
    # Simply appending a new line will do the trick.
    f.seek(0, 2)
    for add in add_lines:
      f.write(add)

  f.close()


def _UpdateKnownHosts(fullnode, ip, pubkey):
  """Ensure a node has a correct known_hosts entry.

  Args:
    fullnode - Fully qualified domain name of host. (str)
    ip       - IPv4 address of host (str)
    pubkey   - the public key of the cluster

  """
  if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
  else:
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')

  inthere = False

  save_lines = []
  add_lines = []
  removed = False

  while True:
    rawline = f.readline()
    logger.Debug('read %s' % (repr(rawline),))

    if not rawline:
      # End of file
      break

    line = rawline.split('\n')[0]

    parts = line.split(' ')
    fields = parts[0].split(',')
    key = parts[2]

    haveall = True
    havesome = False
    for spec in [ ip, fullnode ]:
      if spec not in fields:
        haveall = False
      if spec in fields:
        havesome = True

    logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
    if haveall and key == pubkey:
      inthere = True
      save_lines.append(rawline)
      logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
      continue

    if havesome and (not haveall or key != pubkey):
      removed = True
      logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
      continue

    save_lines.append(rawline)

  if not inthere:
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))

  if removed:
    save_lines = save_lines + add_lines

    # Write a new file and replace old.
    fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
                                   constants.DATA_DIR)
    newfile = os.fdopen(fd, 'w')
    try:
      newfile.write(''.join(save_lines))
    finally:
      newfile.close()
    logger.Debug("Wrote new known_hosts.")
    os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)

  elif add_lines:
    # Simply appending a new line will do the trick.
    f.seek(0, 2)
    for add in add_lines:
      f.write(add)

  f.close()


def _HasValidVG(vglist, vgname):
  """Checks if the volume group list is valid.

  A non-None return value means there's an error, and the return value
  is the error message.

  """
  vgsize = vglist.get(vgname, None)
  if vgsize is None:
    return "volume group '%s' missing" % vgname
  elif vgsize < 20480:
    return ("volume group '%s' too small (20480MiB required, %dMiB found)" %
            (vgname, vgsize))
  return None


def _InitSSHSetup(node):
  """Setup the SSH configuration for the cluster.

  This generates a dsa keypair for root, adds the pub key to the
  permitted hosts and adds the hostkey to its own known hosts.

  Args:
    node: the name of this host as a fqdn

  """
  if os.path.exists('/root/.ssh/id_dsa'):
    utils.CreateBackup('/root/.ssh/id_dsa')
  if os.path.exists('/root/.ssh/id_dsa.pub'):
    utils.CreateBackup('/root/.ssh/id_dsa.pub')

  utils.RemoveFile('/root/.ssh/id_dsa')
  utils.RemoveFile('/root/.ssh/id_dsa.pub')

  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
                         "-f", "/root/.ssh/id_dsa",
                         "-q", "-N", ""])
  if result.failed:
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
                             result.output)

  f = open('/root/.ssh/id_dsa.pub', 'r')
  try:
    utils.AddAuthorizedKey('/root/.ssh/authorized_keys', f.read(8192))
  finally:
    f.close()


def _InitGanetiServerSetup(ss):
  """Setup the necessary configuration for the initial node daemon.

  This creates the nodepass file containing the shared password for
  the cluster and also generates the SSL certificate.

  """
  # Create pseudo random password
  randpass = sha.new(os.urandom(64)).hexdigest()
  # and write it into sstore
  ss.SetKey(ss.SS_NODED_PASS, randpass)

  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
                         "-days", str(365*5), "-nodes", "-x509",
                         "-keyout", constants.SSL_CERT_FILE,
                         "-out", constants.SSL_CERT_FILE, "-batch"])
  if result.failed:
    raise errors.OpExecError("could not generate server ssl cert, command"
                             " %s had exitcode %s and error message %s" %
                             (result.cmd, result.exit_code, result.output))

  os.chmod(constants.SSL_CERT_FILE, 0400)

  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])

  if result.failed:
    raise errors.OpExecError("Could not start the node daemon, command %s"
                             " had exitcode %s and error %s" %
                             (result.cmd, result.exit_code, result.output))


class LUInitCluster(LogicalUnit):
  """Initialise the cluster.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
              "def_bridge", "master_netdev"]
  REQ_CLUSTER = False

  def BuildHooksEnv(self):
    """Build hooks env.

    Notes: Since we don't require a cluster, we must manually add
    ourselves in the post-run node list.

    """
    env = {
      "CLUSTER": self.op.cluster_name,
      "MASTER": self.hostname['hostname_full'],
      }
    return env, [], [self.hostname['hostname_full']]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    if config.ConfigWriter.IsCluster():
      raise errors.OpPrereqError("Cluster is already initialised")

    hostname_local = socket.gethostname()
    self.hostname = hostname = utils.LookupHostname(hostname_local)
    if not hostname:
      raise errors.OpPrereqError("Cannot resolve my own hostname ('%s')" %
                                 hostname_local)

    if hostname["hostname_full"] != hostname_local:
      raise errors.OpPrereqError("My own hostname (%s) does not match the"
                                 " resolver (%s): probably not using FQDN"
                                 " for hostname." %
                                 (hostname_local, hostname["hostname_full"]))

    self.clustername = clustername = utils.LookupHostname(self.op.cluster_name)
    if not clustername:
      raise errors.OpPrereqError("Cannot resolve given cluster name ('%s')"
                                 % self.op.cluster_name)

    result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", hostname['ip']])
    if result.failed:
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
                                 " to %s,\nbut this ip address does not"
                                 " belong to this host."
                                 " Aborting." % hostname['ip'])

    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip and not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary ip given")
    if secondary_ip and secondary_ip != hostname['ip']:
      result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", secondary_ip])
      if result.failed:
        raise errors.OpPrereqError("You gave %s as secondary IP,\n"
                                   "but it does not belong to this host." %
                                   secondary_ip)
    self.secondary_ip = secondary_ip

    # checks presence of the volume group given
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)

    if vgstatus:
      raise errors.OpPrereqError("Error: %s" % vgstatus)

    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
                    self.op.mac_prefix):
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
                                 self.op.mac_prefix)

    if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
                                 self.op.hypervisor_type)

    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
    if result.failed:
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
                                 (self.op.master_netdev,
                                  result.output.strip()))

  def Exec(self, feedback_fn):
    """Initialize the cluster.

    """
    clustername = self.clustername
    hostname = self.hostname

    # set up the simple store
    ss = ssconf.SimpleStore()
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
    ss.SetKey(ss.SS_MASTER_NODE, hostname['hostname_full'])
    ss.SetKey(ss.SS_MASTER_IP, clustername['ip'])
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername['hostname'])

    # set up the inter-node password and certificate
    _InitGanetiServerSetup(ss)

    # start the master ip
    rpc.call_node_start_master(hostname['hostname_full'])

    # set up ssh config and /etc/hosts
    f = open('/etc/ssh/ssh_host_rsa_key.pub', 'r')
    try:
      sshline = f.read()
    finally:
      f.close()
    sshkey = sshline.split(" ")[1]

    _UpdateEtcHosts(hostname['hostname_full'],
                    hostname['ip'],
                    )

    _UpdateKnownHosts(hostname['hostname_full'],
                      hostname['ip'],
                      sshkey,
                      )

    _InitSSHSetup(hostname['hostname'])

    # init of cluster config file
    cfgw = config.ConfigWriter()
    cfgw.InitConfig(hostname['hostname'], hostname['ip'], self.secondary_ip,
                    sshkey, self.op.mac_prefix,
                    self.op.vg_name, self.op.def_bridge)


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    utils.CreateBackup('/root/.ssh/id_dsa')
    utils.CreateBackup('/root/.ssh/id_dsa.pub')
    rpc.call_node_leave_cluster(self.sstore.GetMasterNode())


class LUVerifyCluster(NoHooksLU):
  """Verifies the cluster status.

  """
  _OP_REQP = []

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    instancelist = self.cfg.GetInstanceList()
    if not instance in instancelist:
      feedback_fn("  - ERROR: instance %s not in instance list %s" %
                      (instance, instancelist))
      bad = True

    instanceconfig = self.cfg.GetInstanceInfo(instance)
    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if not instanceconfig.status == 'down':
      if not instance in node_instance[node_current]:
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    self.cfg.VerifyConfig()

    master = self.sstore.GetMasterNode()
    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    node_volume = {}
    node_instance = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if type(volumeinfo) != dict:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      result =  self._VerifyInstance(instance, node_volume, node_instance,
                                     feedback_fn)
      bad = bad or result

      inst_config = self.cfg.GetInstanceInfo(instance)

      inst_config.MapLVsByNode(node_vol_should)

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    return int(bad)


def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      logger.ToStderr("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        logger.ToStderr("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      perc_done, est_time, is_degraded = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        logger.ToStdout("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    if unlock:
      utils.Unlock('cmd')
    try:
      time.sleep(min(60, max_time))
    finally:
      if unlock:
        utils.Lock('cmd')

  if done:
    logger.ToStdout("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


def _CheckDiskConsistency(cfgw, dev, node, on_primary):
  """Check that mirrors are not degraded.

  """
  cfgw.SetDiskID(dev, node)

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Can't get any data from node %s" % node)
      result = False
    else:
      result = result and (not rstats[5])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    return

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    return node_data


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would not allow itself to run.

    """
    env = {
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    rpc.call_node_leave_cluster(node.name)

    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["dtotal", "dfree",
                                     "mtotal", "mnode", "mfree"])

    _CheckOutputFields(static=["name", "pinst", "sinst", "pip", "sip"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)


  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = utils.NiceSort(self.cfg.GetNodeList())
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]


    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict.fromkeys(nodenames, 0)
    node_to_secondary = dict.fromkeys(nodenames, 0)

    if "pinst" in self.op.output_fields or "sinst" in self.op.output_fields:
      instancelist = self.cfg.GetInstanceList()

      for instance in instancelist:
        instanceinfo = self.cfg.GetInstanceInfo(instance)
        node_to_primary[instanceinfo.primary_node] += 1
        for secnode in instanceinfo.secondary_nodes:
          node_to_secondary[secnode] += 1

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst":
          val = node_to_primary[node.name]
        elif field == "sinst":
          val = node_to_secondary[node.name]
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, "?")
        else:
          raise errors.ParameterError(field)
        val = str(val)
        node_output.append(val)
      output.append(node_output)

    return output


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)


  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = utils.NiceSort([node.name for node in self.nodes])
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) matches the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.LookupHostname(node_name)
    if not dns_data:
      raise errors.OpPrereqError("Node %s is not resolvable" % node_name)

    node = dns_data['hostname']
    primary_ip = self.op.primary_ip = dns_data['ip']
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip
    node_list = cfg.GetNodeList()
    if node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration"
                                 % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)
      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachability
    command = ["fping", "-q", primary_ip]
    result = utils.RunCmd(command)
    if result.failed:
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      command = ["fping", "-S%s" % myself.secondary_ip, "-q", secondary_ip]
      result = utils.RunCmd(command)
      if result.failed:
        raise errors.OpPrereqError("Node secondary ip not reachable by ping")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # set up inter-node password and certificate and restarts the node daemon
    gntpass = self.sstore.GetNodeDaemonPassword()
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
      raise errors.OpExecError("ganeti password corruption detected")
    f = open(constants.SSL_CERT_FILE)
    try:
      gntpem = f.read(8192)
    finally:
      f.close()
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
    # so we use this to detect an invalid certificate; as long as the
    # cert doesn't contain this, the here-document will be correctly
    # parsed by the shell sequence below
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
    if not gntpem.endswith("\n"):
      raise errors.OpExecError("PEM must end with newline")
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)

    # and then connect with ssh to set password and start ganeti-noded
    # note that all the below variables are sanitized at this point,
    # either by being constants or by the checks above
    ss = self.sstore
    mycommand = ("umask 077 && "
                 "echo '%s' > '%s' && "
                 "cat > '%s' << '!EOF.' && \n"
                 "%s!EOF.\n%s restart" %
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
                  constants.SSL_CERT_FILE, gntpem,
                  constants.NODE_INITD_SCRIPT))

    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
    if result.failed:
      raise errors.OpExecError("Remote command on node %s, error: %s,"
                               " output: %s" %
                               (node, result.fail_reason, result.output))

    # check connectivity
    time.sleep(4)

    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    keyarray = []
    keyfiles = ["/etc/ssh/ssh_host_dsa_key", "/etc/ssh/ssh_host_dsa_key.pub",
                "/etc/ssh/ssh_host_rsa_key", "/etc/ssh/ssh_host_rsa_key.pub",
                "/root/.ssh/id_dsa", "/root/.ssh/id_dsa.pub"]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    _UpdateEtcHosts(new_node.name, new_node.primary_ip)
    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
                      self.cfg.GetHostKey())

    if new_node.secondary_ip != new_node.primary_ip:
      result = ssh.SSHCall(node, "root",
                           "fping -S 127.0.0.1 -q %s" % new_node.secondary_ip)
      if result.failed:
        raise errors.OpExecError("Node claims it doesn't have the"
                                 " secondary ip you gave (%s).\n"
                                 "Please fix and re-run this command." %
                                 new_node.secondary_ip)

    success, msg = ssh.VerifyNodeHostname(node)
    if not success:
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
                               " than the one the resolver gives: %s.\n"
                               "Please fix and re-run this command." %
                               (node, msg))

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList() + [node]
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = ss.GetFileList()
    for fname in to_copy:
      if not ssh.CopyFileToNode(node, fname):
        logger.Error("could not copy file %s to node %s" % (fname, node))

    logger.Info("adding node %s to cluster.conf" % node)
    self.cfg.AddNode(new_node)


class LUMasterFailover(LogicalUnit):
  """Failover the master node to the current node.

  This is a special LU in that it must run on a non-master node.

  """
  HPATH = "master-failover"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_MASTER = False
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the new master only in the pre phase, and on all
    the nodes in the post phase.

    """
    env = {
      "NEW_MASTER": self.new_master,
      "OLD_MASTER": self.old_master,
      }
    return env, [self.new_master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we are not already the master.

    """
    self.new_master = socket.gethostname()

    self.old_master = self.sstore.GetMasterNode()

    if self.old_master == self.new_master:
      raise errors.OpPrereqError("This command must be run on the node"
                                 " where you want the new master to be.\n"
                                 "%s is already the master" %
                                 self.old_master)

  def Exec(self, feedback_fn):
    """Failover the master node.

    This command, when run on a non-master node, will cause the current
    master to cease being master, and the non-master to become new
    master.

    """
    #TODO: do not rely on gethostname returning the FQDN
    logger.Info("setting master to %s, old master: %s" %
                (self.new_master, self.old_master))

    if not rpc.call_node_stop_master(self.old_master):
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)

    ss = self.sstore
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
      logger.Error("could not distribute the new simple store master file"
                   " to the other nodes, please check.")

    if not rpc.call_node_start_master(self.new_master):
      logger.Error("could not start the master role on the new master"
                   " %s, please check" % self.new_master)
      feedback_fn("Error in activating the master IP on the new master,\n"
                  "please fix manually.")



class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      }

    return result


class LUClusterCopyFile(NoHooksLU):
  """Copy file to cluster.

  """
  _OP_REQP = ["nodes", "filename"]

  def CheckPrereq(self):
    """Check prerequisites.

    It should check that the named file exists and that the given list
    of nodes is valid.

    """
    if not os.path.exists(self.op.filename):
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)

    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Copy a file from master to some nodes.

    The file named in self.op.filename is copied to the nodes given in
    self.op.nodes (all nodes if that list is empty); the master itself
    is skipped.

    """
    filename = self.op.filename

    myname = socket.gethostname()

    for node in [node.name for node in self.nodes]:
      if node == myname:
        continue
      if not ssh.CopyFileToNode(node, filename):
        logger.Error("Copy of file %s to node %s failed" % (filename, node))


class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()


class LURunClusterCommand(NoHooksLU):
  """Run a command on some nodes.

  """
  _OP_REQP = ["command", "nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    It checks that the given list of nodes is valid.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Run a command on some nodes.

    """
    data = []
    for node in self.nodes:
      result = ssh.SSHCall(node.name, "root", self.op.command)
      data.append((node.name, result.output, result.exit_code))

    return data


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance


  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
1697
    ignore_secondaries: if true, errors on secondary nodes won't result
1698
                        in an error return from the function
1699

1700
  Returns:
1701
    false if the operation failed
1702
    list of (host, instance_visible_name, node_visible_name) if the operation
1703
         suceeded with the mapping from node devices to instance devices
1704
  """
1705
  device_info = []
1706
  disks_ok = True
1707
  for inst_disk in instance.disks:
1708
    master_result = None
1709
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1710
      cfg.SetDiskID(node_disk, node)
1711
      is_primary = node == instance.primary_node
1712
      result = rpc.call_blockdev_assemble(node, node_disk, is_primary)
1713
      if not result:
1714
        logger.Error("could not prepare block device %s on node %s (is_pri"
1715
                     "mary=%s)" % (inst_disk.iv_name, node, is_primary))
1716
        if is_primary or not ignore_secondaries:
1717
          disks_ok = False
1718
      if is_primary:
1719
        master_result = result
1720
    device_info.append((instance.primary_node, inst_disk.iv_name,
1721
                        master_result))
1722

    
1723
  return disks_ok, device_info
1724

    
1725

    
1726
def _StartInstanceDisks(cfg, instance, force):
1727
  """Start the disks of an instance.
1728

1729
  """
1730
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1731
                                           ignore_secondaries=force)
1732
  if not disks_ok:
1733
    _ShutdownInstanceDisks(instance, cfg)
1734
    if force is not None and not force:
1735
      logger.Error("If the message above refers to a secondary node,"
1736
                   " you can retry the operation using '--force'.")
1737
    raise errors.OpExecError("Disk consistency error")
1738

    
1739

    
1740
class LUDeactivateInstanceDisks(NoHooksLU):
1741
  """Shutdown an instance's disks.
1742

1743
  """
1744
  _OP_REQP = ["instance_name"]
1745

    
1746
  def CheckPrereq(self):
1747
    """Check prerequisites.
1748

1749
    This checks that the instance is in the cluster.
1750

1751
    """
1752
    instance = self.cfg.GetInstanceInfo(
1753
      self.cfg.ExpandInstanceName(self.op.instance_name))
1754
    if instance is None:
1755
      raise errors.OpPrereqError("Instance '%s' not known" %
1756
                                 self.op.instance_name)
1757
    self.instance = instance
1758

    
1759
  def Exec(self, feedback_fn):
1760
    """Deactivate the disks
1761

1762
    """
1763
    instance = self.instance
1764
    ins_l = rpc.call_instance_list([instance.primary_node])
1765
    ins_l = ins_l[instance.primary_node]
1766
    if not type(ins_l) is list:
1767
      raise errors.OpExecError("Can't contact node '%s'" %
1768
                               instance.primary_node)
1769

    
1770
    if self.instance.name in ins_l:
1771
      raise errors.OpExecError("Instance is running, can't shutdown"
1772
                               " block devices.")
1773

    
1774
    _ShutdownInstanceDisks(instance, self.cfg)
1775

    
1776

    
1777
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1778
  """Shutdown block devices of an instance.
1779

1780
  This does the shutdown on all nodes of the instance.
1781

1782
  If the ignore_primary is false, errors on the primary node are
1783
  ignored.
1784

1785
  """
1786
  result = True
1787
  for disk in instance.disks:
1788
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1789
      cfg.SetDiskID(top_disk, node)
1790
      if not rpc.call_blockdev_shutdown(node, top_disk):
1791
        logger.Error("could not shutdown block device %s on node %s" %
1792
                     (disk.iv_name, node))
1793
        if not ignore_primary or node != instance.primary_node:
1794
          result = False
1795
  return result
1796

    
1797

    
1798
class LUStartupInstance(LogicalUnit):
1799
  """Starts an instance.
1800

1801
  """
1802
  HPATH = "instance-start"
1803
  HTYPE = constants.HTYPE_INSTANCE
1804
  _OP_REQP = ["instance_name", "force"]
1805

    
1806
  def BuildHooksEnv(self):
1807
    """Build hooks env.
1808

1809
    This runs on master, primary and secondary nodes of the instance.
1810

1811
    """
1812
    env = {
1813
      "FORCE": self.op.force,
1814
      }
1815
    env.update(_BuildInstanceHookEnvByObject(self.instance))
1816
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1817
          list(self.instance.secondary_nodes))
1818
    return env, nl, nl
1819

    
1820
  def CheckPrereq(self):
1821
    """Check prerequisites.
1822

1823
    This checks that the instance is in the cluster.
1824

1825
    """
1826
    instance = self.cfg.GetInstanceInfo(
1827
      self.cfg.ExpandInstanceName(self.op.instance_name))
1828
    if instance is None:
1829
      raise errors.OpPrereqError("Instance '%s' not known" %
1830
                                 self.op.instance_name)
1831

    
1832
    # check bridges existance
1833
    brlist = [nic.bridge for nic in instance.nics]
1834
    if not rpc.call_bridges_exist(instance.primary_node, brlist):
1835
      raise errors.OpPrereqError("one or more target bridges %s does not"
1836
                                 " exist on destination node '%s'" %
1837
                                 (brlist, instance.primary_node))
1838

    
1839
    self.instance = instance
1840
    self.op.instance_name = instance.name
1841

    
1842
  def Exec(self, feedback_fn):
1843
    """Start the instance.
1844

1845
    """
1846
    instance = self.instance
1847
    force = self.op.force
1848
    extra_args = getattr(self.op, "extra_args", "")
1849

    
1850
    node_current = instance.primary_node
1851

    
1852
    nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1853
    if not nodeinfo:
1854
      raise errors.OpExecError("Could not contact node %s for infos" %
1855
                               (node_current))
1856

    
1857
    freememory = nodeinfo[node_current]['memory_free']
1858
    memory = instance.memory
1859
    if memory > freememory:
1860
      raise errors.OpExecError("Not enough memory to start instance"
1861
                               " %s on node %s"
1862
                               " needed %s MiB, available %s MiB" %
1863
                               (instance.name, node_current, memory,
1864
                                freememory))
1865

    
1866
    _StartInstanceDisks(self.cfg, instance, force)
1867

    
1868
    if not rpc.call_instance_start(node_current, instance, extra_args):
1869
      _ShutdownInstanceDisks(instance, self.cfg)
1870
      raise errors.OpExecError("Could not start instance")
1871

    
1872
    self.cfg.MarkInstanceUp(instance.name)
1873

    
1874

    
1875
class LUShutdownInstance(LogicalUnit):
1876
  """Shutdown an instance.
1877

1878
  """
1879
  HPATH = "instance-stop"
1880
  HTYPE = constants.HTYPE_INSTANCE
1881
  _OP_REQP = ["instance_name"]
1882

    
1883
  def BuildHooksEnv(self):
1884
    """Build hooks env.
1885

1886
    This runs on master, primary and secondary nodes of the instance.
1887

1888
    """
1889
    env = _BuildInstanceHookEnvByObject(self.instance)
1890
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1891
          list(self.instance.secondary_nodes))
1892
    return env, nl, nl
1893

    
1894
  def CheckPrereq(self):
1895
    """Check prerequisites.
1896

1897
    This checks that the instance is in the cluster.
1898

1899
    """
1900
    instance = self.cfg.GetInstanceInfo(
1901
      self.cfg.ExpandInstanceName(self.op.instance_name))
1902
    if instance is None:
1903
      raise errors.OpPrereqError("Instance '%s' not known" %
1904
                                 self.op.instance_name)
1905
    self.instance = instance
1906

    
1907
  def Exec(self, feedback_fn):
1908
    """Shutdown the instance.
1909

1910
    """
1911
    instance = self.instance
1912
    node_current = instance.primary_node
1913
    if not rpc.call_instance_shutdown(node_current, instance):
1914
      logger.Error("could not shutdown instance")
1915

    
1916
    self.cfg.MarkInstanceDown(instance.name)
1917
    _ShutdownInstanceDisks(instance, self.cfg)
1918

    
1919

    
1920
class LUReinstallInstance(LogicalUnit):
1921
  """Reinstall an instance.
1922

1923
  """
1924
  HPATH = "instance-reinstall"
1925
  HTYPE = constants.HTYPE_INSTANCE
1926
  _OP_REQP = ["instance_name"]
1927

    
1928
  def BuildHooksEnv(self):
1929
    """Build hooks env.
1930

1931
    This runs on master, primary and secondary nodes of the instance.
1932

1933
    """
1934
    env = _BuildInstanceHookEnvByObject(self.instance)
1935
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1936
          list(self.instance.secondary_nodes))
1937
    return env, nl, nl
1938

    
1939
  def CheckPrereq(self):
1940
    """Check prerequisites.
1941

1942
    This checks that the instance is in the cluster and is not running.
1943

1944
    """
1945
    instance = self.cfg.GetInstanceInfo(
1946
      self.cfg.ExpandInstanceName(self.op.instance_name))
1947
    if instance is None:
1948
      raise errors.OpPrereqError("Instance '%s' not known" %
1949
                                 self.op.instance_name)
1950
    if instance.disk_template == constants.DT_DISKLESS:
1951
      raise errors.OpPrereqError("Instance '%s' has no disks" %
1952
                                 self.op.instance_name)
1953
    if instance.status != "down":
1954
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
1955
                                 self.op.instance_name)
1956
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
1957
    if remote_info:
1958
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
1959
                                 (self.op.instance_name,
1960
                                  instance.primary_node))
1961

    
1962
    self.op.os_type = getattr(self.op, "os_type", None)
1963
    if self.op.os_type is not None:
1964
      # OS verification
1965
      pnode = self.cfg.GetNodeInfo(
1966
        self.cfg.ExpandNodeName(instance.primary_node))
1967
      if pnode is None:
1968
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
1969
                                   self.op.pnode)
1970
      os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
1971
      if not isinstance(os_obj, objects.OS):
1972
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
1973
                                   " primary node"  % self.op.os_type)
1974

    
1975
    self.instance = instance
1976

    
1977
  def Exec(self, feedback_fn):
1978
    """Reinstall the instance.
1979

1980
    """
1981
    inst = self.instance
1982

    
1983
    if self.op.os_type is not None:
1984
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
1985
      inst.os = self.op.os_type
1986
      self.cfg.AddInstance(inst)
1987

    
1988
    _StartInstanceDisks(self.cfg, inst, None)
1989
    try:
1990
      feedback_fn("Running the instance OS create scripts...")
1991
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
1992
        raise errors.OpExecError("Could not install OS for instance %s "
1993
                                 "on node %s" %
1994
                                 (inst.name, inst.primary_node))
1995
    finally:
1996
      _ShutdownInstanceDisks(inst, self.cfg)
1997

    
1998

    
1999
class LURemoveInstance(LogicalUnit):
2000
  """Remove an instance.
2001

2002
  """
2003
  HPATH = "instance-remove"
2004
  HTYPE = constants.HTYPE_INSTANCE
2005
  _OP_REQP = ["instance_name"]
2006

    
2007
  def BuildHooksEnv(self):
2008
    """Build hooks env.
2009

2010
    This runs on master, primary and secondary nodes of the instance.
2011

2012
    """
2013
    env = _BuildInstanceHookEnvByObject(self.instance)
2014
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2015
          list(self.instance.secondary_nodes))
2016
    return env, nl, nl
2017

    
2018
  def CheckPrereq(self):
2019
    """Check prerequisites.
2020

2021
    This checks that the instance is in the cluster.
2022

2023
    """
2024
    instance = self.cfg.GetInstanceInfo(
2025
      self.cfg.ExpandInstanceName(self.op.instance_name))
2026
    if instance is None:
2027
      raise errors.OpPrereqError("Instance '%s' not known" %
2028
                                 self.op.instance_name)
2029
    self.instance = instance
2030

    
2031
  def Exec(self, feedback_fn):
2032
    """Remove the instance.
2033

2034
    """
2035
    instance = self.instance
2036
    logger.Info("shutting down instance %s on node %s" %
2037
                (instance.name, instance.primary_node))
2038

    
2039
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2040
      raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2041
                               (instance.name, instance.primary_node))
2042

    
2043
    logger.Info("removing block devices for instance %s" % instance.name)
2044

    
2045
    _RemoveDisks(instance, self.cfg)
2046

    
2047
    logger.Info("removing instance %s out of cluster config" % instance.name)
2048

    
2049
    self.cfg.RemoveInstance(instance.name)
2050

    
2051

    
2052
class LUQueryInstances(NoHooksLU):
2053
  """Logical unit for querying instances.
2054

2055
  """
2056
  _OP_REQP = ["output_fields"]
2057

    
2058
  def CheckPrereq(self):
2059
    """Check prerequisites.
2060

2061
    This checks that the fields required are valid output fields.
2062

2063
    """
2064
    self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2065
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2066
                               "admin_state", "admin_ram",
2067
                               "disk_template", "ip", "mac", "bridge",
2068
                               "sda_size", "sdb_size"],
2069
                       dynamic=self.dynamic_fields,
2070
                       selected=self.op.output_fields)
2071

    
2072
  def Exec(self, feedback_fn):
2073
    """Computes the list of nodes and their attributes.
2074

2075
    """
2076
    instance_names = utils.NiceSort(self.cfg.GetInstanceList())
2077
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2078
                     in instance_names]
2079

    
2080
    # begin data gathering
2081

    
2082
    nodes = frozenset([inst.primary_node for inst in instance_list])
2083

    
2084
    bad_nodes = []
2085
    if self.dynamic_fields.intersection(self.op.output_fields):
2086
      live_data = {}
2087
      node_data = rpc.call_all_instances_info(nodes)
2088
      for name in nodes:
2089
        result = node_data[name]
2090
        if result:
2091
          live_data.update(result)
2092
        elif result == False:
2093
          bad_nodes.append(name)
2094
        # else no instance is alive
2095
    else:
2096
      live_data = dict([(name, {}) for name in instance_names])
2097

    
2098
    # end data gathering
2099

    
2100
    output = []
2101
    for instance in instance_list:
2102
      iout = []
2103
      for field in self.op.output_fields:
2104
        if field == "name":
2105
          val = instance.name
2106
        elif field == "os":
2107
          val = instance.os
2108
        elif field == "pnode":
2109
          val = instance.primary_node
2110
        elif field == "snodes":
2111
          val = ",".join(instance.secondary_nodes) or "-"
2112
        elif field == "admin_state":
2113
          if instance.status == "down":
2114
            val = "no"
2115
          else:
2116
            val = "yes"
2117
        elif field == "oper_state":
2118
          if instance.primary_node in bad_nodes:
2119
            val = "(node down)"
2120
          else:
2121
            if live_data.get(instance.name):
2122
              val = "running"
2123
            else:
2124
              val = "stopped"
2125
        elif field == "admin_ram":
2126
          val = instance.memory
2127
        elif field == "oper_ram":
2128
          if instance.primary_node in bad_nodes:
2129
            val = "(node down)"
2130
          elif instance.name in live_data:
2131
            val = live_data[instance.name].get("memory", "?")
2132
          else:
2133
            val = "-"
2134
        elif field == "disk_template":
2135
          val = instance.disk_template
2136
        elif field == "ip":
2137
          val = instance.nics[0].ip
2138
        elif field == "bridge":
2139
          val = instance.nics[0].bridge
2140
        elif field == "mac":
2141
          val = instance.nics[0].mac
2142
        elif field == "sda_size" or field == "sdb_size":
2143
          disk = instance.FindDisk(field[:3])
2144
          if disk is None:
2145
            val = "N/A"
2146
          else:
2147
            val = disk.size
2148
        else:
2149
          raise errors.ParameterError(field)
2150
        val = str(val)
2151
        iout.append(val)
2152
      output.append(iout)
2153

    
2154
    return output
2155

    
2156

    
2157
class LUFailoverInstance(LogicalUnit):
2158
  """Failover an instance.
2159

2160
  """
2161
  HPATH = "instance-failover"
2162
  HTYPE = constants.HTYPE_INSTANCE
2163
  _OP_REQP = ["instance_name", "ignore_consistency"]
2164

    
2165
  def BuildHooksEnv(self):
2166
    """Build hooks env.
2167

2168
    This runs on master, primary and secondary nodes of the instance.
2169

2170
    """
2171
    env = {
2172
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2173
      }
2174
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2175
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2176
    return env, nl, nl
2177

    
2178
  def CheckPrereq(self):
2179
    """Check prerequisites.
2180

2181
    This checks that the instance is in the cluster.
2182

2183
    """
2184
    instance = self.cfg.GetInstanceInfo(
2185
      self.cfg.ExpandInstanceName(self.op.instance_name))
2186
    if instance is None:
2187
      raise errors.OpPrereqError("Instance '%s' not known" %
2188
                                 self.op.instance_name)
2189

    
2190
    # check memory requirements on the secondary node
2191
    target_node = instance.secondary_nodes[0]
2192
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2193
    info = nodeinfo.get(target_node, None)
2194
    if not info:
2195
      raise errors.OpPrereqError("Cannot get current information"
2196
                                 " from node '%s'" % nodeinfo)
2197
    if instance.memory > info['memory_free']:
2198
      raise errors.OpPrereqError("Not enough memory on target node %s."
2199
                                 " %d MB available, %d MB required" %
2200
                                 (target_node, info['memory_free'],
2201
                                  instance.memory))
2202

    
2203
    # check bridge existance
2204
    brlist = [nic.bridge for nic in instance.nics]
2205
    if not rpc.call_bridges_exist(instance.primary_node, brlist):
2206
      raise errors.OpPrereqError("One or more target bridges %s does not"
2207
                                 " exist on destination node '%s'" %
2208
                                 (brlist, instance.primary_node))
2209

    
2210
    self.instance = instance
2211

    
2212
  def Exec(self, feedback_fn):
2213
    """Failover an instance.
2214

2215
    The failover is done by shutting it down on its present node and
2216
    starting it on the secondary.
2217

2218
    """
2219
    instance = self.instance
2220

    
2221
    source_node = instance.primary_node
2222
    target_node = instance.secondary_nodes[0]
2223

    
2224
    feedback_fn("* checking disk consistency between source and target")
2225
    for dev in instance.disks:
2226
      # for remote_raid1, these are md over drbd
2227
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2228
        if not self.op.ignore_consistency:
2229
          raise errors.OpExecError("Disk %s is degraded on target node,"
2230
                                   " aborting failover." % dev.iv_name)
2231

    
2232
    feedback_fn("* checking target node resource availability")
2233
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2234

    
2235
    if not nodeinfo:
2236
      raise errors.OpExecError("Could not contact target node %s." %
2237
                               target_node)
2238

    
2239
    free_memory = int(nodeinfo[target_node]['memory_free'])
2240
    memory = instance.memory
2241
    if memory > free_memory:
2242
      raise errors.OpExecError("Not enough memory to create instance %s on"
2243
                               " node %s. needed %s MiB, available %s MiB" %
2244
                               (instance.name, target_node, memory,
2245
                                free_memory))
2246

    
2247
    feedback_fn("* shutting down instance on source node")
2248
    logger.Info("Shutting down instance %s on node %s" %
2249
                (instance.name, source_node))
2250

    
2251
    if not rpc.call_instance_shutdown(source_node, instance):
2252
      logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2253
                   " anyway. Please make sure node %s is down"  %
2254
                   (instance.name, source_node, source_node))
2255

    
2256
    feedback_fn("* deactivating the instance's disks on source node")
2257
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2258
      raise errors.OpExecError("Can't shut down the instance's disks.")
2259

    
2260
    instance.primary_node = target_node
2261
    # distribute new instance config to the other nodes
2262
    self.cfg.AddInstance(instance)
2263

    
2264
    feedback_fn("* activating the instance's disks on target node")
2265
    logger.Info("Starting instance %s on node %s" %
2266
                (instance.name, target_node))
2267

    
2268
    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2269
                                             ignore_secondaries=True)
2270
    if not disks_ok:
2271
      _ShutdownInstanceDisks(instance, self.cfg)
2272
      raise errors.OpExecError("Can't activate the instance's disks")
2273

    
2274
    feedback_fn("* starting the instance on the target node")
2275
    if not rpc.call_instance_start(target_node, instance, None):
2276
      _ShutdownInstanceDisks(instance, self.cfg)
2277
      raise errors.OpExecError("Could not start instance %s on node %s." %
2278
                               (instance.name, target_node))
2279

    
2280

    
2281
def _CreateBlockDevOnPrimary(cfg, node, device, info):
2282
  """Create a tree of block devices on the primary node.
2283

2284
  This always creates all devices.
2285

2286
  """
2287
  if device.children:
2288
    for child in device.children:
2289
      if not _CreateBlockDevOnPrimary(cfg, node, child, info):
2290
        return False
2291

    
2292
  cfg.SetDiskID(device, node)
2293
  new_id = rpc.call_blockdev_create(node, device, device.size, True, info)
2294
  if not new_id:
2295
    return False
2296
  if device.physical_id is None:
2297
    device.physical_id = new_id
2298
  return True
2299

    
2300

    
2301
def _CreateBlockDevOnSecondary(cfg, node, device, force, info):
2302
  """Create a tree of block devices on a secondary node.
2303

2304
  If this device type has to be created on secondaries, create it and
2305
  all its children.
2306

2307
  If not, just recurse to children keeping the same 'force' value.
2308

2309
  """
2310
  if device.CreateOnSecondary():
2311
    force = True
2312
  if device.children:
2313
    for child in device.children:
2314
      if not _CreateBlockDevOnSecondary(cfg, node, child, force, info):
2315
        return False
2316

    
2317
  if not force:
2318
    return True
2319
  cfg.SetDiskID(device, node)
2320
  new_id = rpc.call_blockdev_create(node, device, device.size, False, info)
2321
  if not new_id:
2322
    return False
2323
  if device.physical_id is None:
2324
    device.physical_id = new_id
2325
  return True
2326

    
2327

    
2328
def _GenerateUniqueNames(cfg, exts):
2329
  """Generate a suitable LV name.
2330

2331
  This will generate a logical volume name for the given instance.
2332

2333
  """
2334
  results = []
2335
  for val in exts:
2336
    new_id = cfg.GenerateUniqueID()
2337
    results.append("%s%s" % (new_id, val))
2338
  return results
2339

    
2340

    
2341
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2342
  """Generate a drbd device complete with its children.
2343

2344
  """
2345
  port = cfg.AllocatePort()
2346
  vgname = cfg.GetVGName()
2347
  dev_data = objects.Disk(dev_type="lvm", size=size,
2348
                          logical_id=(vgname, names[0]))
2349
  dev_meta = objects.Disk(dev_type="lvm", size=128,
2350
                          logical_id=(vgname, names[1]))
2351
  drbd_dev = objects.Disk(dev_type="drbd", size=size,
2352
                          logical_id = (primary, secondary, port),
2353
                          children = [dev_data, dev_meta])
2354
  return drbd_dev
2355

    
2356

    
2357
def _GenerateDiskTemplate(cfg, template_name,
2358
                          instance_name, primary_node,
2359
                          secondary_nodes, disk_sz, swap_sz):
2360
  """Generate the entire disk layout for a given template type.
2361

2362
  """
2363
  #TODO: compute space requirements
2364

    
2365
  vgname = cfg.GetVGName()
2366
  if template_name == "diskless":
2367
    disks = []
2368
  elif template_name == "plain":
2369
    if len(secondary_nodes) != 0:
2370
      raise errors.ProgrammerError("Wrong template configuration")
2371

    
2372
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2373
    sda_dev = objects.Disk(dev_type="lvm", size=disk_sz,
2374
                           logical_id=(vgname, names[0]),
2375
                           iv_name = "sda")
2376
    sdb_dev = objects.Disk(dev_type="lvm", size=swap_sz,
2377
                           logical_id=(vgname, names[1]),
2378
                           iv_name = "sdb")
2379
    disks = [sda_dev, sdb_dev]
2380
  elif template_name == "local_raid1":
2381
    if len(secondary_nodes) != 0:
2382
      raise errors.ProgrammerError("Wrong template configuration")
2383

    
2384

    
2385
    names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2386
                                       ".sdb_m1", ".sdb_m2"])
2387
    sda_dev_m1 = objects.Disk(dev_type="lvm", size=disk_sz,
2388
                              logical_id=(vgname, names[0]))
2389
    sda_dev_m2 = objects.Disk(dev_type="lvm", size=disk_sz,
2390
                              logical_id=(vgname, names[1]))
2391
    md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name = "sda",
2392
                              size=disk_sz,
2393
                              children = [sda_dev_m1, sda_dev_m2])
2394
    sdb_dev_m1 = objects.Disk(dev_type="lvm", size=swap_sz,
2395
                              logical_id=(vgname, names[2]))
2396
    sdb_dev_m2 = objects.Disk(dev_type="lvm", size=swap_sz,
2397
                              logical_id=(vgname, names[3]))
2398
    md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name = "sdb",
2399
                              size=swap_sz,
2400
                              children = [sdb_dev_m1, sdb_dev_m2])
2401
    disks = [md_sda_dev, md_sdb_dev]
2402
  elif template_name == "remote_raid1":
2403
    if len(secondary_nodes) != 1:
2404
      raise errors.ProgrammerError("Wrong template configuration")
2405
    remote_node = secondary_nodes[0]
2406
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2407
                                       ".sdb_data", ".sdb_meta"])
2408
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2409
                                         disk_sz, names[0:2])
2410
    md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name="sda",
2411
                              children = [drbd_sda_dev], size=disk_sz)
2412
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2413
                                         swap_sz, names[2:4])
2414
    md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name="sdb",
2415
                              children = [drbd_sdb_dev], size=swap_sz)
2416
    disks = [md_sda_dev, md_sdb_dev]
2417
  else:
2418
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2419
  return disks
2420

    
2421

    
2422
def _GetInstanceInfoText(instance):
2423
  """Compute that text that should be added to the disk's metadata.
2424

2425
  """
2426
  return "originstname+%s" % instance.name
2427

    
2428

    
2429
def _CreateDisks(cfg, instance):
2430
  """Create all disks for an instance.
2431

2432
  This abstracts away some work from AddInstance.
2433

2434
  Args:
2435
    instance: the instance object
2436

2437
  Returns:
2438
    True or False showing the success of the creation process
2439

2440
  """
2441
  info = _GetInstanceInfoText(instance)
2442

    
2443
  for device in instance.disks:
2444
    logger.Info("creating volume %s for instance %s" %
2445
              (device.iv_name, instance.name))
2446
    #HARDCODE
2447
    for secondary_node in instance.secondary_nodes:
2448
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, device, False,
2449
                                        info):
2450
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2451
                     (device.iv_name, device, secondary_node))
2452
        return False
2453
    #HARDCODE
2454
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, device, info):
2455
      logger.Error("failed to create volume %s on primary!" %
2456
                   device.iv_name)
2457
      return False
2458
  return True
2459

    
2460

    
2461
def _RemoveDisks(instance, cfg):
2462
  """Remove all disks for an instance.
2463

2464
  This abstracts away some work from `AddInstance()` and
2465
  `RemoveInstance()`. Note that in case some of the devices couldn't
2466
  be remove, the removal will continue with the other ones (compare
2467
  with `_CreateDisks()`).
2468

2469
  Args:
2470
    instance: the instance object
2471

2472
  Returns:
2473
    True or False showing the success of the removal proces
2474

2475
  """
2476
  logger.Info("removing block devices for instance %s" % instance.name)
2477

    
2478
  result = True
2479
  for device in instance.disks:
2480
    for node, disk in device.ComputeNodeTree(instance.primary_node):
2481
      cfg.SetDiskID(disk, node)
2482
      if not rpc.call_blockdev_remove(node, disk):
2483
        logger.Error("could not remove block device %s on node %s,"
2484
                     " continuing anyway" %
2485
                     (device.iv_name, node))
2486
        result = False
2487
  return result
2488

    
2489

    
2490
class LUCreateInstance(LogicalUnit):
2491
  """Create an instance.
2492

2493
  """
2494
  HPATH = "instance-add"
2495
  HTYPE = constants.HTYPE_INSTANCE
2496
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2497
              "disk_template", "swap_size", "mode", "start", "vcpus",
2498
              "wait_for_sync"]
2499

    
2500
  def BuildHooksEnv(self):
2501
    """Build hooks env.
2502

2503
    This runs on master, primary and secondary nodes of the instance.
2504

2505
    """
2506
    env = {
2507
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2508
      "INSTANCE_DISK_SIZE": self.op.disk_size,
2509
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
2510
      "INSTANCE_ADD_MODE": self.op.mode,
2511
      }
2512
    if self.op.mode == constants.INSTANCE_IMPORT:
2513
      env["INSTANCE_SRC_NODE"] = self.op.src_node
2514
      env["INSTANCE_SRC_PATH"] = self.op.src_path
2515
      env["INSTANCE_SRC_IMAGE"] = self.src_image
2516

    
2517
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2518
      primary_node=self.op.pnode,
2519
      secondary_nodes=self.secondaries,
2520
      status=self.instance_status,
2521
      os_type=self.op.os_type,
2522
      memory=self.op.mem_size,
2523
      vcpus=self.op.vcpus,
2524
      nics=[(self.inst_ip, self.op.bridge)],
2525
    ))
2526

    
2527
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2528
          self.secondaries)
2529
    return env, nl, nl
2530

    
2531

    
2532
  def CheckPrereq(self):
2533
    """Check prerequisites.
2534

2535
    """
2536
    if self.op.mode not in (constants.INSTANCE_CREATE,
2537
                            constants.INSTANCE_IMPORT):
2538
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2539
                                 self.op.mode)
2540

    
2541
    if self.op.mode == constants.INSTANCE_IMPORT:
2542
      src_node = getattr(self.op, "src_node", None)
2543
      src_path = getattr(self.op, "src_path", None)
2544
      if src_node is None or src_path is None:
2545
        raise errors.OpPrereqError("Importing an instance requires source"
2546
                                   " node and path options")
2547
      src_node_full = self.cfg.ExpandNodeName(src_node)
2548
      if src_node_full is None:
2549
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2550
      self.op.src_node = src_node = src_node_full
2551

    
2552
      if not os.path.isabs(src_path):
2553
        raise errors.OpPrereqError("The source path must be absolute")
2554

    
2555
      export_info = rpc.call_export_info(src_node, src_path)
2556

    
2557
      if not export_info:
2558
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
2559

    
2560
      if not export_info.has_section(constants.INISECT_EXP):
2561
        raise errors.ProgrammerError("Corrupted export config")
2562

    
2563
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
2564
      if (int(ei_version) != constants.EXPORT_VERSION):
2565
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2566
                                   (ei_version, constants.EXPORT_VERSION))
2567

    
2568
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2569
        raise errors.OpPrereqError("Can't import instance with more than"
2570
                                   " one data disk")
2571

    
2572
      # FIXME: are the old os-es, disk sizes, etc. useful?
2573
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2574
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2575
                                                         'disk0_dump'))
2576
      self.src_image = diskimage
2577
    else: # INSTANCE_CREATE
2578
      if getattr(self.op, "os_type", None) is None:
2579
        raise errors.OpPrereqError("No guest OS specified")
2580

    
2581
    # check primary node
2582
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2583
    if pnode is None:
2584
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
2585
                                 self.op.pnode)
2586
    self.op.pnode = pnode.name
2587
    self.pnode = pnode
2588
    self.secondaries = []
2589
    # disk template and mirror node verification
2590
    if self.op.disk_template not in constants.DISK_TEMPLATES:
2591
      raise errors.OpPrereqError("Invalid disk template name")
2592

    
2593
    if self.op.disk_template == constants.DT_REMOTE_RAID1:
2594
      if getattr(self.op, "snode", None) is None:
2595
        raise errors.OpPrereqError("The 'remote_raid1' disk template needs"
2596
                                   " a mirror node")
2597

    
2598
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
2599
      if snode_name is None:
2600
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
2601
                                   self.op.snode)
2602
      elif snode_name == pnode.name:
2603
        raise errors.OpPrereqError("The secondary node cannot be"
2604
                                   " the primary node.")
2605
      self.secondaries.append(snode_name)
2606

    
2607
    # Check lv size requirements
2608
    nodenames = [pnode.name] + self.secondaries
2609
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2610

    
2611
    # Required free disk space as a function of disk and swap space
2612
    req_size_dict = {
2613
      constants.DT_DISKLESS: 0,
2614
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2615
      constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2616
      # 256 MB are added for drbd metadata, 128MB for each drbd device
2617
      constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2618
    }
2619

    
2620
    if self.op.disk_template not in req_size_dict:
2621
      raise errors.ProgrammerError("Disk template '%s' size requirement"
2622
                                   " is unknown" %  self.op.disk_template)
2623

    
2624
    req_size = req_size_dict[self.op.disk_template]
2625

    
2626
    for node in nodenames:
2627
      info = nodeinfo.get(node, None)
2628
      if not info:
2629
        raise errors.OpPrereqError("Cannot get current information"
2630
                                   " from node '%s'" % nodeinfo)
2631
      if req_size > info['vg_free']:
2632
        raise errors.OpPrereqError("Not enough disk space on target node %s."
2633
                                   " %d MB available, %d MB required" %
2634
                                   (node, info['vg_free'], req_size))
2635

    
2636
    # os verification
2637
    os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2638
    if not isinstance(os_obj, objects.OS):
2639
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
2640
                                 " primary node"  % self.op.os_type)
2641

    
2642
    # instance verification
2643
    hostname1 = utils.LookupHostname(self.op.instance_name)
2644
    if not hostname1:
2645
      raise errors.OpPrereqError("Instance name '%s' not found in dns" %
2646
                                 self.op.instance_name)
2647

    
2648
    self.op.instance_name = instance_name = hostname1['hostname']
2649
    instance_list = self.cfg.GetInstanceList()
2650
    if instance_name in instance_list:
2651
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2652
                                 instance_name)
2653

    
2654
    ip = getattr(self.op, "ip", None)
2655
    if ip is None or ip.lower() == "none":
2656
      inst_ip = None
2657
    elif ip.lower() == "auto":
2658
      inst_ip = hostname1['ip']
2659
    else:
2660
      if not utils.IsValidIP(ip):
2661
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
2662
                                   " like a valid IP" % ip)
2663
      inst_ip = ip
2664
    self.inst_ip = inst_ip
2665

    
2666
    command = ["fping", "-q", hostname1['ip']]
2667
    result = utils.RunCmd(command)
2668
    if not result.failed:
2669
      raise errors.OpPrereqError("IP %s of instance %s already in use" %
2670
                                 (hostname1['ip'], instance_name))
2671

    
2672
    # bridge verification
2673
    bridge = getattr(self.op, "bridge", None)
2674
    if bridge is None:
2675
      self.op.bridge = self.cfg.GetDefBridge()
2676
    else:
2677
      self.op.bridge = bridge
2678

    
2679
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
2680
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
2681
                                 " destination node '%s'" %
2682
                                 (self.op.bridge, pnode.name))
2683

    
2684
    if self.op.start:
2685
      self.instance_status = 'up'
2686
    else:
2687
      self.instance_status = 'down'
2688

    
2689
  def Exec(self, feedback_fn):
2690
    """Create and add the instance to the cluster.
2691

2692
    """
2693
    instance = self.op.instance_name
2694
    pnode_name = self.pnode.name
2695

    
2696
    nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
2697
    if self.inst_ip is not None:
2698
      nic.ip = self.inst_ip
2699

    
2700
    disks = _GenerateDiskTemplate(self.cfg,
2701
                                  self.op.disk_template,
2702
                                  instance, pnode_name,
2703
                                  self.secondaries, self.op.disk_size,
2704
                                  self.op.swap_size)
2705

    
2706
    iobj = objects.Instance(name=instance, os=self.op.os_type,
2707
                            primary_node=pnode_name,
2708
                            memory=self.op.mem_size,
2709
                            vcpus=self.op.vcpus,
2710
                            nics=[nic], disks=disks,
2711
                            disk_template=self.op.disk_template,
2712
                            status=self.instance_status,
2713
                            )
2714

    
2715
    feedback_fn("* creating instance disks...")
2716
    if not _CreateDisks(self.cfg, iobj):
2717
      _RemoveDisks(iobj, self.cfg)
2718
      raise errors.OpExecError("Device creation failed, reverting...")
2719

    
2720
    feedback_fn("adding instance %s to cluster config" % instance)
2721

    
2722
    self.cfg.AddInstance(iobj)
2723

    
2724
    if self.op.wait_for_sync:
2725
      disk_abort = not _WaitForSync(self.cfg, iobj)
2726
    elif iobj.disk_template == "remote_raid1":
2727
      # make sure the disks are not degraded (still sync-ing is ok)
2728
      time.sleep(15)
2729
      feedback_fn("* checking mirrors status")
2730
      disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
2731
    else:
2732
      disk_abort = False
2733

    
2734
    if disk_abort:
2735
      _RemoveDisks(iobj, self.cfg)
2736
      self.cfg.RemoveInstance(iobj.name)
2737
      raise errors.OpExecError("There are some degraded disks for"
2738
                               " this instance")
2739

    
2740
    feedback_fn("creating os for instance %s on node %s" %
2741
                (instance, pnode_name))
2742

    
2743
    if iobj.disk_template != constants.DT_DISKLESS:
2744
      if self.op.mode == constants.INSTANCE_CREATE:
2745
        feedback_fn("* running the instance OS create scripts...")
2746
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
2747
          raise errors.OpExecError("could not add os for instance %s"
2748
                                   " on node %s" %
2749
                                   (instance, pnode_name))
2750

    
2751
      elif self.op.mode == constants.INSTANCE_IMPORT:
2752
        feedback_fn("* running the instance OS import scripts...")
2753
        src_node = self.op.src_node
2754
        src_image = self.src_image
2755
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
2756
                                                src_node, src_image):
2757
          raise errors.OpExecError("Could not import os for instance"
2758
                                   " %s on node %s" %
2759
                                   (instance, pnode_name))
2760
      else:
2761
        # also checked in the prereq part
2762
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
2763
                                     % self.op.mode)
2764

    
2765
    if self.op.start:
2766
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
2767
      feedback_fn("* starting instance...")
2768
      if not rpc.call_instance_start(pnode_name, iobj, None):
2769
        raise errors.OpExecError("Could not start instance")
2770

    
2771

    
2772
class LUConnectConsole(NoHooksLU):
2773
  """Connect to an instance's console.
2774

2775
  This is somewhat special in that it returns the command line that
2776
  you need to run on the master node in order to connect to the
2777
  console.
2778

2779
  """
2780
  _OP_REQP = ["instance_name"]
2781

    
2782
  def CheckPrereq(self):
2783
    """Check prerequisites.
2784

2785
    This checks that the instance is in the cluster.
2786

2787
    """
2788
    instance = self.cfg.GetInstanceInfo(
2789
      self.cfg.ExpandInstanceName(self.op.instance_name))
2790
    if instance is None:
2791
      raise errors.OpPrereqError("Instance '%s' not known" %
2792
                                 self.op.instance_name)
2793
    self.instance = instance
2794

    
2795
  def Exec(self, feedback_fn):
2796
    """Connect to the console of an instance
2797

2798
    """
2799
    instance = self.instance
2800
    node = instance.primary_node
2801

    
2802
    node_insts = rpc.call_instance_list([node])[node]
2803
    if node_insts is False:
2804
      raise errors.OpExecError("Can't connect to node %s." % node)
2805

    
2806
    if instance.name not in node_insts:
2807
      raise errors.OpExecError("Instance %s is not running." % instance.name)
2808

    
2809
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
2810

    
2811
    hyper = hypervisor.GetHypervisor()
2812
    console_cmd = hyper.GetShellCommandForConsole(instance.name)
2813
    # build ssh cmdline
2814
    argv = ["ssh", "-q", "-t"]
2815
    argv.extend(ssh.KNOWN_HOSTS_OPTS)
2816
    argv.extend(ssh.BATCH_MODE_OPTS)
2817
    argv.append(node)
2818
    argv.append(console_cmd)
2819
    return "ssh", argv
2820

    
2821

    
2822
class LUAddMDDRBDComponent(LogicalUnit):
2823
  """Adda new mirror member to an instance's disk.
2824

2825
  """
2826
  HPATH = "mirror-add"
2827
  HTYPE = constants.HTYPE_INSTANCE
2828
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]
2829

    
2830
  def BuildHooksEnv(self):
2831
    """Build hooks env.
2832

2833
    This runs on the master, the primary and all the secondaries.
2834

2835
    """
2836
    env = {
2837
      "NEW_SECONDARY": self.op.remote_node,
2838
      "DISK_NAME": self.op.disk_name,
2839
      }
2840
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2841
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
2842
          self.op.remote_node,] + list(self.instance.secondary_nodes)
2843
    return env, nl, nl
2844

    
2845
  def CheckPrereq(self):
2846
    """Check prerequisites.
2847

2848
    This checks that the instance is in the cluster.
2849

2850
    """
2851
    instance = self.cfg.GetInstanceInfo(
2852
      self.cfg.ExpandInstanceName(self.op.instance_name))
2853
    if instance is None:
2854
      raise errors.OpPrereqError("Instance '%s' not known" %
2855
                                 self.op.instance_name)
2856
    self.instance = instance
2857

    
2858
    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
2859
    if remote_node is None:
2860
      raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
2861
    self.remote_node = remote_node
2862

    
2863
    if remote_node == instance.primary_node:
2864
      raise errors.OpPrereqError("The specified node is the primary node of"
2865
                                 " the instance.")
2866

    
2867
    if instance.disk_template != constants.DT_REMOTE_RAID1:
2868
      raise errors.OpPrereqError("Instance's disk layout is not"
2869
                                 " remote_raid1.")
2870
    for disk in instance.disks:
2871
      if disk.iv_name == self.op.disk_name:
2872
        break
2873
    else:
2874
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
2875
                                 " instance." % self.op.disk_name)
2876
    if len(disk.children) > 1:
2877
      raise errors.OpPrereqError("The device already has two slave"
2878
                                 " devices.\n"
2879
                                 "This would create a 3-disk raid1"
2880
                                 " which we don't allow.")
2881
    self.disk = disk
2882

    
2883
  def Exec(self, feedback_fn):
2884
    """Add the mirror component
2885

2886
    """
2887
    disk = self.disk
2888
    instance = self.instance
2889

    
2890
    remote_node = self.remote_node
2891
    lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
2892
    names = _GenerateUniqueNames(self.cfg, lv_names)
2893
    new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
2894
                                     remote_node, disk.size, names)
2895

    
2896
    logger.Info("adding new mirror component on secondary")
2897
    #HARDCODE
2898
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, new_drbd, False,
2899
                                      _GetInstanceInfoText(instance)):
2900
      raise errors.OpExecError("Failed to create new component on secondary"
2901
                               " node %s" % remote_node)
2902

    
2903
    logger.Info("adding new mirror component on primary")
2904
    #HARDCODE
2905
    if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node, new_drbd,
2906
                                    _GetInstanceInfoText(instance)):
2907
      # remove secondary dev
2908
      self.cfg.SetDiskID(new_drbd, remote_node)
2909
      rpc.call_blockdev_remove(remote_node, new_drbd)
2910
      raise errors.OpExecError("Failed to create volume on primary")
2911

    
2912
    # the device exists now
2913
    # call the primary node to add the mirror to md
2914
    logger.Info("adding new mirror component to md")
2915
    if not rpc.call_blockdev_addchild(instance.primary_node,
2916
                                           disk, new_drbd):
2917
      logger.Error("Can't add mirror compoment to md!")
2918
      self.cfg.SetDiskID(new_drbd, remote_node)
2919
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
2920
        logger.Error("Can't rollback on secondary")
2921
      self.cfg.SetDiskID(new_drbd, instance.primary_node)
2922
      if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
2923
        logger.Error("Can't rollback on primary")
2924
      raise errors.OpExecError("Can't add mirror component to md array")
2925

    
2926
    disk.children.append(new_drbd)
2927

    
2928
    self.cfg.AddInstance(instance)
2929

    
2930
    _WaitForSync(self.cfg, instance)
2931

    
2932
    return 0
2933

    
2934

    
2935
class LURemoveMDDRBDComponent(LogicalUnit):
2936
  """Remove a component from a remote_raid1 disk.
2937

2938
  """
2939
  HPATH = "mirror-remove"
2940
  HTYPE = constants.HTYPE_INSTANCE
2941
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]
2942

    
2943
  def BuildHooksEnv(self):
2944
    """Build hooks env.
2945

2946
    This runs on the master, the primary and all the secondaries.
2947

2948
    """
2949
    env = {
2950
      "DISK_NAME": self.op.disk_name,
2951
      "DISK_ID": self.op.disk_id,
2952
      "OLD_SECONDARY": self.old_secondary,
2953
      }
2954
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2955
    nl = [self.sstore.GetMasterNode(),
2956
          self.instance.primary_node] + list(self.instance.secondary_nodes)
2957
    return env, nl, nl
2958

    
2959
  def CheckPrereq(self):
2960
    """Check prerequisites.
2961

2962
    This checks that the instance is in the cluster.
2963

2964
    """
2965
    instance = self.cfg.GetInstanceInfo(
2966
      self.cfg.ExpandInstanceName(self.op.instance_name))
2967
    if instance is None:
2968
      raise errors.OpPrereqError("Instance '%s' not known" %
2969
                                 self.op.instance_name)
2970
    self.instance = instance
2971

    
2972
    if instance.disk_template != constants.DT_REMOTE_RAID1:
2973
      raise errors.OpPrereqError("Instance's disk layout is not"
2974
                                 " remote_raid1.")
2975
    for disk in instance.disks:
2976
      if disk.iv_name == self.op.disk_name:
2977
        break
2978
    else:
2979
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
2980
                                 " instance." % self.op.disk_name)
2981
    for child in disk.children:
2982
      if child.dev_type == "drbd" and child.logical_id[2] == self.op.disk_id:
2983
        break
2984
    else:
2985
      raise errors.OpPrereqError("Can't find the device with this port.")
2986

    
2987
    if len(disk.children) < 2:
2988
      raise errors.OpPrereqError("Cannot remove the last component from"
2989
                                 " a mirror.")
2990
    self.disk = disk
2991
    self.child = child
2992
    if self.child.logical_id[0] == instance.primary_node:
      oid = 1
    else:
      oid = 0
    self.old_secondary = self.child.logical_id[oid]

  def Exec(self, feedback_fn):
    """Remove the mirror component

    """
    instance = self.instance
    disk = self.disk
    child = self.child
    logger.Info("remove mirror component")
    self.cfg.SetDiskID(disk, instance.primary_node)
    if not rpc.call_blockdev_removechild(instance.primary_node,
                                         disk, child):
      raise errors.OpExecError("Can't remove child from mirror.")

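    # the child is detached from the md device; now remove its drbd device
    # from both nodes it spans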
    for node in child.logical_id[:2]:
      self.cfg.SetDiskID(child, node)
      if not rpc.call_blockdev_remove(node, child):
        logger.Error("Warning: failed to remove device from node %s,"
                     " continuing operation." % node)

    disk.children.remove(child)
    self.cfg.AddInstance(instance)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    remote_node = getattr(self.op, "remote_node", None)
    if remote_node is None:
      remote_node = instance.secondary_nodes[0]
    else:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    self.op.remote_node = remote_node

  def Exec(self, feedback_fn):
    """Replace the disks of an instance.

    """
    instance = self.instance
    iv_names = {}
    # start of work
    remote_node = self.op.remote_node
    cfg = self.cfg
    vgname = cfg.GetVGName()
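    # for each disk, create a new drbd mirror towards remote_node, attach it
    # to the existing md device, wait for all of them to sync, and only then
    # detach and remove the old components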
    for dev in instance.disks:
      size = dev.size
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
                                       remote_node, size, names)
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
      logger.Info("adding new mirror component on secondary for %s" %
                  dev.iv_name)
      #HARDCODE
      if not _CreateBlockDevOnSecondary(cfg, remote_node, new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new component on"
                                 " secondary node %s\n"
                                 "Full abort, cleanup manually!" %
                                 remote_node)

      logger.Info("adding new mirror component on primary")
      #HARDCODE
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, new_drbd,
                                      _GetInstanceInfoText(instance)):
        # remove secondary dev
        cfg.SetDiskID(new_drbd, remote_node)
        rpc.call_blockdev_remove(remote_node, new_drbd)
        raise errors.OpExecError("Failed to create volume on primary!\n"
                                 "Full abort, cleanup manually!!")

      # the device exists now
      # call the primary node to add the mirror to md
      logger.Info("adding new mirror component to md")
      if not rpc.call_blockdev_addchild(instance.primary_node, dev,
                                        new_drbd):
        logger.Error("Can't add mirror component to md!")
        cfg.SetDiskID(new_drbd, remote_node)
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
          logger.Error("Can't rollback on secondary")
        cfg.SetDiskID(new_drbd, instance.primary_node)
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
          logger.Error("Can't rollback on primary")
        raise errors.OpExecError("Full abort, cleanup manually!!")

      dev.children.append(new_drbd)
      cfg.AddInstance(instance)

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    _WaitForSync(cfg, instance, unlock=True)

    # so check manually all the devices
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("MD device %s is degraded!" % name)
      cfg.SetDiskID(new_drbd, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
      if is_degr:
        raise errors.OpExecError("New drbd device %s is degraded!" % name)

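    # all new components are in sync, so the old children can be detached
    # from the md devices and their drbd devices removed from both nodes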
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      logger.Info("remove mirror %s component" % name)
      cfg.SetDiskID(dev, instance.primary_node)
      if not rpc.call_blockdev_removechild(instance.primary_node,
                                           dev, child):
        logger.Error("Can't remove child from mirror, aborting"
                     " *this device cleanup*.\nYou need to cleanup manually!!")
        continue

      for node in child.logical_id[:2]:
        logger.Info("remove child device on %s" % node)
        cfg.SetDiskID(child, node)
        if not rpc.call_blockdev_remove(node, child):
          logger.Error("Warning: failed to remove device from node %s,"
                       " continuing operation." % node)

      dev.children.remove(child)

      cfg.AddInstance(instance)


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]
    return


  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type == "drbd":
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
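      # run_state is what the primary node reports right now, config_state
      # what the configuration says the instance should be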
      remote_info = rpc.call_instance_info(instance.primary_node,
                                           instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        }

      result[instance.name] = idict

    return result


class LUQueryNodeData(NoHooksLU):
  """Logical unit for querying node data.

  """
  _OP_REQP = ["nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional node list against the existing names.

    """
    self.wanted_nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Compute and return the list of nodes.

    """
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]
    result = []
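    # each entry is (name, primary_ip, secondary_ip, primary instances,
    # secondary instances)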
    for node in self.wanted_nodes:
      result.append((node.name, node.primary_ip, node.secondary_ip,
                     [inst.name for inst in ilist
                      if inst.primary_node == node.name],
                     [inst.name for inst in ilist
                      if node.name in inst.secondary_nodes],
                     ))
    return result


class LUSetInstanceParms(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      args['nics'] = [(ip, bridge)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.bridge = getattr(self.op, "bridge", None)
    if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
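    # the special value "none" (any case) clears the instance's IP address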
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("No such instance name '%s'" %
                                 self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance
    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))

    self.cfg.AddInstance(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list([node.name for node in self.nodes])


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance name is a valid one.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    # shutdown the instance, unless requested not to do so
    if self.op.shutdown:
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
      self.processor.ChainOpCode(op, feedback_fn)

    vgname = self.cfg.GetVGName()

    snap_disks = []

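    # only "sda" is snapshotted (and therefore exported); the other disks
    # are skipped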
    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type="lvm", size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown:
        op = opcodes.OpStartupInstance(instance_name=instance.name,
                                       force=False)
        self.processor.ChainOpCode(op, feedback_fn)

    # TODO: check for size

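    # copy each snapshot to the target node and then remove it from the
    # source node; failures are logged but do not abort the export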
    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                      instance):
        logger.Error("could not export block device %s from node"
                     " %s to node %s" %
                     (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from"
                     " node %s" % (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

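    # finally, remove any older exports of this instance that are still
    # present on other nodes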
    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal; if we
    # proceeded, the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.processor.ChainOpCode(op, feedback_fn)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
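    # resolve self.target (and canonicalize self.op.name) from the
    # (kind, name) pair in the opcode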
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()


class LUAddTag(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tag"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    objects.TaggableObject.ValidateTag(self.op.tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      self.target.AddTag(self.op.tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTag(TagsLU):
  """Delete a tag from a given object.

  """
  _OP_REQP = ["kind", "name", "tag"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    objects.TaggableObject.ValidateTag(self.op.tag)
    if self.op.tag not in self.target.GetTags():
      raise errors.OpPrereqError("Tag not found")

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    self.target.RemoveTag(self.op.tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")