root / lib / cmdlib.py @ 2a710df1

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import socket
30
import time
31
import tempfile
32
import re
33
import platform
34

    
35
from ganeti import rpc
36
from ganeti import ssh
37
from ganeti import logger
38
from ganeti import utils
39
from ganeti import errors
40
from ganeti import hypervisor
41
from ganeti import config
42
from ganeti import constants
43
from ganeti import objects
44
from ganeti import opcodes
45
from ganeti import ssconf
46

    
47
class LogicalUnit(object):
48
  """Logical Unit base class.
49

50
  Subclasses must follow these rules:
51
    - implement CheckPrereq which also fills in the opcode instance
52
      with all the fields (even if as None)
53
    - implement Exec
54
    - implement BuildHooksEnv
55
    - redefine HPATH and HTYPE
56
    - optionally redefine their run requirements (REQ_CLUSTER,
57
      REQ_MASTER); note that all commands require root permissions
58

59
  """
60
  HPATH = None
61
  HTYPE = None
62
  _OP_REQP = []
63
  REQ_CLUSTER = True
64
  REQ_MASTER = True
65

    
66
  def __init__(self, processor, op, cfg, sstore):
67
    """Constructor for LogicalUnit.
68

69
    This needs to be overridden in derived classes in order to check op
70
    validity.
71

72
    """
73
    self.processor = processor
74
    self.op = op
75
    self.cfg = cfg
76
    self.sstore = sstore
77
    for attr_name in self._OP_REQP:
78
      attr_val = getattr(op, attr_name, None)
79
      if attr_val is None:
80
        raise errors.OpPrereqError("Required parameter '%s' missing" %
81
                                   attr_name)
82
    if self.REQ_CLUSTER:
83
      if not cfg.IsCluster():
84
        raise errors.OpPrereqError("Cluster not initialized yet,"
85
                                   " use 'gnt-cluster init' first.")
86
      if self.REQ_MASTER:
87
        master = sstore.GetMasterNode()
88
        if master != socket.gethostname():
89
          raise errors.OpPrereqError("Commands must be run on the master"
90
                                     " node %s" % master)
91

    
92
  def CheckPrereq(self):
93
    """Check prerequisites for this LU.
94

95
    This method should check that the prerequisites for the execution
96
    of this LU are fulfilled. It can do internode communication, but
97
    it should be idempotent - no cluster or system changes are
98
    allowed.
99

100
    The method should raise errors.OpPrereqError in case something is
101
    not fulfilled. Its return value is ignored.
102

103
    This method should also update all the parameters of the opcode to
104
    their canonical form; e.g. a short node name must be fully
105
    expanded after this method has successfully completed (so that
106
    hooks, logging, etc. work correctly).
107

108
    """
109
    raise NotImplementedError
110

    
111
  def Exec(self, feedback_fn):
112
    """Execute the LU.
113

114
    This method should implement the actual work. It should raise
115
    errors.OpExecError for failures that are somewhat dealt with in
116
    code, or expected.
117

118
    """
119
    raise NotImplementedError
120

    
121
  def BuildHooksEnv(self):
122
    """Build hooks environment for this LU.
123

124
    This method should return a three-element tuple consisting of: a dict
125
    containing the environment that will be used for running the
126
    specific hook for this LU, a list of node names on which the hook
127
    should run before the execution, and a list of node names on which
128
    the hook should run after the execution.
129

130
    The keys of the dict must not have 'GANETI_' prefixed as this will
131
    be handled in the hooks runner. Also note additional keys will be
132
    added by the hooks runner. If the LU doesn't define any
133
    environment, an empty dict (and not None) should be returned.
134

135
    As for the node lists, the master should not be included in
136
    them, as it will be added by the hooks runner in case this LU
137
    requires a cluster to run on (otherwise we don't have a node
138
    list). If there are no nodes, an empty list should be returned (and not
139
    None).
140

141
    Note that if the HPATH for a LU class is None, this function will
142
    not be called.
143

144
    """
145
    raise NotImplementedError
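# Illustrative sketch (editorial addition, not part of the original module): a
# hypothetical minimal LogicalUnit subclass might look like the commented-out
# code below.  LUExampleNoop, "example-noop" and the "message" opcode field are
# invented names used only to show the expected structure.
#
#   class LUExampleNoop(LogicalUnit):
#     HPATH = "example-noop"
#     HTYPE = constants.HTYPE_CLUSTER
#     _OP_REQP = ["message"]
#
#     def CheckPrereq(self):
#       # _OP_REQP already guarantees self.op.message is not None; canonicalise it
#       self.op.message = str(self.op.message)
#
#     def BuildHooksEnv(self):
#       # env dict (no GANETI_ prefix), nodes for pre-hooks, nodes for post-hooks
#       return {"MESSAGE": self.op.message}, [], []
#
#     def Exec(self, feedback_fn):
#       feedback_fn("example message: %s" % self.op.message)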
146

    
147

    
148
class NoHooksLU(LogicalUnit):
149
  """Simple LU which runs no hooks.
150

151
  This LU is intended as a parent for other LogicalUnits which will
152
  run no hooks, in order to reduce duplicate code.
153

154
  """
155
  HPATH = None
156
  HTYPE = None
157

    
158
  def BuildHooksEnv(self):
159
    """Build hooks env.
160

161
    This is a no-op, since we don't run hooks.
162

163
    """
164
    return
165

    
166

    
167
def _GetWantedNodes(lu, nodes):
168
  """Returns list of checked and expanded nodes.
169

170
  Args:
171
    nodes: List of nodes (strings) or None for all
172

173
  """
174
  if nodes is not None and not isinstance(nodes, list):
175
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
176

    
177
  if nodes:
178
    wanted_nodes = []
179

    
180
    for name in nodes:
181
      node = lu.cfg.GetNodeInfo(lu.cfg.ExpandNodeName(name))
182
      if node is None:
183
        raise errors.OpPrereqError("No such node name '%s'" % name)
184
      wanted_nodes.append(node)
185

    
186
    return wanted_nodes
187
  else:
188
    return [lu.cfg.GetNodeInfo(name) for name in lu.cfg.GetNodeList()]
189

    
190

    
191
def _CheckOutputFields(static, dynamic, selected):
192
  """Checks whether all selected fields are valid.
193

194
  Args:
195
    static: Static fields
196
    dynamic: Dynamic fields
197

198
  """
199
  static_fields = frozenset(static)
200
  dynamic_fields = frozenset(dynamic)
201

    
202
  all_fields = static_fields | dynamic_fields
203

    
204
  if not all_fields.issuperset(selected):
205
    raise errors.OpPrereqError("Unknown output fields selected: %s"
206
                               % ",".join(frozenset(selected).
207
                                          difference(all_fields)))
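# Illustrative use (hypothetical values): _CheckOutputFields(static=["name"],
# dynamic=["mfree"], selected=["name", "bogus"]) raises OpPrereqError naming the
# unknown field "bogus", while selected=["name", "mfree"] passes silently.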
208

    
209

    
210
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
211
                          memory, vcpus, nics):
212
  """Builds instance related env variables for hooks from single variables.
213

214
  Args:
215
    secondary_nodes: List of secondary nodes as strings
216
  """
217
  env = {
218
    "INSTANCE_NAME": name,
219
    "INSTANCE_PRIMARY": primary_node,
220
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
221
    "INSTANCE_OS_TYPE": os_type,
222
    "INSTANCE_STATUS": status,
223
    "INSTANCE_MEMORY": memory,
224
    "INSTANCE_VCPUS": vcpus,
225
  }
226

    
227
  if nics:
228
    nic_count = len(nics)
229
    for idx, (ip, bridge) in enumerate(nics):
230
      if ip is None:
231
        ip = ""
232
      env["INSTANCE_NIC%d_IP" % idx] = ip
233
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
234
  else:
235
    nic_count = 0
236

    
237
  env["INSTANCE_NIC_COUNT"] = nic_count
238

    
239
  return env
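# Illustrative example (hypothetical values):
#   _BuildInstanceHookEnv("inst1.example.com", "node1", ["node2"], "debian-etch",
#                         "up", 128, 1, [("192.0.2.10", "xen-br0")])
# returns a dict with INSTANCE_NAME="inst1.example.com", INSTANCE_PRIMARY="node1",
# INSTANCE_SECONDARIES="node2", INSTANCE_OS_TYPE="debian-etch",
# INSTANCE_STATUS="up", INSTANCE_MEMORY=128, INSTANCE_VCPUS=1,
# INSTANCE_NIC_COUNT=1, INSTANCE_NIC0_IP="192.0.2.10" and
# INSTANCE_NIC0_BRIDGE="xen-br0"; the GANETI_ prefix is added later by the
# hooks runner.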
240

    
241

    
242
def _BuildInstanceHookEnvByObject(instance, override=None):
243
  """Builds instance related env variables for hooks from an object.
244

245
  Args:
246
    instance: objects.Instance object of instance
247
    override: dict of values to override
248
  """
249
  args = {
250
    'name': instance.name,
251
    'primary_node': instance.primary_node,
252
    'secondary_nodes': instance.secondary_nodes,
253
    'os_type': instance.os,
254
    'status': instance.status,
255
    'memory': instance.memory,
256
    'vcpus': instance.vcpus,
257
    'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
258
  }
259
  if override:
260
    args.update(override)
261
  return _BuildInstanceHookEnv(**args)
262

    
263

    
264
def _UpdateEtcHosts(fullnode, ip):
265
  """Ensure a node has a correct entry in /etc/hosts.
266

267
  Args:
268
    fullnode - Fully qualified domain name of host. (str)
269
    ip       - IPv4 address of host (str)
270

271
  """
272
  node = fullnode.split(".", 1)[0]
273

    
274
  f = open('/etc/hosts', 'r+')
275

    
276
  inthere = False
277

    
278
  save_lines = []
279
  add_lines = []
280
  removed = False
281

    
282
  while True:
283
    rawline = f.readline()
284

    
285
    if not rawline:
286
      # End of file
287
      break
288

    
289
    line = rawline.split('\n')[0]
290

    
291
    # Strip off comments
292
    line = line.split('#')[0]
293

    
294
    if not line:
295
      # Entire line was comment, skip
296
      save_lines.append(rawline)
297
      continue
298

    
299
    fields = line.split()
300

    
301
    haveall = True
302
    havesome = False
303
    for spec in [ ip, fullnode, node ]:
304
      if spec not in fields:
305
        haveall = False
306
      if spec in fields:
307
        havesome = True
308

    
309
    if haveall:
310
      inthere = True
311
      save_lines.append(rawline)
312
      continue
313

    
314
    if havesome and not haveall:
315
      # Line (old, or manual?) which is missing some.  Remove.
316
      removed = True
317
      continue
318

    
319
    save_lines.append(rawline)
320

    
321
  if not inthere:
322
    add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))
323

    
324
  if removed:
325
    if add_lines:
326
      save_lines = save_lines + add_lines
327

    
328
    # We removed a line, write a new file and replace old.
329
    fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
330
    newfile = os.fdopen(fd, 'w')
331
    newfile.write(''.join(save_lines))
332
    newfile.close()
333
    os.rename(tmpname, '/etc/hosts')
334

    
335
  elif add_lines:
336
    # Simply appending a new line will do the trick.
337
    f.seek(0, 2)
338
    for add in add_lines:
339
      f.write(add)
340

    
341
  f.close()
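# Illustrative example (hypothetical host): _UpdateEtcHosts("node1.example.com",
# "192.0.2.10") makes sure /etc/hosts carries a line of the form
#   192.0.2.10<TAB>node1.example.com node1
# and rewrites the file if it had to drop stale lines mentioning only some of
# the three identifiers (IP, FQDN, short name).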
342

    
343

    
344
def _UpdateKnownHosts(fullnode, ip, pubkey):
345
  """Ensure a node has a correct known_hosts entry.
346

347
  Args:
348
    fullnode - Fully qualified domain name of host. (str)
349
    ip       - IPv4 address of host (str)
350
    pubkey   - the public key of the cluster
351

352
  """
353
  if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
354
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
355
  else:
356
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
357

    
358
  inthere = False
359

    
360
  save_lines = []
361
  add_lines = []
362
  removed = False
363

    
364
  while True:
365
    rawline = f.readline()
366
    logger.Debug('read %s' % (repr(rawline),))
367

    
368
    if not rawline:
369
      # End of file
370
      break
371

    
372
    line = rawline.split('\n')[0]
373

    
374
    parts = line.split(' ')
375
    fields = parts[0].split(',')
376
    key = parts[2]
377

    
378
    haveall = True
379
    havesome = False
380
    for spec in [ ip, fullnode ]:
381
      if spec not in fields:
382
        haveall = False
383
      if spec in fields:
384
        havesome = True
385

    
386
    logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
387
    if haveall and key == pubkey:
388
      inthere = True
389
      save_lines.append(rawline)
390
      logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
391
      continue
392

    
393
    if havesome and (not haveall or key != pubkey):
394
      removed = True
395
      logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
396
      continue
397

    
398
    save_lines.append(rawline)
399

    
400
  if not inthere:
401
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
402
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
403

    
404
  if removed:
405
    save_lines = save_lines + add_lines
406

    
407
    # Write a new file and replace old.
408
    fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
409
                                   constants.DATA_DIR)
410
    newfile = os.fdopen(fd, 'w')
411
    try:
412
      newfile.write(''.join(save_lines))
413
    finally:
414
      newfile.close()
415
    logger.Debug("Wrote new known_hosts.")
416
    os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
417

    
418
  elif add_lines:
419
    # Simply appending a new line will do the trick.
420
    f.seek(0, 2)
421
    for add in add_lines:
422
      f.write(add)
423

    
424
  f.close()
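# Illustrative example (hypothetical host and key): _UpdateKnownHosts(
# "node1.example.com", "192.0.2.10", "AAAAB3Nza...") ensures the cluster-wide
# known_hosts file contains a line of the form
#   node1.example.com,192.0.2.10 ssh-rsa AAAAB3Nza...
# and rewrites the file when entries with a stale key or an incomplete host
# spec have to be discarded.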
425

    
426

    
427
def _HasValidVG(vglist, vgname):
428
  """Checks if the volume group list is valid.
429

430
  A non-None return value means there's an error, and the return value
431
  is the error message.
432

433
  """
434
  vgsize = vglist.get(vgname, None)
435
  if vgsize is None:
436
    return "volume group '%s' missing" % vgname
437
  elif vgsize < 20480:
438
    return ("volume group '%s' too small (20480MiB required, %dMib found)" %
439
            (vgname, vgsize))
440
  return None
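# Illustrative use (hypothetical values): _HasValidVG({"xenvg": 102400}, "xenvg")
# returns None (the group exists and is large enough), _HasValidVG({}, "xenvg")
# returns "volume group 'xenvg' missing", and a 10 GiB group (10240 MiB) yields
# the "too small" message.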
441

    
442

    
443
def _InitSSHSetup(node):
444
  """Setup the SSH configuration for the cluster.
445

446

447
  This generates a dsa keypair for root, adds the pub key to the
448
  permitted hosts and adds the hostkey to its own known hosts.
449

450
  Args:
451
    node: the name of this host as a fqdn
452

453
  """
454
  if os.path.exists('/root/.ssh/id_dsa'):
455
    utils.CreateBackup('/root/.ssh/id_dsa')
456
  if os.path.exists('/root/.ssh/id_dsa.pub'):
457
    utils.CreateBackup('/root/.ssh/id_dsa.pub')
458

    
459
  utils.RemoveFile('/root/.ssh/id_dsa')
460
  utils.RemoveFile('/root/.ssh/id_dsa.pub')
461

    
462
  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
463
                         "-f", "/root/.ssh/id_dsa",
464
                         "-q", "-N", ""])
465
  if result.failed:
466
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
467
                             result.output)
468

    
469
  f = open('/root/.ssh/id_dsa.pub', 'r')
470
  try:
471
    utils.AddAuthorizedKey('/root/.ssh/authorized_keys', f.read(8192))
472
  finally:
473
    f.close()
474

    
475

    
476
def _InitGanetiServerSetup(ss):
477
  """Setup the necessary configuration for the initial node daemon.
478

479
  This creates the nodepass file containing the shared password for
480
  the cluster and also generates the SSL certificate.
481

482
  """
483
  # Create pseudo random password
484
  randpass = sha.new(os.urandom(64)).hexdigest()
485
  # and write it into sstore
486
  ss.SetKey(ss.SS_NODED_PASS, randpass)
487

    
488
  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
489
                         "-days", str(365*5), "-nodes", "-x509",
490
                         "-keyout", constants.SSL_CERT_FILE,
491
                         "-out", constants.SSL_CERT_FILE, "-batch"])
492
  if result.failed:
493
    raise errors.OpExecError("could not generate server ssl cert, command"
494
                             " %s had exitcode %s and error message %s" %
495
                             (result.cmd, result.exit_code, result.output))
496

    
497
  os.chmod(constants.SSL_CERT_FILE, 0400)
498

    
499
  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
500

    
501
  if result.failed:
502
    raise errors.OpExecError("Could not start the node daemon, command %s"
503
                             " had exitcode %s and error %s" %
504
                             (result.cmd, result.exit_code, result.output))
505

    
506

    
507
class LUInitCluster(LogicalUnit):
508
  """Initialise the cluster.
509

510
  """
511
  HPATH = "cluster-init"
512
  HTYPE = constants.HTYPE_CLUSTER
513
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
514
              "def_bridge", "master_netdev"]
515
  REQ_CLUSTER = False
516

    
517
  def BuildHooksEnv(self):
518
    """Build hooks env.
519

520
    Notes: Since we don't require a cluster, we must manually add
521
    ourselves in the post-run node list.
522

523
    """
524
    env = {
525
      "CLUSTER": self.op.cluster_name,
526
      "MASTER": self.hostname['hostname_full'],
527
      }
528
    return env, [], [self.hostname['hostname_full']]
529

    
530
  def CheckPrereq(self):
531
    """Verify that the passed name is a valid one.
532

533
    """
534
    if config.ConfigWriter.IsCluster():
535
      raise errors.OpPrereqError("Cluster is already initialised")
536

    
537
    hostname_local = socket.gethostname()
538
    self.hostname = hostname = utils.LookupHostname(hostname_local)
539
    if not hostname:
540
      raise errors.OpPrereqError("Cannot resolve my own hostname ('%s')" %
541
                                 hostname_local)
542

    
543
    if hostname["hostname_full"] != hostname_local:
544
      raise errors.OpPrereqError("My own hostname (%s) does not match the"
545
                                 " resolver (%s): probably not using FQDN"
546
                                 " for hostname." %
547
                                 (hostname_local, hostname["hostname_full"]))
548

    
549
    if hostname["ip"].startswith("127."):
550
      raise errors.OpPrereqError("This host's IP resolves to the private"
551
                                 " range (%s). Please fix DNS or /etc/hosts." %
552
                                 (hostname["ip"],))
553

    
554
    self.clustername = clustername = utils.LookupHostname(self.op.cluster_name)
555
    if not clustername:
556
      raise errors.OpPrereqError("Cannot resolve given cluster name ('%s')"
557
                                 % self.op.cluster_name)
558

    
559
    result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", hostname['ip']])
560
    if result.failed:
561
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
562
                                 " to %s,\nbut this ip address does not"
563
                                 " belong to this host."
564
                                 " Aborting." % hostname['ip'])
565

    
566
    secondary_ip = getattr(self.op, "secondary_ip", None)
567
    if secondary_ip and not utils.IsValidIP(secondary_ip):
568
      raise errors.OpPrereqError("Invalid secondary ip given")
569
    if secondary_ip and secondary_ip != hostname['ip']:
570
      result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", secondary_ip])
571
      if result.failed:
572
        raise errors.OpPrereqError("You gave %s as secondary IP,\n"
573
                                   "but it does not belong to this host." %
574
                                   secondary_ip)
575
    self.secondary_ip = secondary_ip
576

    
577
    # checks presence of the volume group given
578
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
579

    
580
    if vgstatus:
581
      raise errors.OpPrereqError("Error: %s" % vgstatus)
582

    
583
    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
584
                    self.op.mac_prefix):
585
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
586
                                 self.op.mac_prefix)
587

    
588
    if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
589
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
590
                                 self.op.hypervisor_type)
591

    
592
    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
593
    if result.failed:
594
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
595
                                 (self.op.master_netdev,
596
                                  result.output.strip()))
597

    
598
  def Exec(self, feedback_fn):
599
    """Initialize the cluster.
600

601
    """
602
    clustername = self.clustername
603
    hostname = self.hostname
604

    
605
    # set up the simple store
606
    ss = ssconf.SimpleStore()
607
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
608
    ss.SetKey(ss.SS_MASTER_NODE, hostname['hostname_full'])
609
    ss.SetKey(ss.SS_MASTER_IP, clustername['ip'])
610
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
611
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername['hostname'])
612

    
613
    # set up the inter-node password and certificate
614
    _InitGanetiServerSetup(ss)
615

    
616
    # start the master ip
617
    rpc.call_node_start_master(hostname['hostname_full'])
618

    
619
    # set up ssh config and /etc/hosts
620
    f = open('/etc/ssh/ssh_host_rsa_key.pub', 'r')
621
    try:
622
      sshline = f.read()
623
    finally:
624
      f.close()
625
    sshkey = sshline.split(" ")[1]
626

    
627
    _UpdateEtcHosts(hostname['hostname_full'],
628
                    hostname['ip'],
629
                    )
630

    
631
    _UpdateKnownHosts(hostname['hostname_full'],
632
                      hostname['ip'],
633
                      sshkey,
634
                      )
635

    
636
    _InitSSHSetup(hostname['hostname'])
637

    
638
    # init of cluster config file
639
    cfgw = config.ConfigWriter()
640
    cfgw.InitConfig(hostname['hostname'], hostname['ip'], self.secondary_ip,
641
                    sshkey, self.op.mac_prefix,
642
                    self.op.vg_name, self.op.def_bridge)
643

    
644

    
645
class LUDestroyCluster(NoHooksLU):
646
  """Logical unit for destroying the cluster.
647

648
  """
649
  _OP_REQP = []
650

    
651
  def CheckPrereq(self):
652
    """Check prerequisites.
653

654
    This checks whether the cluster is empty.
655

656
    Any errors are signalled by raising errors.OpPrereqError.
657

658
    """
659
    master = self.sstore.GetMasterNode()
660

    
661
    nodelist = self.cfg.GetNodeList()
662
    if len(nodelist) != 1 or nodelist[0] != master:
663
      raise errors.OpPrereqError("There are still %d node(s) in"
664
                                 " this cluster." % (len(nodelist) - 1))
665
    instancelist = self.cfg.GetInstanceList()
666
    if instancelist:
667
      raise errors.OpPrereqError("There are still %d instance(s) in"
668
                                 " this cluster." % len(instancelist))
669

    
670
  def Exec(self, feedback_fn):
671
    """Destroys the cluster.
672

673
    """
674
    utils.CreateBackup('/root/.ssh/id_dsa')
675
    utils.CreateBackup('/root/.ssh/id_dsa.pub')
676
    rpc.call_node_leave_cluster(self.sstore.GetMasterNode())
677

    
678

    
679
class LUVerifyCluster(NoHooksLU):
680
  """Verifies the cluster status.
681

682
  """
683
  _OP_REQP = []
684

    
685
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
686
                  remote_version, feedback_fn):
687
    """Run multiple tests against a node.
688

689
    Test list:
690
      - compares ganeti version
691
      - checks vg existence and size > 20G
692
      - checks config file checksum
693
      - checks ssh to other nodes
694

695
    Args:
696
      node: name of the node to check
697
      file_list: required list of files
698
      local_cksum: dictionary of local files and their checksums
699

700
    """
701
    # compares ganeti version
702
    local_version = constants.PROTOCOL_VERSION
703
    if not remote_version:
704
      feedback_fn(" - ERROR: connection to %s failed" % (node))
705
      return True
706

    
707
    if local_version != remote_version:
708
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
709
                      (local_version, node, remote_version))
710
      return True
711

    
712
    # checks vg existence and size > 20G
713

    
714
    bad = False
715
    if not vglist:
716
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
717
                      (node,))
718
      bad = True
719
    else:
720
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
721
      if vgstatus:
722
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
723
        bad = True
724

    
725
    # checks config file checksum
726
    # checks ssh to any
727

    
728
    if 'filelist' not in node_result:
729
      bad = True
730
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
731
    else:
732
      remote_cksum = node_result['filelist']
733
      for file_name in file_list:
734
        if file_name not in remote_cksum:
735
          bad = True
736
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
737
        elif remote_cksum[file_name] != local_cksum[file_name]:
738
          bad = True
739
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
740

    
741
    if 'nodelist' not in node_result:
742
      bad = True
743
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
744
    else:
745
      if node_result['nodelist']:
746
        bad = True
747
        for node in node_result['nodelist']:
748
          feedback_fn("  - ERROR: communication with node '%s': %s" %
749
                          (node, node_result['nodelist'][node]))
750
    hyp_result = node_result.get('hypervisor', None)
751
    if hyp_result is not None:
752
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
753
    return bad
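  # For reference (editorial note): node_result above is the per-node answer to
  # rpc.call_node_verify and is expected to be a dict with a 'filelist' entry
  # (file name -> checksum), a 'nodelist' entry (node name -> connectivity
  # error, empty when everything is reachable) and an optional 'hypervisor'
  # entry holding a hypervisor verification error message.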
754

    
755
  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
756
    """Verify an instance.
757

758
    This function checks to see if the required block devices are
759
    available on the instance's node.
760

761
    """
762
    bad = False
763

    
764
    instancelist = self.cfg.GetInstanceList()
765
    if not instance in instancelist:
766
      feedback_fn("  - ERROR: instance %s not in instance list %s" %
767
                      (instance, instancelist))
768
      bad = True
769

    
770
    instanceconfig = self.cfg.GetInstanceInfo(instance)
771
    node_current = instanceconfig.primary_node
772

    
773
    node_vol_should = {}
774
    instanceconfig.MapLVsByNode(node_vol_should)
775

    
776
    for node in node_vol_should:
777
      for volume in node_vol_should[node]:
778
        if node not in node_vol_is or volume not in node_vol_is[node]:
779
          feedback_fn("  - ERROR: volume %s missing on node %s" %
780
                          (volume, node))
781
          bad = True
782

    
783
    if instanceconfig.status != 'down':
784
      if not instance in node_instance[node_current]:
785
        feedback_fn("  - ERROR: instance %s not running on node %s" %
786
                        (instance, node_current))
787
        bad = True
788

    
789
    for node in node_instance:
790
      if node != node_current:
791
        if instance in node_instance[node]:
792
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
793
                          (instance, node))
794
          bad = True
795

    
796
    return bad
797

    
798
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
799
    """Verify if there are any unknown volumes in the cluster.
800

801
    The .os, .swap and backup volumes are ignored. All other volumes are
802
    reported as unknown.
803

804
    """
805
    bad = False
806

    
807
    for node in node_vol_is:
808
      for volume in node_vol_is[node]:
809
        if node not in node_vol_should or volume not in node_vol_should[node]:
810
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
811
                      (volume, node))
812
          bad = True
813
    return bad
814

    
815
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
816
    """Verify the list of running instances.
817

818
    This checks what instances are running but unknown to the cluster.
819

820
    """
821
    bad = False
822
    for node in node_instance:
823
      for runninginstance in node_instance[node]:
824
        if runninginstance not in instancelist:
825
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
826
                          (runninginstance, node))
827
          bad = True
828
    return bad
829

    
830
  def CheckPrereq(self):
831
    """Check prerequisites.
832

833
    This has no prerequisites.
834

835
    """
836
    pass
837

    
838
  def Exec(self, feedback_fn):
839
    """Verify integrity of cluster, performing various test on nodes.
840

841
    """
842
    bad = False
843
    feedback_fn("* Verifying global settings")
844
    self.cfg.VerifyConfig()
845

    
846
    master = self.sstore.GetMasterNode()
847
    vg_name = self.cfg.GetVGName()
848
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
849
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
850
    node_volume = {}
851
    node_instance = {}
852

    
853
    # FIXME: verify OS list
854
    # do local checksums
855
    file_names = list(self.sstore.GetFileList())
856
    file_names.append(constants.SSL_CERT_FILE)
857
    file_names.append(constants.CLUSTER_CONF_FILE)
858
    local_checksums = utils.FingerprintFiles(file_names)
859

    
860
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
861
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
862
    all_instanceinfo = rpc.call_instance_list(nodelist)
863
    all_vglist = rpc.call_vg_list(nodelist)
864
    node_verify_param = {
865
      'filelist': file_names,
866
      'nodelist': nodelist,
867
      'hypervisor': None,
868
      }
869
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
870
    all_rversion = rpc.call_version(nodelist)
871

    
872
    for node in nodelist:
873
      feedback_fn("* Verifying node %s" % node)
874
      result = self._VerifyNode(node, file_names, local_checksums,
875
                                all_vglist[node], all_nvinfo[node],
876
                                all_rversion[node], feedback_fn)
877
      bad = bad or result
878

    
879
      # node_volume
880
      volumeinfo = all_volumeinfo[node]
881

    
882
      if type(volumeinfo) != dict:
883
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
884
        bad = True
885
        continue
886

    
887
      node_volume[node] = volumeinfo
888

    
889
      # node_instance
890
      nodeinstance = all_instanceinfo[node]
891
      if type(nodeinstance) != list:
892
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
893
        bad = True
894
        continue
895

    
896
      node_instance[node] = nodeinstance
897

    
898
    node_vol_should = {}
899

    
900
    for instance in instancelist:
901
      feedback_fn("* Verifying instance %s" % instance)
902
      result = self._VerifyInstance(instance, node_volume, node_instance,
903
                                     feedback_fn)
904
      bad = bad or result
905

    
906
      inst_config = self.cfg.GetInstanceInfo(instance)
907

    
908
      inst_config.MapLVsByNode(node_vol_should)
909

    
910
    feedback_fn("* Verifying orphan volumes")
911
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
912
                                       feedback_fn)
913
    bad = bad or result
914

    
915
    feedback_fn("* Verifying remaining instances")
916
    result = self._VerifyOrphanInstances(instancelist, node_instance,
917
                                         feedback_fn)
918
    bad = bad or result
919

    
920
    return int(bad)
921

    
922

    
923
def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
924
  """Sleep and poll for an instance's disk to sync.
925

926
  """
927
  if not instance.disks:
928
    return True
929

    
930
  if not oneshot:
931
    logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)
932

    
933
  node = instance.primary_node
934

    
935
  for dev in instance.disks:
936
    cfgw.SetDiskID(dev, node)
937

    
938
  retries = 0
939
  while True:
940
    max_time = 0
941
    done = True
942
    cumul_degraded = False
943
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
944
    if not rstats:
945
      logger.ToStderr("Can't get any data from node %s" % node)
946
      retries += 1
947
      if retries >= 10:
948
        raise errors.RemoteError("Can't contact node %s for mirror data,"
949
                                 " aborting." % node)
950
      time.sleep(6)
951
      continue
952
    retries = 0
953
    for i in range(len(rstats)):
954
      mstat = rstats[i]
955
      if mstat is None:
956
        logger.ToStderr("Can't compute data for node %s/%s" %
957
                        (node, instance.disks[i].iv_name))
958
        continue
959
      perc_done, est_time, is_degraded = mstat
960
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
961
      if perc_done is not None:
962
        done = False
963
        if est_time is not None:
964
          rem_time = "%d estimated seconds remaining" % est_time
965
          max_time = est_time
966
        else:
967
          rem_time = "no time estimate"
968
        logger.ToStdout("- device %s: %5.2f%% done, %s" %
969
                        (instance.disks[i].iv_name, perc_done, rem_time))
970
    if done or oneshot:
971
      break
972

    
973
    if unlock:
974
      utils.Unlock('cmd')
975
    try:
976
      time.sleep(min(60, max_time))
977
    finally:
978
      if unlock:
979
        utils.Lock('cmd')
980

    
981
  if done:
982
    logger.ToStdout("Instance %s's disks are in sync." % instance.name)
983
  return not cumul_degraded
984

    
985

    
986
def _CheckDiskConsistency(cfgw, dev, node, on_primary):
987
  """Check that mirrors are not degraded.
988

989
  """
990
  cfgw.SetDiskID(dev, node)
991

    
992
  result = True
993
  if on_primary or dev.AssembleOnSecondary():
994
    rstats = rpc.call_blockdev_find(node, dev)
995
    if not rstats:
996
      logger.ToStderr("Can't get any data from node %s" % node)
997
      result = False
998
    else:
999
      result = result and (not rstats[5])
1000
  if dev.children:
1001
    for child in dev.children:
1002
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1003

    
1004
  return result
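# Note (editorial): judging from its use above, element 5 of the
# rpc.call_blockdev_find result is the per-device "degraded" flag; the function
# reports a device tree as consistent only when every device it actually
# queries returns data with that flag unset.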
1005

    
1006

    
1007
class LUDiagnoseOS(NoHooksLU):
1008
  """Logical unit for OS diagnose/query.
1009

1010
  """
1011
  _OP_REQP = []
1012

    
1013
  def CheckPrereq(self):
1014
    """Check prerequisites.
1015

1016
    This always succeeds, since this is a pure query LU.
1017

1018
    """
1019
    return
1020

    
1021
  def Exec(self, feedback_fn):
1022
    """Compute the list of OSes.
1023

1024
    """
1025
    node_list = self.cfg.GetNodeList()
1026
    node_data = rpc.call_os_diagnose(node_list)
1027
    if node_data == False:
1028
      raise errors.OpExecError("Can't gather the list of OSes")
1029
    return node_data
1030

    
1031

    
1032
class LURemoveNode(LogicalUnit):
1033
  """Logical unit for removing a node.
1034

1035
  """
1036
  HPATH = "node-remove"
1037
  HTYPE = constants.HTYPE_NODE
1038
  _OP_REQP = ["node_name"]
1039

    
1040
  def BuildHooksEnv(self):
1041
    """Build hooks env.
1042

1043
    This doesn't run on the target node in the pre phase as a failed
1044
    node would not allow itself to run.
1045

1046
    """
1047
    env = {
1048
      "NODE_NAME": self.op.node_name,
1049
      }
1050
    all_nodes = self.cfg.GetNodeList()
1051
    all_nodes.remove(self.op.node_name)
1052
    return env, all_nodes, all_nodes
1053

    
1054
  def CheckPrereq(self):
1055
    """Check prerequisites.
1056

1057
    This checks:
1058
     - the node exists in the configuration
1059
     - it does not have primary or secondary instances
1060
     - it's not the master
1061

1062
    Any errors are signalled by raising errors.OpPrereqError.
1063

1064
    """
1065
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1066
    if node is None:
1067
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1068

    
1069
    instance_list = self.cfg.GetInstanceList()
1070

    
1071
    masternode = self.sstore.GetMasterNode()
1072
    if node.name == masternode:
1073
      raise errors.OpPrereqError("Node is the master node,"
1074
                                 " you need to failover first.")
1075

    
1076
    for instance_name in instance_list:
1077
      instance = self.cfg.GetInstanceInfo(instance_name)
1078
      if node.name == instance.primary_node:
1079
        raise errors.OpPrereqError("Instance %s still running on the node,"
1080
                                   " please remove first." % instance_name)
1081
      if node.name in instance.secondary_nodes:
1082
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1083
                                   " please remove first." % instance_name)
1084
    self.op.node_name = node.name
1085
    self.node = node
1086

    
1087
  def Exec(self, feedback_fn):
1088
    """Removes the node from the cluster.
1089

1090
    """
1091
    node = self.node
1092
    logger.Info("stopping the node daemon and removing configs from node %s" %
1093
                node.name)
1094

    
1095
    rpc.call_node_leave_cluster(node.name)
1096

    
1097
    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1098

    
1099
    logger.Info("Removing node %s from config" % node.name)
1100

    
1101
    self.cfg.RemoveNode(node.name)
1102

    
1103

    
1104
class LUQueryNodes(NoHooksLU):
1105
  """Logical unit for querying nodes.
1106

1107
  """
1108
  _OP_REQP = ["output_fields"]
1109

    
1110
  def CheckPrereq(self):
1111
    """Check prerequisites.
1112

1113
    This checks that the fields required are valid output fields.
1114

1115
    """
1116
    self.dynamic_fields = frozenset(["dtotal", "dfree",
1117
                                     "mtotal", "mnode", "mfree"])
1118

    
1119
    _CheckOutputFields(static=["name", "pinst", "sinst", "pip", "sip"],
1120
                       dynamic=self.dynamic_fields,
1121
                       selected=self.op.output_fields)
1122

    
1123

    
1124
  def Exec(self, feedback_fn):
1125
    """Computes the list of nodes and their attributes.
1126

1127
    """
1128
    nodenames = utils.NiceSort(self.cfg.GetNodeList())
1129
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1130

    
1131

    
1132
    # begin data gathering
1133

    
1134
    if self.dynamic_fields.intersection(self.op.output_fields):
1135
      live_data = {}
1136
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1137
      for name in nodenames:
1138
        nodeinfo = node_data.get(name, None)
1139
        if nodeinfo:
1140
          live_data[name] = {
1141
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1142
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1143
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1144
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1145
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1146
            }
1147
        else:
1148
          live_data[name] = {}
1149
    else:
1150
      live_data = dict.fromkeys(nodenames, {})
1151

    
1152
    node_to_primary = dict.fromkeys(nodenames, 0)
1153
    node_to_secondary = dict.fromkeys(nodenames, 0)
1154

    
1155
    if "pinst" in self.op.output_fields or "sinst" in self.op.output_fields:
1156
      instancelist = self.cfg.GetInstanceList()
1157

    
1158
      for instance in instancelist:
1159
        instanceinfo = self.cfg.GetInstanceInfo(instance)
1160
        node_to_primary[instanceinfo.primary_node] += 1
1161
        for secnode in instanceinfo.secondary_nodes:
1162
          node_to_secondary[secnode] += 1
1163

    
1164
    # end data gathering
1165

    
1166
    output = []
1167
    for node in nodelist:
1168
      node_output = []
1169
      for field in self.op.output_fields:
1170
        if field == "name":
1171
          val = node.name
1172
        elif field == "pinst":
1173
          val = node_to_primary[node.name]
1174
        elif field == "sinst":
1175
          val = node_to_secondary[node.name]
1176
        elif field == "pip":
1177
          val = node.primary_ip
1178
        elif field == "sip":
1179
          val = node.secondary_ip
1180
        elif field in self.dynamic_fields:
1181
          val = live_data[node.name].get(field, "?")
1182
        else:
1183
          raise errors.ParameterError(field)
1184
        val = str(val)
1185
        node_output.append(val)
1186
      output.append(node_output)
1187

    
1188
    return output
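# Illustrative output (hypothetical cluster): with output_fields = ["name",
# "pinst", "pip"], Exec returns one list of stringified values per node, e.g.
# [["node1.example.com", "2", "192.0.2.10"],
#  ["node2.example.com", "0", "192.0.2.11"]],
# in NiceSort order of the node names.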
1189

    
1190

    
1191
class LUQueryNodeVolumes(NoHooksLU):
1192
  """Logical unit for getting volumes on node(s).
1193

1194
  """
1195
  _OP_REQP = ["nodes", "output_fields"]
1196

    
1197
  def CheckPrereq(self):
1198
    """Check prerequisites.
1199

1200
    This checks that the fields required are valid output fields.
1201

1202
    """
1203
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1204

    
1205
    _CheckOutputFields(static=["node"],
1206
                       dynamic=["phys", "vg", "name", "size", "instance"],
1207
                       selected=self.op.output_fields)
1208

    
1209

    
1210
  def Exec(self, feedback_fn):
1211
    """Computes the list of nodes and their attributes.
1212

1213
    """
1214
    nodenames = utils.NiceSort([node.name for node in self.nodes])
1215
    volumes = rpc.call_node_volumes(nodenames)
1216

    
1217
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1218
             in self.cfg.GetInstanceList()]
1219

    
1220
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1221

    
1222
    output = []
1223
    for node in nodenames:
1224
      if node not in volumes or not volumes[node]:
1225
        continue
1226

    
1227
      node_vols = volumes[node][:]
1228
      node_vols.sort(key=lambda vol: vol['dev'])
1229

    
1230
      for vol in node_vols:
1231
        node_output = []
1232
        for field in self.op.output_fields:
1233
          if field == "node":
1234
            val = node
1235
          elif field == "phys":
1236
            val = vol['dev']
1237
          elif field == "vg":
1238
            val = vol['vg']
1239
          elif field == "name":
1240
            val = vol['name']
1241
          elif field == "size":
1242
            val = int(float(vol['size']))
1243
          elif field == "instance":
1244
            for inst in ilist:
1245
              if node not in lv_by_node[inst]:
1246
                continue
1247
              if vol['name'] in lv_by_node[inst][node]:
1248
                val = inst.name
1249
                break
1250
            else:
1251
              val = '-'
1252
          else:
1253
            raise errors.ParameterError(field)
1254
          node_output.append(str(val))
1255

    
1256
        output.append(node_output)
1257

    
1258
    return output
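# Illustrative output (hypothetical values): with output_fields = ["node",
# "name", "size", "instance"], each row describes one logical volume, e.g.
# ["node1.example.com", "inst1-sda", "10240", "inst1.example.com"], and the
# instance column falls back to "-" for volumes not mapped to any instance.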
1259

    
1260

    
1261
class LUAddNode(LogicalUnit):
1262
  """Logical unit for adding node to the cluster.
1263

1264
  """
1265
  HPATH = "node-add"
1266
  HTYPE = constants.HTYPE_NODE
1267
  _OP_REQP = ["node_name"]
1268

    
1269
  def BuildHooksEnv(self):
1270
    """Build hooks env.
1271

1272
    This will run on all nodes before, and on all nodes + the new node after.
1273

1274
    """
1275
    env = {
1276
      "NODE_NAME": self.op.node_name,
1277
      "NODE_PIP": self.op.primary_ip,
1278
      "NODE_SIP": self.op.secondary_ip,
1279
      }
1280
    nodes_0 = self.cfg.GetNodeList()
1281
    nodes_1 = nodes_0 + [self.op.node_name, ]
1282
    return env, nodes_0, nodes_1
1283

    
1284
  def CheckPrereq(self):
1285
    """Check prerequisites.
1286

1287
    This checks:
1288
     - the new node is not already in the config
1289
     - it is resolvable
1290
     - its parameters (single/dual homed) matches the cluster
1291

1292
    Any errors are signalled by raising errors.OpPrereqError.
1293

1294
    """
1295
    node_name = self.op.node_name
1296
    cfg = self.cfg
1297

    
1298
    dns_data = utils.LookupHostname(node_name)
1299
    if not dns_data:
1300
      raise errors.OpPrereqError("Node %s is not resolvable" % node_name)
1301

    
1302
    node = dns_data['hostname']
1303
    primary_ip = self.op.primary_ip = dns_data['ip']
1304
    secondary_ip = getattr(self.op, "secondary_ip", None)
1305
    if secondary_ip is None:
1306
      secondary_ip = primary_ip
1307
    if not utils.IsValidIP(secondary_ip):
1308
      raise errors.OpPrereqError("Invalid secondary IP given")
1309
    self.op.secondary_ip = secondary_ip
1310
    node_list = cfg.GetNodeList()
1311
    if node in node_list:
1312
      raise errors.OpPrereqError("Node %s is already in the configuration"
1313
                                 % node)
1314

    
1315
    for existing_node_name in node_list:
1316
      existing_node = cfg.GetNodeInfo(existing_node_name)
1317
      if (existing_node.primary_ip == primary_ip or
1318
          existing_node.secondary_ip == primary_ip or
1319
          existing_node.primary_ip == secondary_ip or
1320
          existing_node.secondary_ip == secondary_ip):
1321
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1322
                                   " existing node %s" % existing_node.name)
1323

    
1324
    # check that the type of the node (single versus dual homed) is the
1325
    # same as for the master
1326
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1327
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1328
    newbie_singlehomed = secondary_ip == primary_ip
1329
    if master_singlehomed != newbie_singlehomed:
1330
      if master_singlehomed:
1331
        raise errors.OpPrereqError("The master has no private ip but the"
1332
                                   " new node has one")
1333
      else:
1334
        raise errors.OpPrereqError("The master has a private ip but the"
1335
                                   " new node doesn't have one")
1336

    
1337
    # checks reachability
1338
    command = ["fping", "-q", primary_ip]
1339
    result = utils.RunCmd(command)
1340
    if result.failed:
1341
      raise errors.OpPrereqError("Node not reachable by ping")
1342

    
1343
    if not newbie_singlehomed:
1344
      # check reachability from my secondary ip to newbie's secondary ip
1345
      command = ["fping", "-S%s" % myself.secondary_ip, "-q", secondary_ip]
1346
      result = utils.RunCmd(command)
1347
      if result.failed:
1348
        raise errors.OpPrereqError("Node secondary ip not reachable by ping")
1349

    
1350
    self.new_node = objects.Node(name=node,
1351
                                 primary_ip=primary_ip,
1352
                                 secondary_ip=secondary_ip)
1353

    
1354
  def Exec(self, feedback_fn):
1355
    """Adds the new node to the cluster.
1356

1357
    """
1358
    new_node = self.new_node
1359
    node = new_node.name
1360

    
1361
    # set up inter-node password and certificate and restarts the node daemon
1362
    gntpass = self.sstore.GetNodeDaemonPassword()
1363
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1364
      raise errors.OpExecError("ganeti password corruption detected")
1365
    f = open(constants.SSL_CERT_FILE)
1366
    try:
1367
      gntpem = f.read(8192)
1368
    finally:
1369
      f.close()
1370
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1371
    # so we use this to detect an invalid certificate; as long as the
1372
    # cert doesn't contain this, the here-document will be correctly
1373
    # parsed by the shell sequence below
1374
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
1375
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1376
    if not gntpem.endswith("\n"):
1377
      raise errors.OpExecError("PEM must end with newline")
1378
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1379

    
1380
    # and then connect with ssh to set password and start ganeti-noded
1381
    # note that all the below variables are sanitized at this point,
1382
    # either by being constants or by the checks above
1383
    ss = self.sstore
1384
    mycommand = ("umask 077 && "
1385
                 "echo '%s' > '%s' && "
1386
                 "cat > '%s' << '!EOF.' && \n"
1387
                 "%s!EOF.\n%s restart" %
1388
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1389
                  constants.SSL_CERT_FILE, gntpem,
1390
                  constants.NODE_INITD_SCRIPT))
1391

    
1392
    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1393
    if result.failed:
1394
      raise errors.OpExecError("Remote command on node %s, error: %s,"
1395
                               " output: %s" %
1396
                               (node, result.fail_reason, result.output))
1397

    
1398
    # check connectivity
1399
    time.sleep(4)
1400

    
1401
    result = rpc.call_version([node])[node]
1402
    if result:
1403
      if constants.PROTOCOL_VERSION == result:
1404
        logger.Info("communication to node %s fine, sw version %s match" %
1405
                    (node, result))
1406
      else:
1407
        raise errors.OpExecError("Version mismatch master version %s,"
1408
                                 " node version %s" %
1409
                                 (constants.PROTOCOL_VERSION, result))
1410
    else:
1411
      raise errors.OpExecError("Cannot get version from the new node")
1412

    
1413
    # setup ssh on node
1414
    logger.Info("copy ssh key to node %s" % node)
1415
    keyarray = []
1416
    keyfiles = ["/etc/ssh/ssh_host_dsa_key", "/etc/ssh/ssh_host_dsa_key.pub",
1417
                "/etc/ssh/ssh_host_rsa_key", "/etc/ssh/ssh_host_rsa_key.pub",
1418
                "/root/.ssh/id_dsa", "/root/.ssh/id_dsa.pub"]
1419

    
1420
    for i in keyfiles:
1421
      f = open(i, 'r')
1422
      try:
1423
        keyarray.append(f.read())
1424
      finally:
1425
        f.close()
1426

    
1427
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1428
                               keyarray[3], keyarray[4], keyarray[5])
1429

    
1430
    if not result:
1431
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1432

    
1433
    # Add node to our /etc/hosts, and add key to known_hosts
1434
    _UpdateEtcHosts(new_node.name, new_node.primary_ip)
1435
    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1436
                      self.cfg.GetHostKey())
1437

    
1438
    if new_node.secondary_ip != new_node.primary_ip:
1439
      result = ssh.SSHCall(node, "root",
1440
                           "fping -S 127.0.0.1 -q %s" % new_node.secondary_ip)
1441
      if result.failed:
1442
        raise errors.OpExecError("Node claims it doesn't have the"
1443
                                 " secondary ip you gave (%s).\n"
1444
                                 "Please fix and re-run this command." %
1445
                                 new_node.secondary_ip)
1446

    
1447
    success, msg = ssh.VerifyNodeHostname(node)
1448
    if not success:
1449
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
1450
                               " than the one the resolver gives: %s.\n"
1451
                               "Please fix and re-run this command." %
1452
                               (node, msg))
1453

    
1454
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1455
    # including the node just added
1456
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1457
    dist_nodes = self.cfg.GetNodeList() + [node]
1458
    if myself.name in dist_nodes:
1459
      dist_nodes.remove(myself.name)
1460

    
1461
    logger.Debug("Copying hosts and known_hosts to all nodes")
1462
    for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
1463
      result = rpc.call_upload_file(dist_nodes, fname)
1464
      for to_node in dist_nodes:
1465
        if not result[to_node]:
1466
          logger.Error("copy of file %s to node %s failed" %
1467
                       (fname, to_node))
1468

    
1469
    to_copy = ss.GetFileList()
1470
    for fname in to_copy:
1471
      if not ssh.CopyFileToNode(node, fname):
1472
        logger.Error("could not copy file %s to node %s" % (fname, node))
1473

    
1474
    logger.Info("adding node %s to cluster.conf" % node)
1475
    self.cfg.AddNode(new_node)
1476

    
1477

    
1478
class LUMasterFailover(LogicalUnit):
1479
  """Failover the master node to the current node.
1480

1481
  This is a special LU in that it must run on a non-master node.
1482

1483
  """
1484
  HPATH = "master-failover"
1485
  HTYPE = constants.HTYPE_CLUSTER
1486
  REQ_MASTER = False
1487
  _OP_REQP = []
1488

    
1489
  def BuildHooksEnv(self):
1490
    """Build hooks env.
1491

1492
    This will run on the new master only in the pre phase, and on all
1493
    the nodes in the post phase.
1494

1495
    """
1496
    env = {
1497
      "NEW_MASTER": self.new_master,
1498
      "OLD_MASTER": self.old_master,
1499
      }
1500
    return env, [self.new_master], self.cfg.GetNodeList()
1501

    
1502
  def CheckPrereq(self):
1503
    """Check prerequisites.
1504

1505
    This checks that we are not already the master.
1506

1507
    """
1508
    self.new_master = socket.gethostname()
1509

    
1510
    self.old_master = self.sstore.GetMasterNode()
1511

    
1512
    if self.old_master == self.new_master:
1513
      raise errors.OpPrereqError("This commands must be run on the node"
1514
                                 " where you want the new master to be.\n"
1515
                                 "%s is already the master" %
1516
                                 self.old_master)
1517

    
1518
  def Exec(self, feedback_fn):
1519
    """Failover the master node.
1520

1521
    This command, when run on a non-master node, will cause the current
1522
    master to cease being master, and the non-master to become new
1523
    master.
1524

1525
    """
1526
    #TODO: do not rely on gethostname returning the FQDN
1527
    logger.Info("setting master to %s, old master: %s" %
1528
                (self.new_master, self.old_master))
1529

    
1530
    if not rpc.call_node_stop_master(self.old_master):
1531
      logger.Error("could disable the master role on the old master"
1532
                   " %s, please disable manually" % self.old_master)
1533

    
1534
    ss = self.sstore
1535
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1536
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
1537
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
1538
      logger.Error("could not distribute the new simple store master file"
1539
                   " to the other nodes, please check.")
1540

    
1541
    if not rpc.call_node_start_master(self.new_master):
1542
      logger.Error("could not start the master role on the new master"
1543
                   " %s, please check" % self.new_master)
1544
      feedback_fn("Error in activating the master IP on the new master,\n"
1545
                  "please fix manually.")
1546

    
1547

    
1548

    
1549
class LUQueryClusterInfo(NoHooksLU):
1550
  """Query cluster configuration.
1551

1552
  """
1553
  _OP_REQP = []
1554
  REQ_MASTER = False
1555

    
1556
  def CheckPrereq(self):
1557
    """No prerequsites needed for this LU.
1558

1559
    """
1560
    pass
1561

    
1562
  def Exec(self, feedback_fn):
1563
    """Return cluster config.
1564

1565
    """
1566
    result = {
1567
      "name": self.sstore.GetClusterName(),
1568
      "software_version": constants.RELEASE_VERSION,
1569
      "protocol_version": constants.PROTOCOL_VERSION,
1570
      "config_version": constants.CONFIG_VERSION,
1571
      "os_api_version": constants.OS_API_VERSION,
1572
      "export_version": constants.EXPORT_VERSION,
1573
      "master": self.sstore.GetMasterNode(),
1574
      "architecture": (platform.architecture()[0], platform.machine()),
1575
      }
1576

    
1577
    return result


class LUClusterCopyFile(NoHooksLU):
1581
  """Copy file to cluster.
1582

1583
  """
1584
  _OP_REQP = ["nodes", "filename"]
1585

    
1586
  def CheckPrereq(self):
1587
    """Check prerequisites.
1588

1589
    It should check that the named file exists and that the given list
1590
    of nodes is valid.
1591

1592
    """
1593
    if not os.path.exists(self.op.filename):
1594
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1595

    
1596
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1597

    
1598
  def Exec(self, feedback_fn):
    """Copy a file from the master to some nodes.

    The file named by self.op.filename is copied via ssh to every node
    in self.nodes except the master itself; copy failures are logged
    but do not abort the operation.

    """
1608
    filename = self.op.filename
1609

    
1610
    myname = socket.gethostname()
1611

    
1612
    for node in [node.name for node in self.nodes]:
1613
      if node == myname:
1614
        continue
1615
      if not ssh.CopyFileToNode(node, filename):
1616
        logger.Error("Copy of file %s to node %s failed" % (filename, node))
1617

    
1618

    
1619
class LUDumpClusterConfig(NoHooksLU):
1620
  """Return a text-representation of the cluster-config.
1621

1622
  """
1623
  _OP_REQP = []
1624

    
1625
  def CheckPrereq(self):
1626
    """No prerequisites.
1627

1628
    """
1629
    pass
1630

    
1631
  def Exec(self, feedback_fn):
1632
    """Dump a representation of the cluster config to the standard output.
1633

1634
    """
1635
    return self.cfg.DumpConfig()


class LURunClusterCommand(NoHooksLU):
1639
  """Run a command on some nodes.
1640

1641
  """
1642
  _OP_REQP = ["command", "nodes"]
1643

    
1644
  def CheckPrereq(self):
1645
    """Check prerequisites.
1646

1647
    It checks that the given list of nodes is valid.
1648

1649
    """
1650
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1651

    
1652
  def Exec(self, feedback_fn):
1653
    """Run a command on some nodes.
1654

1655
    """
1656
    data = []
1657
    for node in self.nodes:
1658
      result = ssh.SSHCall(node.name, "root", self.op.command)
1659
      data.append((node.name, result.output, result.exit_code))
1660

    
1661
    return data
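    # Illustrative shape of the value returned above (node names and
    # outputs invented for the example):
    #
    #   [("node1.example.com", "Linux node1 ...\n", 0),
    #    ("node2.example.com", "", 255)]
    #
    # i.e. one (node name, combined output, exit code) tuple per node.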


class LUActivateInstanceDisks(NoHooksLU):
1665
  """Bring up an instance's disks.
1666

1667
  """
1668
  _OP_REQP = ["instance_name"]
1669

    
1670
  def CheckPrereq(self):
1671
    """Check prerequisites.
1672

1673
    This checks that the instance is in the cluster.
1674

1675
    """
1676
    instance = self.cfg.GetInstanceInfo(
1677
      self.cfg.ExpandInstanceName(self.op.instance_name))
1678
    if instance is None:
1679
      raise errors.OpPrereqError("Instance '%s' not known" %
1680
                                 self.op.instance_name)
1681
    self.instance = instance
1682

    
1683

    
1684
  def Exec(self, feedback_fn):
1685
    """Activate the disks.
1686

1687
    """
1688
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1689
    if not disks_ok:
1690
      raise errors.OpExecError("Cannot activate block devices")
1691

    
1692
    return disks_info


def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1696
  """Prepare the block devices for an instance.
1697

1698
  This sets up the block devices on all nodes.
1699

1700
  Args:
1701
    instance: a ganeti.objects.Instance object
1702
    ignore_secondaries: if true, errors on secondary nodes won't result
1703
                        in an error return from the function
1704

1705
  Returns:
    a tuple (disks_ok, device_info); disks_ok is False if assembling any
    device failed (subject to ignore_secondaries), and device_info is a
    list of (primary_node, iv_name, device) tuples with the result of the
    assembly on the primary node for each instance-visible device
  """
1710
  device_info = []
1711
  disks_ok = True
1712
  for inst_disk in instance.disks:
1713
    master_result = None
1714
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1715
      cfg.SetDiskID(node_disk, node)
1716
      is_primary = node == instance.primary_node
1717
      result = rpc.call_blockdev_assemble(node, node_disk, is_primary)
1718
      if not result:
1719
        logger.Error("could not prepare block device %s on node %s (is_pri"
1720
                     "mary=%s)" % (inst_disk.iv_name, node, is_primary))
1721
        if is_primary or not ignore_secondaries:
1722
          disks_ok = False
1723
      if is_primary:
1724
        master_result = result
1725
    device_info.append((instance.primary_node, inst_disk.iv_name,
1726
                        master_result))
1727

    
1728
  return disks_ok, device_info
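# Illustrative caller sketch (assumes `inst` is an objects.Instance and
# `cfg` a configuration object, as in the LUs above): the device_info
# triples can be used to show where each instance-visible disk lives:
#
#   ok, info = _AssembleInstanceDisks(inst, cfg)
#   if not ok:
#     raise errors.OpExecError("Cannot activate block devices")
#   for node, iv_name, dev in info:
#     print "%s: %s assembled on %s" % (iv_name, dev, node)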


def _StartInstanceDisks(cfg, instance, force):
1732
  """Start the disks of an instance.
1733

1734
  """
1735
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1736
                                           ignore_secondaries=force)
1737
  if not disks_ok:
1738
    _ShutdownInstanceDisks(instance, cfg)
1739
    if force is not None and not force:
1740
      logger.Error("If the message above refers to a secondary node,"
1741
                   " you can retry the operation using '--force'.")
1742
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
1746
  """Shutdown an instance's disks.
1747

1748
  """
1749
  _OP_REQP = ["instance_name"]
1750

    
1751
  def CheckPrereq(self):
1752
    """Check prerequisites.
1753

1754
    This checks that the instance is in the cluster.
1755

1756
    """
1757
    instance = self.cfg.GetInstanceInfo(
1758
      self.cfg.ExpandInstanceName(self.op.instance_name))
1759
    if instance is None:
1760
      raise errors.OpPrereqError("Instance '%s' not known" %
1761
                                 self.op.instance_name)
1762
    self.instance = instance
1763

    
1764
  def Exec(self, feedback_fn):
1765
    """Deactivate the disks
1766

1767
    """
1768
    instance = self.instance
1769
    ins_l = rpc.call_instance_list([instance.primary_node])
1770
    ins_l = ins_l[instance.primary_node]
1771
    if not type(ins_l) is list:
1772
      raise errors.OpExecError("Can't contact node '%s'" %
1773
                               instance.primary_node)
1774

    
1775
    if self.instance.name in ins_l:
1776
      raise errors.OpExecError("Instance is running, can't shutdown"
1777
                               " block devices.")
1778

    
1779
    _ShutdownInstanceDisks(instance, self.cfg)


def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is false, errors on the primary node make the
  function return failure; if it is set, primary-node errors are only
  logged and do not affect the result.

  """
1791
  result = True
1792
  for disk in instance.disks:
1793
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1794
      cfg.SetDiskID(top_disk, node)
1795
      if not rpc.call_blockdev_shutdown(node, top_disk):
1796
        logger.Error("could not shutdown block device %s on node %s" %
1797
                     (disk.iv_name, node))
1798
        if not ignore_primary or node != instance.primary_node:
1799
          result = False
1800
  return result


class LUStartupInstance(LogicalUnit):
1804
  """Starts an instance.
1805

1806
  """
1807
  HPATH = "instance-start"
1808
  HTYPE = constants.HTYPE_INSTANCE
1809
  _OP_REQP = ["instance_name", "force"]
1810

    
1811
  def BuildHooksEnv(self):
1812
    """Build hooks env.
1813

1814
    This runs on master, primary and secondary nodes of the instance.
1815

1816
    """
1817
    env = {
1818
      "FORCE": self.op.force,
1819
      }
1820
    env.update(_BuildInstanceHookEnvByObject(self.instance))
1821
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1822
          list(self.instance.secondary_nodes))
1823
    return env, nl, nl
1824

    
1825
  def CheckPrereq(self):
1826
    """Check prerequisites.
1827

1828
    This checks that the instance is in the cluster.
1829

1830
    """
1831
    instance = self.cfg.GetInstanceInfo(
1832
      self.cfg.ExpandInstanceName(self.op.instance_name))
1833
    if instance is None:
1834
      raise errors.OpPrereqError("Instance '%s' not known" %
1835
                                 self.op.instance_name)
1836

    
1837
    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(instance.primary_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s do not"
                                 " exist on destination node '%s'" %
                                 (brlist, instance.primary_node))
1843

    
1844
    self.instance = instance
1845
    self.op.instance_name = instance.name
1846

    
1847
  def Exec(self, feedback_fn):
1848
    """Start the instance.
1849

1850
    """
1851
    instance = self.instance
1852
    force = self.op.force
1853
    extra_args = getattr(self.op, "extra_args", "")
1854

    
1855
    node_current = instance.primary_node
1856

    
1857
    nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1858
    if not nodeinfo:
1859
      raise errors.OpExecError("Could not contact node %s for infos" %
1860
                               (node_current))
1861

    
1862
    freememory = nodeinfo[node_current]['memory_free']
1863
    memory = instance.memory
1864
    if memory > freememory:
1865
      raise errors.OpExecError("Not enough memory to start instance"
1866
                               " %s on node %s"
1867
                               " needed %s MiB, available %s MiB" %
1868
                               (instance.name, node_current, memory,
1869
                                freememory))
1870

    
1871
    _StartInstanceDisks(self.cfg, instance, force)
1872

    
1873
    if not rpc.call_instance_start(node_current, instance, extra_args):
1874
      _ShutdownInstanceDisks(instance, self.cfg)
1875
      raise errors.OpExecError("Could not start instance")
1876

    
1877
    self.cfg.MarkInstanceUp(instance.name)
1878

    
1879

    
1880
class LUShutdownInstance(LogicalUnit):
1881
  """Shutdown an instance.
1882

1883
  """
1884
  HPATH = "instance-stop"
1885
  HTYPE = constants.HTYPE_INSTANCE
1886
  _OP_REQP = ["instance_name"]
1887

    
1888
  def BuildHooksEnv(self):
1889
    """Build hooks env.
1890

1891
    This runs on master, primary and secondary nodes of the instance.
1892

1893
    """
1894
    env = _BuildInstanceHookEnvByObject(self.instance)
1895
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1896
          list(self.instance.secondary_nodes))
1897
    return env, nl, nl
1898

    
1899
  def CheckPrereq(self):
1900
    """Check prerequisites.
1901

1902
    This checks that the instance is in the cluster.
1903

1904
    """
1905
    instance = self.cfg.GetInstanceInfo(
1906
      self.cfg.ExpandInstanceName(self.op.instance_name))
1907
    if instance is None:
1908
      raise errors.OpPrereqError("Instance '%s' not known" %
1909
                                 self.op.instance_name)
1910
    self.instance = instance
1911

    
1912
  def Exec(self, feedback_fn):
1913
    """Shutdown the instance.
1914

1915
    """
1916
    instance = self.instance
1917
    node_current = instance.primary_node
1918
    if not rpc.call_instance_shutdown(node_current, instance):
1919
      logger.Error("could not shutdown instance")
1920

    
1921
    self.cfg.MarkInstanceDown(instance.name)
1922
    _ShutdownInstanceDisks(instance, self.cfg)
1923

    
1924

    
1925
class LUReinstallInstance(LogicalUnit):
1926
  """Reinstall an instance.
1927

1928
  """
1929
  HPATH = "instance-reinstall"
1930
  HTYPE = constants.HTYPE_INSTANCE
1931
  _OP_REQP = ["instance_name"]
1932

    
1933
  def BuildHooksEnv(self):
1934
    """Build hooks env.
1935

1936
    This runs on master, primary and secondary nodes of the instance.
1937

1938
    """
1939
    env = _BuildInstanceHookEnvByObject(self.instance)
1940
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1941
          list(self.instance.secondary_nodes))
1942
    return env, nl, nl
1943

    
1944
  def CheckPrereq(self):
1945
    """Check prerequisites.
1946

1947
    This checks that the instance is in the cluster and is not running.
1948

1949
    """
1950
    instance = self.cfg.GetInstanceInfo(
1951
      self.cfg.ExpandInstanceName(self.op.instance_name))
1952
    if instance is None:
1953
      raise errors.OpPrereqError("Instance '%s' not known" %
1954
                                 self.op.instance_name)
1955
    if instance.disk_template == constants.DT_DISKLESS:
1956
      raise errors.OpPrereqError("Instance '%s' has no disks" %
1957
                                 self.op.instance_name)
1958
    if instance.status != "down":
1959
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
1960
                                 self.op.instance_name)
1961
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
1962
    if remote_info:
1963
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
1964
                                 (self.op.instance_name,
1965
                                  instance.primary_node))
1966

    
1967
    self.op.os_type = getattr(self.op, "os_type", None)
1968
    if self.op.os_type is not None:
1969
      # OS verification
1970
      pnode = self.cfg.GetNodeInfo(
1971
        self.cfg.ExpandNodeName(instance.primary_node))
1972
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   instance.primary_node)
1975
      os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
1976
      if not isinstance(os_obj, objects.OS):
1977
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
1978
                                   " primary node"  % self.op.os_type)
1979

    
1980
    self.instance = instance
1981

    
1982
  def Exec(self, feedback_fn):
1983
    """Reinstall the instance.
1984

1985
    """
1986
    inst = self.instance
1987

    
1988
    if self.op.os_type is not None:
1989
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
1990
      inst.os = self.op.os_type
1991
      self.cfg.AddInstance(inst)
1992

    
1993
    _StartInstanceDisks(self.cfg, inst, None)
1994
    try:
1995
      feedback_fn("Running the instance OS create scripts...")
1996
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
1997
        raise errors.OpExecError("Could not install OS for instance %s "
1998
                                 "on node %s" %
1999
                                 (inst.name, inst.primary_node))
2000
    finally:
2001
      _ShutdownInstanceDisks(inst, self.cfg)
2002

    
2003

    
2004
class LURemoveInstance(LogicalUnit):
2005
  """Remove an instance.
2006

2007
  """
2008
  HPATH = "instance-remove"
2009
  HTYPE = constants.HTYPE_INSTANCE
2010
  _OP_REQP = ["instance_name"]
2011

    
2012
  def BuildHooksEnv(self):
2013
    """Build hooks env.
2014

2015
    This runs on master, primary and secondary nodes of the instance.
2016

2017
    """
2018
    env = _BuildInstanceHookEnvByObject(self.instance)
2019
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2020
          list(self.instance.secondary_nodes))
2021
    return env, nl, nl
2022

    
2023
  def CheckPrereq(self):
2024
    """Check prerequisites.
2025

2026
    This checks that the instance is in the cluster.
2027

2028
    """
2029
    instance = self.cfg.GetInstanceInfo(
2030
      self.cfg.ExpandInstanceName(self.op.instance_name))
2031
    if instance is None:
2032
      raise errors.OpPrereqError("Instance '%s' not known" %
2033
                                 self.op.instance_name)
2034
    self.instance = instance
2035

    
2036
  def Exec(self, feedback_fn):
2037
    """Remove the instance.
2038

2039
    """
2040
    instance = self.instance
2041
    logger.Info("shutting down instance %s on node %s" %
2042
                (instance.name, instance.primary_node))
2043

    
2044
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2045
      raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2046
                               (instance.name, instance.primary_node))
2047

    
2048
    logger.Info("removing block devices for instance %s" % instance.name)
2049

    
2050
    _RemoveDisks(instance, self.cfg)
2051

    
2052
    logger.Info("removing instance %s out of cluster config" % instance.name)
2053

    
2054
    self.cfg.RemoveInstance(instance.name)
2055

    
2056

    
2057
class LUQueryInstances(NoHooksLU):
2058
  """Logical unit for querying instances.
2059

2060
  """
2061
  _OP_REQP = ["output_fields"]
2062

    
2063
  def CheckPrereq(self):
2064
    """Check prerequisites.
2065

2066
    This checks that the fields required are valid output fields.
2067

2068
    """
2069
    self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2070
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2071
                               "admin_state", "admin_ram",
2072
                               "disk_template", "ip", "mac", "bridge",
2073
                               "sda_size", "sdb_size"],
2074
                       dynamic=self.dynamic_fields,
2075
                       selected=self.op.output_fields)
2076

    
2077
  def Exec(self, feedback_fn):
2078
    """Computes the list of nodes and their attributes.
2079

2080
    """
2081
    instance_names = utils.NiceSort(self.cfg.GetInstanceList())
2082
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2083
                     in instance_names]
2084

    
2085
    # begin data gathering
2086

    
2087
    nodes = frozenset([inst.primary_node for inst in instance_list])
2088

    
2089
    bad_nodes = []
2090
    if self.dynamic_fields.intersection(self.op.output_fields):
2091
      live_data = {}
2092
      node_data = rpc.call_all_instances_info(nodes)
2093
      for name in nodes:
2094
        result = node_data[name]
2095
        if result:
2096
          live_data.update(result)
2097
        elif result == False:
2098
          bad_nodes.append(name)
2099
        # else no instance is alive
2100
    else:
2101
      live_data = dict([(name, {}) for name in instance_names])
2102

    
2103
    # end data gathering
2104

    
2105
    output = []
2106
    for instance in instance_list:
2107
      iout = []
2108
      for field in self.op.output_fields:
2109
        if field == "name":
2110
          val = instance.name
2111
        elif field == "os":
2112
          val = instance.os
2113
        elif field == "pnode":
2114
          val = instance.primary_node
2115
        elif field == "snodes":
2116
          val = ",".join(instance.secondary_nodes) or "-"
2117
        elif field == "admin_state":
2118
          if instance.status == "down":
2119
            val = "no"
2120
          else:
2121
            val = "yes"
2122
        elif field == "oper_state":
2123
          if instance.primary_node in bad_nodes:
2124
            val = "(node down)"
2125
          else:
2126
            if live_data.get(instance.name):
2127
              val = "running"
2128
            else:
2129
              val = "stopped"
2130
        elif field == "admin_ram":
2131
          val = instance.memory
2132
        elif field == "oper_ram":
2133
          if instance.primary_node in bad_nodes:
2134
            val = "(node down)"
2135
          elif instance.name in live_data:
2136
            val = live_data[instance.name].get("memory", "?")
2137
          else:
2138
            val = "-"
2139
        elif field == "disk_template":
2140
          val = instance.disk_template
2141
        elif field == "ip":
2142
          val = instance.nics[0].ip
2143
        elif field == "bridge":
2144
          val = instance.nics[0].bridge
2145
        elif field == "mac":
2146
          val = instance.nics[0].mac
2147
        elif field == "sda_size" or field == "sdb_size":
2148
          disk = instance.FindDisk(field[:3])
2149
          if disk is None:
2150
            val = "N/A"
2151
          else:
2152
            val = disk.size
2153
        else:
2154
          raise errors.ParameterError(field)
2155
        val = str(val)
2156
        iout.append(val)
2157
      output.append(iout)
2158

    
2159
    return output
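    # Illustrative result (instance data invented for the example): with
    # output_fields = ["name", "pnode", "admin_state", "oper_ram"] the
    # returned list could look like
    #
    #   [["instance1.example.com", "node1.example.com", "yes", "128"],
    #    ["instance2.example.com", "node2.example.com", "no", "-"]]
    #
    # i.e. one row of stringified values per instance, in the order of the
    # requested fields.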


class LUFailoverInstance(LogicalUnit):
2163
  """Failover an instance.
2164

2165
  """
2166
  HPATH = "instance-failover"
2167
  HTYPE = constants.HTYPE_INSTANCE
2168
  _OP_REQP = ["instance_name", "ignore_consistency"]
2169

    
2170
  def BuildHooksEnv(self):
2171
    """Build hooks env.
2172

2173
    This runs on master, primary and secondary nodes of the instance.
2174

2175
    """
2176
    env = {
2177
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2178
      }
2179
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2180
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2181
    return env, nl, nl
2182

    
2183
  def CheckPrereq(self):
2184
    """Check prerequisites.
2185

2186
    This checks that the instance is in the cluster.
2187

2188
    """
2189
    instance = self.cfg.GetInstanceInfo(
2190
      self.cfg.ExpandInstanceName(self.op.instance_name))
2191
    if instance is None:
2192
      raise errors.OpPrereqError("Instance '%s' not known" %
2193
                                 self.op.instance_name)
2194

    
2195
    if instance.disk_template != constants.DT_REMOTE_RAID1:
2196
      raise errors.OpPrereqError("Instance's disk layout is not"
2197
                                 " remote_raid1.")
2198

    
2199
    secondary_nodes = instance.secondary_nodes
2200
    if not secondary_nodes:
2201
      raise errors.ProgrammerError("no secondary node but using "
2202
                                   "DT_REMOTE_RAID1 template")
2203

    
2204
    # check memory requirements on the secondary node
2205
    target_node = secondary_nodes[0]
2206
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2207
    info = nodeinfo.get(target_node, None)
2208
    if not info:
      raise errors.OpPrereqError("Cannot get current information"
                                 " from node '%s'" % target_node)
2211
    if instance.memory > info['memory_free']:
2212
      raise errors.OpPrereqError("Not enough memory on target node %s."
2213
                                 " %d MB available, %d MB required" %
2214
                                 (target_node, info['memory_free'],
2215
                                  instance.memory))
2216

    
2217
    # check bridge existance
2218
    brlist = [nic.bridge for nic in instance.nics]
2219
    if not rpc.call_bridges_exist(instance.primary_node, brlist):
2220
      raise errors.OpPrereqError("One or more target bridges %s does not"
2221
                                 " exist on destination node '%s'" %
2222
                                 (brlist, instance.primary_node))
2223

    
2224
    self.instance = instance
2225

    
2226
  def Exec(self, feedback_fn):
2227
    """Failover an instance.
2228

2229
    The failover is done by shutting it down on its present node and
2230
    starting it on the secondary.
2231

2232
    """
2233
    instance = self.instance
2234

    
2235
    source_node = instance.primary_node
2236
    target_node = instance.secondary_nodes[0]
2237

    
2238
    feedback_fn("* checking disk consistency between source and target")
2239
    for dev in instance.disks:
2240
      # for remote_raid1, these are md over drbd
2241
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2242
        if not self.op.ignore_consistency:
2243
          raise errors.OpExecError("Disk %s is degraded on target node,"
2244
                                   " aborting failover." % dev.iv_name)
2245

    
2246
    feedback_fn("* checking target node resource availability")
2247
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2248

    
2249
    if not nodeinfo:
2250
      raise errors.OpExecError("Could not contact target node %s." %
2251
                               target_node)
2252

    
2253
    free_memory = int(nodeinfo[target_node]['memory_free'])
2254
    memory = instance.memory
2255
    if memory > free_memory:
2256
      raise errors.OpExecError("Not enough memory to create instance %s on"
2257
                               " node %s. needed %s MiB, available %s MiB" %
2258
                               (instance.name, target_node, memory,
2259
                                free_memory))
2260

    
2261
    feedback_fn("* shutting down instance on source node")
2262
    logger.Info("Shutting down instance %s on node %s" %
2263
                (instance.name, source_node))
2264

    
2265
    if not rpc.call_instance_shutdown(source_node, instance):
2266
      logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2267
                   " anyway. Please make sure node %s is down"  %
2268
                   (instance.name, source_node, source_node))
2269

    
2270
    feedback_fn("* deactivating the instance's disks on source node")
2271
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2272
      raise errors.OpExecError("Can't shut down the instance's disks.")
2273

    
2274
    instance.primary_node = target_node
2275
    # distribute new instance config to the other nodes
2276
    self.cfg.AddInstance(instance)
2277

    
2278
    feedback_fn("* activating the instance's disks on target node")
2279
    logger.Info("Starting instance %s on node %s" %
2280
                (instance.name, target_node))
2281

    
2282
    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2283
                                             ignore_secondaries=True)
2284
    if not disks_ok:
2285
      _ShutdownInstanceDisks(instance, self.cfg)
2286
      raise errors.OpExecError("Can't activate the instance's disks")
2287

    
2288
    feedback_fn("* starting the instance on the target node")
2289
    if not rpc.call_instance_start(target_node, instance, None):
2290
      _ShutdownInstanceDisks(instance, self.cfg)
2291
      raise errors.OpExecError("Could not start instance %s on node %s." %
2292
                               (instance.name, target_node))
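    # Condensed sketch of the failover sequence above (illustrative only):
    #
    #   rpc.call_instance_shutdown(source_node, instance)
    #   _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True)
    #   instance.primary_node = target_node
    #   self.cfg.AddInstance(instance)       # distributes the new config
    #   _AssembleInstanceDisks(instance, self.cfg, ignore_secondaries=True)
    #   rpc.call_instance_start(target_node, instance, None)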


def _CreateBlockDevOnPrimary(cfg, node, device, info):
2296
  """Create a tree of block devices on the primary node.
2297

2298
  This always creates all devices.
2299

2300
  """
2301
  if device.children:
2302
    for child in device.children:
2303
      if not _CreateBlockDevOnPrimary(cfg, node, child, info):
2304
        return False
2305

    
2306
  cfg.SetDiskID(device, node)
2307
  new_id = rpc.call_blockdev_create(node, device, device.size, True, info)
2308
  if not new_id:
2309
    return False
2310
  if device.physical_id is None:
2311
    device.physical_id = new_id
2312
  return True


def _CreateBlockDevOnSecondary(cfg, node, device, force, info):
2316
  """Create a tree of block devices on a secondary node.
2317

2318
  If this device type has to be created on secondaries, create it and
2319
  all its children.
2320

2321
  If not, just recurse to children keeping the same 'force' value.
2322

2323
  """
2324
  if device.CreateOnSecondary():
2325
    force = True
2326
  if device.children:
2327
    for child in device.children:
2328
      if not _CreateBlockDevOnSecondary(cfg, node, child, force, info):
2329
        return False
2330

    
2331
  if not force:
2332
    return True
2333
  cfg.SetDiskID(device, node)
2334
  new_id = rpc.call_blockdev_create(node, device, device.size, False, info)
2335
  if not new_id:
2336
    return False
2337
  if device.physical_id is None:
2338
    device.physical_id = new_id
2339
  return True
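# Illustrative walk-through (a sketch, assuming the usual remote_raid1 tree
# md_raid1 -> drbd -> [data LV, meta LV] and that only the drbd layer
# reports CreateOnSecondary()): calling
#
#   _CreateBlockDevOnSecondary(cfg, snode, md_disk, False, info)
#
# recurses into the drbd child with force=False; there force flips to True,
# so the two LVs and the drbd device itself are created on the secondary,
# while the top-level md device is skipped because force is still False at
# that level.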


def _GenerateUniqueNames(cfg, exts):
2343
  """Generate a suitable LV name.
2344

2345
  This will generate a logical volume name for the given instance.
2346

2347
  """
2348
  results = []
2349
  for val in exts:
2350
    new_id = cfg.GenerateUniqueID()
2351
    results.append("%s%s" % (new_id, val))
2352
  return results
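# Example (illustrative; the actual IDs come from cfg.GenerateUniqueID()):
#
#   _GenerateUniqueNames(cfg, [".sda", ".sdb"])
#   -> ["<uuid-1>.sda", "<uuid-2>.sdb"]
#
# Note that a fresh unique ID is generated for each suffix.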


def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2356
  """Generate a drbd device complete with its children.
2357

2358
  """
2359
  port = cfg.AllocatePort()
2360
  vgname = cfg.GetVGName()
2361
  dev_data = objects.Disk(dev_type="lvm", size=size,
2362
                          logical_id=(vgname, names[0]))
2363
  dev_meta = objects.Disk(dev_type="lvm", size=128,
2364
                          logical_id=(vgname, names[1]))
2365
  drbd_dev = objects.Disk(dev_type="drbd", size=size,
2366
                          logical_id = (primary, secondary, port),
2367
                          children = [dev_data, dev_meta])
2368
  return drbd_dev
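# The returned object is a small tree (a sketch, sizes in MB):
#
#   Disk(dev_type="drbd", size=size, logical_id=(primary, secondary, port),
#        children=[Disk(dev_type="lvm", size=size, logical_id=(vg, names[0])),
#                  Disk(dev_type="lvm", size=128,  logical_id=(vg, names[1]))])
#
# i.e. a DRBD device backed by a data LV and a fixed 128 MB metadata LV,
# with the port taken from the cluster's port pool.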


def _GenerateDiskTemplate(cfg, template_name,
2372
                          instance_name, primary_node,
2373
                          secondary_nodes, disk_sz, swap_sz):
2374
  """Generate the entire disk layout for a given template type.
2375

2376
  """
2377
  #TODO: compute space requirements
2378

    
2379
  vgname = cfg.GetVGName()
2380
  if template_name == "diskless":
2381
    disks = []
2382
  elif template_name == "plain":
2383
    if len(secondary_nodes) != 0:
2384
      raise errors.ProgrammerError("Wrong template configuration")
2385

    
2386
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2387
    sda_dev = objects.Disk(dev_type="lvm", size=disk_sz,
2388
                           logical_id=(vgname, names[0]),
2389
                           iv_name = "sda")
2390
    sdb_dev = objects.Disk(dev_type="lvm", size=swap_sz,
2391
                           logical_id=(vgname, names[1]),
2392
                           iv_name = "sdb")
2393
    disks = [sda_dev, sdb_dev]
2394
  elif template_name == "local_raid1":
2395
    if len(secondary_nodes) != 0:
2396
      raise errors.ProgrammerError("Wrong template configuration")
2397

    
2398

    
2399
    names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2400
                                       ".sdb_m1", ".sdb_m2"])
2401
    sda_dev_m1 = objects.Disk(dev_type="lvm", size=disk_sz,
2402
                              logical_id=(vgname, names[0]))
2403
    sda_dev_m2 = objects.Disk(dev_type="lvm", size=disk_sz,
2404
                              logical_id=(vgname, names[1]))
2405
    md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name = "sda",
2406
                              size=disk_sz,
2407
                              children = [sda_dev_m1, sda_dev_m2])
2408
    sdb_dev_m1 = objects.Disk(dev_type="lvm", size=swap_sz,
2409
                              logical_id=(vgname, names[2]))
2410
    sdb_dev_m2 = objects.Disk(dev_type="lvm", size=swap_sz,
2411
                              logical_id=(vgname, names[3]))
2412
    md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name = "sdb",
2413
                              size=swap_sz,
2414
                              children = [sdb_dev_m1, sdb_dev_m2])
2415
    disks = [md_sda_dev, md_sdb_dev]
2416
  elif template_name == constants.DT_REMOTE_RAID1:
2417
    if len(secondary_nodes) != 1:
2418
      raise errors.ProgrammerError("Wrong template configuration")
2419
    remote_node = secondary_nodes[0]
2420
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2421
                                       ".sdb_data", ".sdb_meta"])
2422
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2423
                                         disk_sz, names[0:2])
2424
    md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name="sda",
2425
                              children = [drbd_sda_dev], size=disk_sz)
2426
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2427
                                         swap_sz, names[2:4])
2428
    md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name="sdb",
2429
                              children = [drbd_sdb_dev], size=swap_sz)
2430
    disks = [md_sda_dev, md_sdb_dev]
2431
  else:
2432
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2433
  return disks
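# Example (illustrative sizes): for template_name == "plain", disk_sz=10240
# and swap_sz=4096 the result is two plain LVM-backed disks,
#
#   [Disk(dev_type="lvm", size=10240, iv_name="sda",
#         logical_id=(vgname, "<uuid>.sda")),
#    Disk(dev_type="lvm", size=4096, iv_name="sdb",
#         logical_id=(vgname, "<uuid>.sdb"))]
#
# while remote_raid1 wraps each disk in the md_raid1-over-drbd tree built
# via _GenerateMDDRBDBranch above.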


def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


def _CreateDisks(cfg, instance):
2444
  """Create all disks for an instance.
2445

2446
  This abstracts away some work from AddInstance.
2447

2448
  Args:
2449
    instance: the instance object
2450

2451
  Returns:
2452
    True or False showing the success of the creation process
2453

2454
  """
2455
  info = _GetInstanceInfoText(instance)
2456

    
2457
  for device in instance.disks:
2458
    logger.Info("creating volume %s for instance %s" %
2459
              (device.iv_name, instance.name))
2460
    #HARDCODE
2461
    for secondary_node in instance.secondary_nodes:
2462
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, device, False,
2463
                                        info):
2464
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2465
                     (device.iv_name, device, secondary_node))
2466
        return False
2467
    #HARDCODE
2468
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, device, info):
2469
      logger.Error("failed to create volume %s on primary!" %
2470
                   device.iv_name)
2471
      return False
2472
  return True
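# Typical caller pattern (mirroring LUCreateInstance.Exec below): create the
# disks and roll back with _RemoveDisks if anything fails:
#
#   if not _CreateDisks(self.cfg, iobj):
#     _RemoveDisks(iobj, self.cfg)
#     raise errors.OpExecError("Device creation failed, reverting...")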


def _RemoveDisks(instance, cfg):
2476
  """Remove all disks for an instance.
2477

2478
  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
2490
  logger.Info("removing block devices for instance %s" % instance.name)
2491

    
2492
  result = True
2493
  for device in instance.disks:
2494
    for node, disk in device.ComputeNodeTree(instance.primary_node):
2495
      cfg.SetDiskID(disk, node)
2496
      if not rpc.call_blockdev_remove(node, disk):
2497
        logger.Error("could not remove block device %s on node %s,"
2498
                     " continuing anyway" %
2499
                     (device.iv_name, node))
2500
        result = False
2501
  return result


class LUCreateInstance(LogicalUnit):
2505
  """Create an instance.
2506

2507
  """
2508
  HPATH = "instance-add"
2509
  HTYPE = constants.HTYPE_INSTANCE
2510
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2511
              "disk_template", "swap_size", "mode", "start", "vcpus",
2512
              "wait_for_sync"]
2513

    
2514
  def BuildHooksEnv(self):
2515
    """Build hooks env.
2516

2517
    This runs on master, primary and secondary nodes of the instance.
2518

2519
    """
2520
    env = {
2521
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2522
      "INSTANCE_DISK_SIZE": self.op.disk_size,
2523
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
2524
      "INSTANCE_ADD_MODE": self.op.mode,
2525
      }
2526
    if self.op.mode == constants.INSTANCE_IMPORT:
2527
      env["INSTANCE_SRC_NODE"] = self.op.src_node
2528
      env["INSTANCE_SRC_PATH"] = self.op.src_path
2529
      env["INSTANCE_SRC_IMAGE"] = self.src_image
2530

    
2531
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2532
      primary_node=self.op.pnode,
2533
      secondary_nodes=self.secondaries,
2534
      status=self.instance_status,
2535
      os_type=self.op.os_type,
2536
      memory=self.op.mem_size,
2537
      vcpus=self.op.vcpus,
2538
      nics=[(self.inst_ip, self.op.bridge)],
2539
    ))
2540

    
2541
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2542
          self.secondaries)
2543
    return env, nl, nl
2544

    
2545

    
2546
  def CheckPrereq(self):
2547
    """Check prerequisites.
2548

2549
    """
2550
    if self.op.mode not in (constants.INSTANCE_CREATE,
2551
                            constants.INSTANCE_IMPORT):
2552
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2553
                                 self.op.mode)
2554

    
2555
    if self.op.mode == constants.INSTANCE_IMPORT:
2556
      src_node = getattr(self.op, "src_node", None)
2557
      src_path = getattr(self.op, "src_path", None)
2558
      if src_node is None or src_path is None:
2559
        raise errors.OpPrereqError("Importing an instance requires source"
2560
                                   " node and path options")
2561
      src_node_full = self.cfg.ExpandNodeName(src_node)
2562
      if src_node_full is None:
2563
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2564
      self.op.src_node = src_node = src_node_full
2565

    
2566
      if not os.path.isabs(src_path):
2567
        raise errors.OpPrereqError("The source path must be absolute")
2568

    
2569
      export_info = rpc.call_export_info(src_node, src_path)
2570

    
2571
      if not export_info:
2572
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
2573

    
2574
      if not export_info.has_section(constants.INISECT_EXP):
2575
        raise errors.ProgrammerError("Corrupted export config")
2576

    
2577
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
2578
      if (int(ei_version) != constants.EXPORT_VERSION):
2579
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2580
                                   (ei_version, constants.EXPORT_VERSION))
2581

    
2582
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2583
        raise errors.OpPrereqError("Can't import instance with more than"
2584
                                   " one data disk")
2585

    
2586
      # FIXME: are the old os-es, disk sizes, etc. useful?
2587
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2588
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2589
                                                         'disk0_dump'))
2590
      self.src_image = diskimage
2591
    else: # INSTANCE_CREATE
2592
      if getattr(self.op, "os_type", None) is None:
2593
        raise errors.OpPrereqError("No guest OS specified")
2594

    
2595
    # check primary node
2596
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2597
    if pnode is None:
2598
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
2599
                                 self.op.pnode)
2600
    self.op.pnode = pnode.name
2601
    self.pnode = pnode
2602
    self.secondaries = []
2603
    # disk template and mirror node verification
2604
    if self.op.disk_template not in constants.DISK_TEMPLATES:
2605
      raise errors.OpPrereqError("Invalid disk template name")
2606

    
2607
    if self.op.disk_template == constants.DT_REMOTE_RAID1:
2608
      if getattr(self.op, "snode", None) is None:
2609
        raise errors.OpPrereqError("The 'remote_raid1' disk template needs"
2610
                                   " a mirror node")
2611

    
2612
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
2613
      if snode_name is None:
2614
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
2615
                                   self.op.snode)
2616
      elif snode_name == pnode.name:
2617
        raise errors.OpPrereqError("The secondary node cannot be"
2618
                                   " the primary node.")
2619
      self.secondaries.append(snode_name)
2620

    
2621
    # Check lv size requirements
2622
    nodenames = [pnode.name] + self.secondaries
2623
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2624

    
2625
    # Required free disk space as a function of disk and swap space
2626
    req_size_dict = {
2627
      constants.DT_DISKLESS: 0,
2628
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2629
      constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2630
      # 256 MB are added for drbd metadata, 128MB for each drbd device
2631
      constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2632
    }
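    # Worked example (illustrative sizes): with disk_size=10240 and
    # swap_size=4096 the required free space in the volume group is
    #   plain:         10240 + 4096       = 14336 MB
    #   local_raid1:  (10240 + 4096) * 2  = 28672 MB
    #   remote_raid1:  10240 + 4096 + 256 = 14592 MB (two 128 MB DRBD
    #                                       metadata volumes)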
2633

    
2634
    if self.op.disk_template not in req_size_dict:
2635
      raise errors.ProgrammerError("Disk template '%s' size requirement"
2636
                                   " is unknown" %  self.op.disk_template)
2637

    
2638
    req_size = req_size_dict[self.op.disk_template]
2639

    
2640
    for node in nodenames:
2641
      info = nodeinfo.get(node, None)
2642
      if not info:
        raise errors.OpPrereqError("Cannot get current information"
                                   " from node '%s'" % node)
2645
      if req_size > info['vg_free']:
2646
        raise errors.OpPrereqError("Not enough disk space on target node %s."
2647
                                   " %d MB available, %d MB required" %
2648
                                   (node, info['vg_free'], req_size))
2649

    
2650
    # os verification
2651
    os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2652
    if not isinstance(os_obj, objects.OS):
2653
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
2654
                                 " primary node"  % self.op.os_type)
2655

    
2656
    # instance verification
2657
    hostname1 = utils.LookupHostname(self.op.instance_name)
2658
    if not hostname1:
2659
      raise errors.OpPrereqError("Instance name '%s' not found in dns" %
2660
                                 self.op.instance_name)
2661

    
2662
    self.op.instance_name = instance_name = hostname1['hostname']
2663
    instance_list = self.cfg.GetInstanceList()
2664
    if instance_name in instance_list:
2665
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2666
                                 instance_name)
2667

    
2668
    ip = getattr(self.op, "ip", None)
2669
    if ip is None or ip.lower() == "none":
2670
      inst_ip = None
2671
    elif ip.lower() == "auto":
2672
      inst_ip = hostname1['ip']
2673
    else:
2674
      if not utils.IsValidIP(ip):
2675
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
2676
                                   " like a valid IP" % ip)
2677
      inst_ip = ip
2678
    self.inst_ip = inst_ip
2679

    
2680
    command = ["fping", "-q", hostname1['ip']]
2681
    result = utils.RunCmd(command)
2682
    if not result.failed:
2683
      raise errors.OpPrereqError("IP %s of instance %s already in use" %
2684
                                 (hostname1['ip'], instance_name))
2685

    
2686
    # bridge verification
2687
    bridge = getattr(self.op, "bridge", None)
2688
    if bridge is None:
2689
      self.op.bridge = self.cfg.GetDefBridge()
2690
    else:
2691
      self.op.bridge = bridge
2692

    
2693
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
2694
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
2695
                                 " destination node '%s'" %
2696
                                 (self.op.bridge, pnode.name))
2697

    
2698
    if self.op.start:
2699
      self.instance_status = 'up'
2700
    else:
2701
      self.instance_status = 'down'
2702

    
2703
  def Exec(self, feedback_fn):
2704
    """Create and add the instance to the cluster.
2705

2706
    """
2707
    instance = self.op.instance_name
2708
    pnode_name = self.pnode.name
2709

    
2710
    nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
2711
    if self.inst_ip is not None:
2712
      nic.ip = self.inst_ip
2713

    
2714
    disks = _GenerateDiskTemplate(self.cfg,
2715
                                  self.op.disk_template,
2716
                                  instance, pnode_name,
2717
                                  self.secondaries, self.op.disk_size,
2718
                                  self.op.swap_size)
2719

    
2720
    iobj = objects.Instance(name=instance, os=self.op.os_type,
2721
                            primary_node=pnode_name,
2722
                            memory=self.op.mem_size,
2723
                            vcpus=self.op.vcpus,
2724
                            nics=[nic], disks=disks,
2725
                            disk_template=self.op.disk_template,
2726
                            status=self.instance_status,
2727
                            )
2728

    
2729
    feedback_fn("* creating instance disks...")
2730
    if not _CreateDisks(self.cfg, iobj):
2731
      _RemoveDisks(iobj, self.cfg)
2732
      raise errors.OpExecError("Device creation failed, reverting...")
2733

    
2734
    feedback_fn("adding instance %s to cluster config" % instance)
2735

    
2736
    self.cfg.AddInstance(iobj)
2737

    
2738
    if self.op.wait_for_sync:
2739
      disk_abort = not _WaitForSync(self.cfg, iobj)
2740
    elif iobj.disk_template == constants.DT_REMOTE_RAID1:
2741
      # make sure the disks are not degraded (still sync-ing is ok)
2742
      time.sleep(15)
2743
      feedback_fn("* checking mirrors status")
2744
      disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
2745
    else:
2746
      disk_abort = False
2747

    
2748
    if disk_abort:
2749
      _RemoveDisks(iobj, self.cfg)
2750
      self.cfg.RemoveInstance(iobj.name)
2751
      raise errors.OpExecError("There are some degraded disks for"
2752
                               " this instance")
2753

    
2754
    feedback_fn("creating os for instance %s on node %s" %
2755
                (instance, pnode_name))
2756

    
2757
    if iobj.disk_template != constants.DT_DISKLESS:
2758
      if self.op.mode == constants.INSTANCE_CREATE:
2759
        feedback_fn("* running the instance OS create scripts...")
2760
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
2761
          raise errors.OpExecError("could not add os for instance %s"
2762
                                   " on node %s" %
2763
                                   (instance, pnode_name))
2764

    
2765
      elif self.op.mode == constants.INSTANCE_IMPORT:
2766
        feedback_fn("* running the instance OS import scripts...")
2767
        src_node = self.op.src_node
2768
        src_image = self.src_image
2769
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
2770
                                                src_node, src_image):
2771
          raise errors.OpExecError("Could not import os for instance"
2772
                                   " %s on node %s" %
2773
                                   (instance, pnode_name))
2774
      else:
2775
        # also checked in the prereq part
2776
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
2777
                                     % self.op.mode)
2778

    
2779
    if self.op.start:
2780
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
2781
      feedback_fn("* starting instance...")
2782
      if not rpc.call_instance_start(pnode_name, iobj, None):
2783
        raise errors.OpExecError("Could not start instance")
2784

    
2785

    
2786
class LUConnectConsole(NoHooksLU):
2787
  """Connect to an instance's console.
2788

2789
  This is somewhat special in that it returns the command line that
2790
  you need to run on the master node in order to connect to the
2791
  console.
2792

2793
  """
2794
  _OP_REQP = ["instance_name"]
2795

    
2796
  def CheckPrereq(self):
2797
    """Check prerequisites.
2798

2799
    This checks that the instance is in the cluster.
2800

2801
    """
2802
    instance = self.cfg.GetInstanceInfo(
2803
      self.cfg.ExpandInstanceName(self.op.instance_name))
2804
    if instance is None:
2805
      raise errors.OpPrereqError("Instance '%s' not known" %
2806
                                 self.op.instance_name)
2807
    self.instance = instance
2808

    
2809
  def Exec(self, feedback_fn):
2810
    """Connect to the console of an instance
2811

2812
    """
2813
    instance = self.instance
2814
    node = instance.primary_node
2815

    
2816
    node_insts = rpc.call_instance_list([node])[node]
2817
    if node_insts is False:
2818
      raise errors.OpExecError("Can't connect to node %s." % node)
2819

    
2820
    if instance.name not in node_insts:
2821
      raise errors.OpExecError("Instance %s is not running." % instance.name)
2822

    
2823
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
2824

    
2825
    hyper = hypervisor.GetHypervisor()
2826
    console_cmd = hyper.GetShellCommandForConsole(instance.name)
2827
    # build ssh cmdline
2828
    argv = ["ssh", "-q", "-t"]
2829
    argv.extend(ssh.KNOWN_HOSTS_OPTS)
2830
    argv.extend(ssh.BATCH_MODE_OPTS)
2831
    argv.append(node)
2832
    argv.append(console_cmd)
2833
    return "ssh", argv


class LUAddMDDRBDComponent(LogicalUnit):
  """Add a new mirror member to an instance's disk.
2838

2839
  """
2840
  HPATH = "mirror-add"
2841
  HTYPE = constants.HTYPE_INSTANCE
2842
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]
2843

    
2844
  def BuildHooksEnv(self):
2845
    """Build hooks env.
2846

2847
    This runs on the master, the primary and all the secondaries.
2848

2849
    """
2850
    env = {
2851
      "NEW_SECONDARY": self.op.remote_node,
2852
      "DISK_NAME": self.op.disk_name,
2853
      }
2854
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2855
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
2856
          self.op.remote_node,] + list(self.instance.secondary_nodes)
2857
    return env, nl, nl
2858

    
2859
  def CheckPrereq(self):
2860
    """Check prerequisites.
2861

2862
    This checks that the instance is in the cluster.
2863

2864
    """
2865
    instance = self.cfg.GetInstanceInfo(
2866
      self.cfg.ExpandInstanceName(self.op.instance_name))
2867
    if instance is None:
2868
      raise errors.OpPrereqError("Instance '%s' not known" %
2869
                                 self.op.instance_name)
2870
    self.instance = instance
2871

    
2872
    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
2873
    if remote_node is None:
2874
      raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
2875
    self.remote_node = remote_node
2876

    
2877
    if remote_node == instance.primary_node:
2878
      raise errors.OpPrereqError("The specified node is the primary node of"
2879
                                 " the instance.")
2880

    
2881
    if instance.disk_template != constants.DT_REMOTE_RAID1:
2882
      raise errors.OpPrereqError("Instance's disk layout is not"
2883
                                 " remote_raid1.")
2884
    for disk in instance.disks:
2885
      if disk.iv_name == self.op.disk_name:
2886
        break
2887
    else:
2888
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
2889
                                 " instance." % self.op.disk_name)
2890
    if len(disk.children) > 1:
2891
      raise errors.OpPrereqError("The device already has two slave"
2892
                                 " devices.\n"
2893
                                 "This would create a 3-disk raid1"
2894
                                 " which we don't allow.")
2895
    self.disk = disk
2896

    
2897
  def Exec(self, feedback_fn):
2898
    """Add the mirror component
2899

2900
    """
2901
    disk = self.disk
2902
    instance = self.instance
2903

    
2904
    remote_node = self.remote_node
2905
    lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
2906
    names = _GenerateUniqueNames(self.cfg, lv_names)
2907
    new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
2908
                                     remote_node, disk.size, names)
2909

    
2910
    logger.Info("adding new mirror component on secondary")
2911
    #HARDCODE
2912
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, new_drbd, False,
2913
                                      _GetInstanceInfoText(instance)):
2914
      raise errors.OpExecError("Failed to create new component on secondary"
2915
                               " node %s" % remote_node)
2916

    
2917
    logger.Info("adding new mirror component on primary")
2918
    #HARDCODE
2919
    if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node, new_drbd,
2920
                                    _GetInstanceInfoText(instance)):
2921
      # remove secondary dev
2922
      self.cfg.SetDiskID(new_drbd, remote_node)
2923
      rpc.call_blockdev_remove(remote_node, new_drbd)
2924
      raise errors.OpExecError("Failed to create volume on primary")
2925

    
2926
    # the device exists now
2927
    # call the primary node to add the mirror to md
2928
    logger.Info("adding new mirror component to md")
2929
    if not rpc.call_blockdev_addchild(instance.primary_node,
2930
                                           disk, new_drbd):
2931
      logger.Error("Can't add mirror compoment to md!")
2932
      self.cfg.SetDiskID(new_drbd, remote_node)
2933
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
2934
        logger.Error("Can't rollback on secondary")
2935
      self.cfg.SetDiskID(new_drbd, instance.primary_node)
2936
      if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
2937
        logger.Error("Can't rollback on primary")
2938
      raise errors.OpExecError("Can't add mirror component to md array")
2939

    
2940
    disk.children.append(new_drbd)
2941

    
2942
    self.cfg.AddInstance(instance)
2943

    
2944
    _WaitForSync(self.cfg, instance)
2945

    
2946
    return 0
2947

    
2948

    
2949
class LURemoveMDDRBDComponent(LogicalUnit):
  """Remove a component from a remote_raid1 disk.

  """
  HPATH = "mirror-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK_NAME": self.op.disk_name,
      "DISK_ID": self.op.disk_id,
      "OLD_SECONDARY": self.old_secondary,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
                                 " instance." % self.op.disk_name)
    for child in disk.children:
      if child.dev_type == "drbd" and child.logical_id[2] == self.op.disk_id:
        break
    else:
      raise errors.OpPrereqError("Can't find the device with this port.")

    if len(disk.children) < 2:
      raise errors.OpPrereqError("Cannot remove the last component from"
                                 " a mirror.")
    self.disk = disk
    self.child = child
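    # the drbd logical_id is (node_a, node_b, port); whichever of the two
    # nodes is not the primary is the old secondary we are detaching from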
    if self.child.logical_id[0] == instance.primary_node:
      oid = 1
    else:
      oid = 0
    self.old_secondary = self.child.logical_id[oid]

  def Exec(self, feedback_fn):
    """Remove the mirror component

    """
    instance = self.instance
    disk = self.disk
    child = self.child
    logger.Info("remove mirror component")
    self.cfg.SetDiskID(disk, instance.primary_node)
    if not rpc.call_blockdev_removechild(instance.primary_node,
                                         disk, child):
      raise errors.OpExecError("Can't remove child from mirror.")

    for node in child.logical_id[:2]:
      self.cfg.SetDiskID(child, node)
      if not rpc.call_blockdev_remove(node, child):
        logger.Error("Warning: failed to remove device from node %s,"
                     " continuing operation." % node)

    disk.children.remove(child)
    self.cfg.AddInstance(instance)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    remote_node = getattr(self.op, "remote_node", None)
    if remote_node is None:
      remote_node = instance.secondary_nodes[0]
    else:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    self.op.remote_node = remote_node

  def Exec(self, feedback_fn):
    """Replace the disks of an instance.

    """
    instance = self.instance
    iv_names = {}
    # start of work
    remote_node = self.op.remote_node
    cfg = self.cfg
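    # for every disk: build a new DRBD pair towards remote_node, attach it to
    # the instance's md device, wait for the sync and only then drop the old
    # children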
    for dev in instance.disks:
      size = dev.size
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
                                       remote_node, size, names)
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
      logger.Info("adding new mirror component on secondary for %s" %
                  dev.iv_name)
      #HARDCODE
      if not _CreateBlockDevOnSecondary(cfg, remote_node, new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new component on"
                                 " secondary node %s\n"
                                 "Full abort, cleanup manually!" %
                                 remote_node)

      logger.Info("adding new mirror component on primary")
      #HARDCODE
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, new_drbd,
                                      _GetInstanceInfoText(instance)):
        # remove secondary dev
        cfg.SetDiskID(new_drbd, remote_node)
        rpc.call_blockdev_remove(remote_node, new_drbd)
        raise errors.OpExecError("Failed to create volume on primary!\n"
                                 "Full abort, cleanup manually!!")

      # the device exists now
      # call the primary node to add the mirror to md
      logger.Info("adding new mirror component to md")
      if not rpc.call_blockdev_addchild(instance.primary_node, dev,
                                        new_drbd):
        logger.Error("Can't add mirror component to md!")
        cfg.SetDiskID(new_drbd, remote_node)
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
          logger.Error("Can't rollback on secondary")
        cfg.SetDiskID(new_drbd, instance.primary_node)
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
          logger.Error("Can't rollback on primary")
        raise errors.OpExecError("Full abort, cleanup manually!!")

      dev.children.append(new_drbd)
      cfg.AddInstance(instance)

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    _WaitForSync(cfg, instance, unlock=True)

    # so check manually all the devices
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("MD device %s is degraded!" % name)
      cfg.SetDiskID(new_drbd, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
      if is_degr:
        raise errors.OpExecError("New drbd device %s is degraded!" % name)

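    # every new device is in sync, so the old children can now be detached
    # from the md devices and removed from both nodes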
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      logger.Info("remove mirror %s component" % name)
      cfg.SetDiskID(dev, instance.primary_node)
      if not rpc.call_blockdev_removechild(instance.primary_node,
                                           dev, child):
        logger.Error("Can't remove child from mirror, aborting"
                     " *this device cleanup*.\nYou need to cleanup manually!!")
        continue

      for node in child.logical_id[:2]:
        logger.Info("remove child device on %s" % node)
        cfg.SetDiskID(child, node)
        if not rpc.call_blockdev_remove(node, child):
          logger.Error("Warning: failed to remove device from node %s,"
                       " continuing operation." % node)

      dev.children.remove(child)

      cfg.AddInstance(instance)


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]
    return


  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type == "drbd":
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

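    # recurse into the children so that the whole device tree is reported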
    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
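      # the running state comes from the hypervisor via RPC; instance.status
      # only reflects the state recorded in the configuration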
      remote_info = rpc.call_instance_info(instance.primary_node,
                                           instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        }

      result[instance.name] = idict

    return result


class LUQueryNodeData(NoHooksLU):
  """Logical unit for querying node data.

  """
  _OP_REQP = ["nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional node list against the existing names.

    """
    self.wanted_nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Compute and return the list of nodes.

    """
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]
    result = []
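    # for every requested node report its addresses together with the
    # instances for which it acts as primary respectively secondary node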
    for node in self.wanted_nodes:
      result.append((node.name, node.primary_ip, node.secondary_ip,
                     [inst.name for inst in ilist
                      if inst.primary_node == node.name],
                     [inst.name for inst in ilist
                      if node.name in inst.secondary_nodes],
                     ))
    return result


class LUSetInstanceParms(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      args['nics'] = [(ip, bridge)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance name and validates the submitted parameters.

    """
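    # all parameters are optional, but at least one of them must be given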
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.bridge = getattr(self.op, "bridge", None)
    if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("No such instance name '%s'" %
                                 self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance
    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))

    self.cfg.AddInstance(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list([node.name for node in self.nodes])


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance name is a valid one.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    # shutdown the instance, unless requested not to do so
    if self.op.shutdown:
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
      self.processor.ChainOpCode(op, feedback_fn)

    vgname = self.cfg.GetVGName()

    snap_disks = []

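    # snapshot the 'sda' disk (the instance has been shut down above if
    # requested); the finally clause restarts the instance if we stopped it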
    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type="lvm", size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown:
        op = opcodes.OpStartupInstance(instance_name=instance.name,
                                       force=False)
        self.processor.ChainOpCode(op, feedback_fn)

    # TODO: check for size

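    # copy every snapshot to the target node and then remove the snapshot
    # from the source node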
    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                      instance):
        logger.Error("could not export block device %s from node"
                     " %s to node %s" %
                     (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from"
                     " node %s" % (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.processor.ChainOpCode(op, feedback_fn)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
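    # resolve the object (cluster, node or instance) that the tag operation
    # will act upon and store it as self.target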
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()


class LUAddTag(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tag"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    objects.TaggableObject.ValidateTag(self.op.tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      self.target.AddTag(self.op.tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTag(TagsLU):
  """Delete a tag from a given object.

  """
  _OP_REQP = ["kind", "name", "tag"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    objects.TaggableObject.ValidateTag(self.op.tag)
    if self.op.tag not in self.target.GetTags():
      raise errors.OpPrereqError("Tag not found")

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    self.target.RemoveTag(self.op.tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")