#!/usr/bin/python
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import socket
import time
import tempfile
import re
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import ssconf

class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements (REQ_CLUSTER,
      REQ_MASTER); note that all commands require root permissions

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_CLUSTER = True
  REQ_MASTER = True

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.processor = processor
    self.op = op
    self.cfg = cfg
    self.sstore = sstore
    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    if self.REQ_CLUSTER:
      if not cfg.IsCluster():
        raise errors.OpPrereqError("Cluster not initialized yet,"
                                   " use 'gnt-cluster init' first.")
      if self.REQ_MASTER:
        master = sstore.GetMasterNode()
        if master != socket.gethostname():
          raise errors.OpPrereqError("Commands must be run on the master"
                                     " node %s" % master)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not be prefixed with 'GANETI_', as this is
    handled in the hooks runner. Also note that additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    As for the node lists, the master should not be included in them,
    as it will be added by the hooks runner in case this LU requires a
    cluster to run on (otherwise we don't have a node list). If there
    are no nodes to return, an empty list (and not None) should be
    returned.

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Build hooks env.

    This is a no-op, since we don't run hooks.

    """
    return


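# Example (hypothetical): a minimal logical unit following the contract
# documented in LogicalUnit above. The opcode field name 'message' is made
# up purely for illustration; it only shows where CheckPrereq canonicalises
# parameters and where Exec does the actual work.
#
#   class LUExampleNoop(NoHooksLU):
#     """Minimal LU: validates one opcode field and reports it."""
#     _OP_REQP = ["message"]
#
#     def CheckPrereq(self):
#       # validate/canonicalise opcode parameters here; raise
#       # errors.OpPrereqError if something is wrong
#       self.message = str(self.op.message)
#
#     def Exec(self, feedback_fn):
#       feedback_fn("noop: %s" % self.message)
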
def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if nodes:
    wanted = []

    for name in nodes:
      node = lu.cfg.ExpandNodeName(name)
      if node is None:
        raise errors.OpPrereqError("No such node name '%s'" % name)
      wanted.append(node)

  else:
    wanted = lu.cfg.GetNodeList()
  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings) or None for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields
    selected: Selected fields to validate

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings
  """
  env = {
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env


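# For reference, a rough example of what _BuildInstanceHookEnv above returns
# for a hypothetical single-NIC instance (all values invented):
#
#   _BuildInstanceHookEnv("inst1", "node1", ["node2"], "debian-etch",
#                         "up", 128, 1, [("192.0.2.10", "xen-br0")])
#   => {"INSTANCE_NAME": "inst1", "INSTANCE_PRIMARY": "node1",
#       "INSTANCE_SECONDARIES": "node2", "INSTANCE_OS_TYPE": "debian-etch",
#       "INSTANCE_STATUS": "up", "INSTANCE_MEMORY": 128,
#       "INSTANCE_VCPUS": 1, "INSTANCE_NIC0_IP": "192.0.2.10",
#       "INSTANCE_NIC0_BRIDGE": "xen-br0", "INSTANCE_NIC_COUNT": 1}
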
def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override
  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _UpdateEtcHosts(fullnode, ip):
  """Ensure a node has a correct entry in /etc/hosts.

  Args:
    fullnode - Fully qualified domain name of host. (str)
    ip       - IPv4 address of host (str)

  """
  node = fullnode.split(".", 1)[0]

  f = open('/etc/hosts', 'r+')

  inthere = False

  save_lines = []
  add_lines = []
  removed = False

  while True:
    rawline = f.readline()

    if not rawline:
      # End of file
      break

    line = rawline.split('\n')[0]

    # Strip off comments
    line = line.split('#')[0]

    if not line:
      # Entire line was comment, skip
      save_lines.append(rawline)
      continue

    fields = line.split()

    haveall = True
    havesome = False
    for spec in [ ip, fullnode, node ]:
      if spec not in fields:
        haveall = False
      if spec in fields:
        havesome = True

    if haveall:
      inthere = True
      save_lines.append(rawline)
      continue

    if havesome and not haveall:
      # Line (old, or manual?) which is missing some.  Remove.
      removed = True
      continue

    save_lines.append(rawline)

  if not inthere:
    add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))

  if removed:
    if add_lines:
      save_lines = save_lines + add_lines

    # We removed a line, write a new file and replace old.
    fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
    newfile = os.fdopen(fd, 'w')
    newfile.write(''.join(save_lines))
    newfile.close()
    os.rename(tmpname, '/etc/hosts')

  elif add_lines:
    # Simply appending a new line will do the trick.
    f.seek(0, 2)
    for add in add_lines:
      f.write(add)

  f.close()


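# The helper above keeps /etc/hosts entries of the form (a literal tab
# separates the address from the names; the address is a documentation
# example):
#
#   192.0.2.11	node1.example.com node1
#
# Lines mentioning only some of (ip, fqdn, short name) are treated as stale
# and dropped, which is why a temporary file plus rename is used in that case.
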
def _UpdateKnownHosts(fullnode, ip, pubkey):
  """Ensure a node has a correct known_hosts entry.

  Args:
    fullnode - Fully qualified domain name of host. (str)
    ip       - IPv4 address of host (str)
    pubkey   - the public key of the cluster

  """
  if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
  else:
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')

  inthere = False

  save_lines = []
  add_lines = []
  removed = False

  while True:
    rawline = f.readline()
    logger.Debug('read %s' % (repr(rawline),))

    if not rawline:
      # End of file
      break

    line = rawline.split('\n')[0]

    parts = line.split(' ')
    fields = parts[0].split(',')
    key = parts[2]

    haveall = True
    havesome = False
    for spec in [ ip, fullnode ]:
      if spec not in fields:
        haveall = False
      if spec in fields:
        havesome = True

    logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
    if haveall and key == pubkey:
      inthere = True
      save_lines.append(rawline)
      logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
      continue

    if havesome and (not haveall or key != pubkey):
      removed = True
      logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
      continue

    save_lines.append(rawline)

  if not inthere:
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))

  if removed:
    save_lines = save_lines + add_lines

    # Write a new file and replace old.
    fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
                                   constants.DATA_DIR)
    newfile = os.fdopen(fd, 'w')
    try:
      newfile.write(''.join(save_lines))
    finally:
      newfile.close()
    logger.Debug("Wrote new known_hosts.")
    os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)

  elif add_lines:
    # Simply appending a new line will do the trick.
    f.seek(0, 2)
    for add in add_lines:
      f.write(add)

  f.close()


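# _UpdateKnownHosts above maintains entries of the form
# "<fqdn>,<ip> ssh-rsa <key>"; existing lines are kept only when they name
# this node *and* carry the current cluster key. Note (an observation about
# the parser, not a guarantee): every line is expected to have the standard
# three space-separated fields, so a blank or malformed line in the file
# would trip the parts[2] access.
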
def _HasValidVG(vglist, vgname):
  """Checks if the volume group list is valid.

  A non-None return value means there's an error, and the return value
  is the error message.

  """
  vgsize = vglist.get(vgname, None)
  if vgsize is None:
    return "volume group '%s' missing" % vgname
  elif vgsize < 20480:
    return ("volume group '%s' too small (20480MiB required, %dMiB found)" %
            (vgname, vgsize))
  return None


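# Typical use of _HasValidVG (this mirrors the call made in LUInitCluster
# further below; the volume group name here is only an example):
#
#   err = _HasValidVG(utils.ListVolumeGroups(), "xenvg")
#   if err:
#     raise errors.OpPrereqError("Error: %s" % err)
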
def _InitSSHSetup(node):
  """Setup the SSH configuration for the cluster.

  This generates a dsa keypair for root, adds the pub key to the
  permitted hosts and adds the hostkey to its own known hosts.

  Args:
    node: the name of this host as a fqdn

  """
  if os.path.exists('/root/.ssh/id_dsa'):
    utils.CreateBackup('/root/.ssh/id_dsa')
  if os.path.exists('/root/.ssh/id_dsa.pub'):
    utils.CreateBackup('/root/.ssh/id_dsa.pub')

  utils.RemoveFile('/root/.ssh/id_dsa')
  utils.RemoveFile('/root/.ssh/id_dsa.pub')

  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
                         "-f", "/root/.ssh/id_dsa",
                         "-q", "-N", ""])
  if result.failed:
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
                             result.output)

  f = open('/root/.ssh/id_dsa.pub', 'r')
  try:
    utils.AddAuthorizedKey('/root/.ssh/authorized_keys', f.read(8192))
  finally:
    f.close()


def _InitGanetiServerSetup(ss):
  """Setup the necessary configuration for the initial node daemon.

  This creates the nodepass file containing the shared password for
  the cluster and also generates the SSL certificate.

  """
  # Create pseudo random password
  randpass = sha.new(os.urandom(64)).hexdigest()
  # and write it into sstore
  ss.SetKey(ss.SS_NODED_PASS, randpass)

  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
                         "-days", str(365*5), "-nodes", "-x509",
                         "-keyout", constants.SSL_CERT_FILE,
                         "-out", constants.SSL_CERT_FILE, "-batch"])
  if result.failed:
    raise errors.OpExecError("could not generate server ssl cert, command"
                             " %s had exitcode %s and error message %s" %
                             (result.cmd, result.exit_code, result.output))

  os.chmod(constants.SSL_CERT_FILE, 0400)

  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])

  if result.failed:
    raise errors.OpExecError("Could not start the node daemon, command %s"
                             " had exitcode %s and error %s" %
                             (result.cmd, result.exit_code, result.output))


class LUInitCluster(LogicalUnit):
  """Initialise the cluster.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
              "def_bridge", "master_netdev"]
  REQ_CLUSTER = False

  def BuildHooksEnv(self):
    """Build hooks env.

    Notes: Since we don't require a cluster, we must manually add
    ourselves in the post-run node list.

    """
    env = {
      "CLUSTER": self.op.cluster_name,
      "MASTER": self.hostname['hostname_full'],
      }
    return env, [], [self.hostname['hostname_full']]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    if config.ConfigWriter.IsCluster():
      raise errors.OpPrereqError("Cluster is already initialised")

    hostname_local = socket.gethostname()
    self.hostname = hostname = utils.LookupHostname(hostname_local)
    if not hostname:
      raise errors.OpPrereqError("Cannot resolve my own hostname ('%s')" %
                                 hostname_local)

    if hostname["hostname_full"] != hostname_local:
      raise errors.OpPrereqError("My own hostname (%s) does not match the"
                                 " resolver (%s): probably not using FQDN"
                                 " for hostname." %
                                 (hostname_local, hostname["hostname_full"]))

    if hostname["ip"].startswith("127."):
      raise errors.OpPrereqError("This host's IP resolves to the private"
                                 " range (%s). Please fix DNS or /etc/hosts." %
                                 (hostname["ip"],))

    self.clustername = clustername = utils.LookupHostname(self.op.cluster_name)
    if not clustername:
      raise errors.OpPrereqError("Cannot resolve given cluster name ('%s')"
                                 % self.op.cluster_name)

    result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", hostname['ip']])
    if result.failed:
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
                                 " to %s,\nbut this ip address does not"
                                 " belong to this host."
                                 " Aborting." % hostname['ip'])

    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip and not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary ip given")
    if secondary_ip and secondary_ip != hostname['ip']:
      result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", secondary_ip])
      if result.failed:
        raise errors.OpPrereqError("You gave %s as secondary IP,\n"
                                   "but it does not belong to this host." %
                                   secondary_ip)
    self.secondary_ip = secondary_ip

    # checks presence of the volume group given
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)

    if vgstatus:
      raise errors.OpPrereqError("Error: %s" % vgstatus)

    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
                    self.op.mac_prefix):
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
                                 self.op.mac_prefix)

    if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
                                 self.op.hypervisor_type)

    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
    if result.failed:
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
                                 (self.op.master_netdev,
                                  result.output.strip()))

  def Exec(self, feedback_fn):
    """Initialize the cluster.

    """
    clustername = self.clustername
    hostname = self.hostname

    # set up the simple store
    ss = ssconf.SimpleStore()
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
    ss.SetKey(ss.SS_MASTER_NODE, hostname['hostname_full'])
    ss.SetKey(ss.SS_MASTER_IP, clustername['ip'])
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername['hostname'])

    # set up the inter-node password and certificate
    _InitGanetiServerSetup(ss)

    # start the master ip
    rpc.call_node_start_master(hostname['hostname_full'])

    # set up ssh config and /etc/hosts
    f = open('/etc/ssh/ssh_host_rsa_key.pub', 'r')
    try:
      sshline = f.read()
    finally:
      f.close()
    sshkey = sshline.split(" ")[1]

    _UpdateEtcHosts(hostname['hostname_full'],
                    hostname['ip'],
                    )

    _UpdateKnownHosts(hostname['hostname_full'],
                      hostname['ip'],
                      sshkey,
                      )

    _InitSSHSetup(hostname['hostname'])

    # init of cluster config file
    cfgw = config.ConfigWriter()
    cfgw.InitConfig(hostname['hostname'], hostname['ip'], self.secondary_ip,
                    sshkey, self.op.mac_prefix,
                    self.op.vg_name, self.op.def_bridge)


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    utils.CreateBackup('/root/.ssh/id_dsa')
    utils.CreateBackup('/root/.ssh/id_dsa.pub')
    rpc.call_node_leave_cluster(self.sstore.GetMasterNode())


class LUVerifyCluster(NoHooksLU):
  """Verifies the cluster status.

  """
  _OP_REQP = []

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    instancelist = self.cfg.GetInstanceList()
    if not instance in instancelist:
      feedback_fn("  - ERROR: instance %s not in instance list %s" %
                      (instance, instancelist))
      bad = True

    instanceconfig = self.cfg.GetInstanceInfo(instance)
    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if not instanceconfig.status == 'down':
      if not instance in node_instance[node_current]:
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    self.cfg.VerifyConfig()

    master = self.sstore.GetMasterNode()
    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    node_volume = {}
    node_instance = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if type(volumeinfo) != dict:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      result = self._VerifyInstance(instance, node_volume, node_instance,
                                    feedback_fn)
      bad = bad or result

      inst_config = self.cfg.GetInstanceInfo(instance)

      inst_config.MapLVsByNode(node_vol_should)

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    return int(bad)


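# The helper below polls the instance's primary node for mirror status until
# no disk reports a completion percentage any more (i.e. resync finished).
# Each entry coming back from rpc.call_blockdev_getmirrorstatus is unpacked
# as (percent_done, estimated_seconds, is_degraded); an unreachable node is
# retried up to 10 times before giving up. Roughly speaking, it returns
# False when a disk still looks degraded once polling stops.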
def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      logger.ToStderr("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        logger.ToStderr("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      perc_done, est_time, is_degraded = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        logger.ToStdout("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    if unlock:
      utils.Unlock('cmd')
    try:
      time.sleep(min(60, max_time))
    finally:
      if unlock:
        utils.Lock('cmd')

  if done:
    logger.ToStdout("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


def _CheckDiskConsistency(cfgw, dev, node, on_primary):
  """Check that mirrors are not degraded.

  """
  cfgw.SetDiskID(dev, node)

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Can't get any data from node %s" % node)
      result = False
    else:
      result = result and (not rstats[5])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    return

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    return node_data


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would not allow itself to run.

    """
    env = {
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    rpc.call_node_leave_cluster(node.name)

    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)


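# The query LU below returns one row per node, with the columns given by
# op.output_fields in order. A rough example (field names taken from the
# validation lists in CheckPrereq, data invented):
#
#   output_fields = ["name", "pinst_cnt", "mfree"]
#   => [["node1.example.com", 2, 1024],
#       ["node2.example.com", 0, 2048]]
#
# The m*/d* fields are "dynamic": they need a live rpc.call_node_info
# round-trip, while the remaining fields come from the configuration.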
class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["dtotal", "dfree",
                                     "mtotal", "mnode", "mfree"])

    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                               "pip", "sip"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedNodes(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


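# Overview of the node-add flow implemented below (summarised from the Exec
# method): push the node daemon password and SSL certificate over ssh and
# restart ganeti-noded, verify the protocol version via RPC, copy the ssh
# host and root keys, update /etc/hosts and known_hosts, distribute those
# files plus the ssconf files to all nodes, and finally register the node
# in the cluster configuration.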
class LUAddNode(LogicalUnit):
1295
  """Logical unit for adding node to the cluster.
1296

1297
  """
1298
  HPATH = "node-add"
1299
  HTYPE = constants.HTYPE_NODE
1300
  _OP_REQP = ["node_name"]
1301

    
1302
  def BuildHooksEnv(self):
1303
    """Build hooks env.
1304

1305
    This will run on all nodes before, and on all nodes + the new node after.
1306

1307
    """
1308
    env = {
1309
      "NODE_NAME": self.op.node_name,
1310
      "NODE_PIP": self.op.primary_ip,
1311
      "NODE_SIP": self.op.secondary_ip,
1312
      }
1313
    nodes_0 = self.cfg.GetNodeList()
1314
    nodes_1 = nodes_0 + [self.op.node_name, ]
1315
    return env, nodes_0, nodes_1
1316

    
1317
  def CheckPrereq(self):
1318
    """Check prerequisites.
1319

1320
    This checks:
1321
     - the new node is not already in the config
1322
     - it is resolvable
1323
     - its parameters (single/dual homed) matches the cluster
1324

1325
    Any errors are signalled by raising errors.OpPrereqError.
1326

1327
    """
1328
    node_name = self.op.node_name
1329
    cfg = self.cfg
1330

    
1331
    dns_data = utils.LookupHostname(node_name)
1332
    if not dns_data:
1333
      raise errors.OpPrereqError("Node %s is not resolvable" % node_name)
1334

    
1335
    node = dns_data['hostname']
1336
    primary_ip = self.op.primary_ip = dns_data['ip']
1337
    secondary_ip = getattr(self.op, "secondary_ip", None)
1338
    if secondary_ip is None:
1339
      secondary_ip = primary_ip
1340
    if not utils.IsValidIP(secondary_ip):
1341
      raise errors.OpPrereqError("Invalid secondary IP given")
1342
    self.op.secondary_ip = secondary_ip
1343
    node_list = cfg.GetNodeList()
1344
    if node in node_list:
1345
      raise errors.OpPrereqError("Node %s is already in the configuration"
1346
                                 % node)
1347

    
1348
    for existing_node_name in node_list:
1349
      existing_node = cfg.GetNodeInfo(existing_node_name)
1350
      if (existing_node.primary_ip == primary_ip or
1351
          existing_node.secondary_ip == primary_ip or
1352
          existing_node.primary_ip == secondary_ip or
1353
          existing_node.secondary_ip == secondary_ip):
1354
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1355
                                   " existing node %s" % existing_node.name)
1356

    
1357
    # check that the type of the node (single versus dual homed) is the
1358
    # same as for the master
1359
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1360
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1361
    newbie_singlehomed = secondary_ip == primary_ip
1362
    if master_singlehomed != newbie_singlehomed:
1363
      if master_singlehomed:
1364
        raise errors.OpPrereqError("The master has no private ip but the"
1365
                                   " new node has one")
1366
      else:
1367
        raise errors.OpPrereqError("The master has a private ip but the"
1368
                                   " new node doesn't have one")
1369

    
1370
    # checks reachablity
1371
    command = ["fping", "-q", primary_ip]
1372
    result = utils.RunCmd(command)
1373
    if result.failed:
1374
      raise errors.OpPrereqError("Node not reachable by ping")
1375

    
1376
    if not newbie_singlehomed:
1377
      # check reachability from my secondary ip to newbie's secondary ip
1378
      command = ["fping", "-S%s" % myself.secondary_ip, "-q", secondary_ip]
1379
      result = utils.RunCmd(command)
1380
      if result.failed:
1381
        raise errors.OpPrereqError("Node secondary ip not reachable by ping")
1382

    
1383
    self.new_node = objects.Node(name=node,
1384
                                 primary_ip=primary_ip,
1385
                                 secondary_ip=secondary_ip)
1386

    
1387
  def Exec(self, feedback_fn):
1388
    """Adds the new node to the cluster.
1389

1390
    """
1391
    new_node = self.new_node
1392
    node = new_node.name
1393

    
1394
    # set up inter-node password and certificate and restarts the node daemon
1395
    gntpass = self.sstore.GetNodeDaemonPassword()
1396
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1397
      raise errors.OpExecError("ganeti password corruption detected")
1398
    f = open(constants.SSL_CERT_FILE)
1399
    try:
1400
      gntpem = f.read(8192)
1401
    finally:
1402
      f.close()
1403
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1404
    # so we use this to detect an invalid certificate; as long as the
1405
    # cert doesn't contain this, the here-document will be correctly
1406
    # parsed by the shell sequence below
1407
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
1408
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1409
    if not gntpem.endswith("\n"):
1410
      raise errors.OpExecError("PEM must end with newline")
1411
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1412

    
1413
    # and then connect with ssh to set password and start ganeti-noded
1414
    # note that all the below variables are sanitized at this point,
1415
    # either by being constants or by the checks above
1416
    ss = self.sstore
1417
    mycommand = ("umask 077 && "
1418
                 "echo '%s' > '%s' && "
1419
                 "cat > '%s' << '!EOF.' && \n"
1420
                 "%s!EOF.\n%s restart" %
1421
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1422
                  constants.SSL_CERT_FILE, gntpem,
1423
                  constants.NODE_INITD_SCRIPT))
1424

    
1425
    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1426
    if result.failed:
1427
      raise errors.OpExecError("Remote command on node %s, error: %s,"
1428
                               " output: %s" %
1429
                               (node, result.fail_reason, result.output))
1430

    
1431
    # check connectivity
1432
    time.sleep(4)
1433

    
1434
    result = rpc.call_version([node])[node]
1435
    if result:
1436
      if constants.PROTOCOL_VERSION == result:
1437
        logger.Info("communication to node %s fine, sw version %s match" %
1438
                    (node, result))
1439
      else:
1440
        raise errors.OpExecError("Version mismatch master version %s,"
1441
                                 " node version %s" %
1442
                                 (constants.PROTOCOL_VERSION, result))
1443
    else:
1444
      raise errors.OpExecError("Cannot get version from the new node")
1445

    
1446
    # setup ssh on node
1447
    logger.Info("copy ssh key to node %s" % node)
1448
    keyarray = []
1449
    keyfiles = ["/etc/ssh/ssh_host_dsa_key", "/etc/ssh/ssh_host_dsa_key.pub",
1450
                "/etc/ssh/ssh_host_rsa_key", "/etc/ssh/ssh_host_rsa_key.pub",
1451
                "/root/.ssh/id_dsa", "/root/.ssh/id_dsa.pub"]
1452

    
1453
    for i in keyfiles:
1454
      f = open(i, 'r')
1455
      try:
1456
        keyarray.append(f.read())
1457
      finally:
1458
        f.close()
1459

    
1460
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1461
                               keyarray[3], keyarray[4], keyarray[5])
1462

    
1463
    if not result:
1464
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1465

    
1466
    # Add node to our /etc/hosts, and add key to known_hosts
1467
    _UpdateEtcHosts(new_node.name, new_node.primary_ip)
1468
    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1469
                      self.cfg.GetHostKey())
1470

    
1471
    if new_node.secondary_ip != new_node.primary_ip:
1472
      result = ssh.SSHCall(node, "root",
1473
                           "fping -S 127.0.0.1 -q %s" % new_node.secondary_ip)
1474
      if result.failed:
1475
        raise errors.OpExecError("Node claims it doesn't have the"
1476
                                 " secondary ip you gave (%s).\n"
1477
                                 "Please fix and re-run this command." %
1478
                                 new_node.secondary_ip)
1479

    
1480
    success, msg = ssh.VerifyNodeHostname(node)
1481
    if not success:
1482
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
1483
                               " than the one the resolver gives: %s.\n"
1484
                               "Please fix and re-run this command." %
1485
                               (node, msg))
1486

    
1487
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1488
    # including the node just added
1489
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1490
    dist_nodes = self.cfg.GetNodeList() + [node]
1491
    if myself.name in dist_nodes:
1492
      dist_nodes.remove(myself.name)
1493

    
1494
    logger.Debug("Copying hosts and known_hosts to all nodes")
1495
    for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
1496
      result = rpc.call_upload_file(dist_nodes, fname)
1497
      for to_node in dist_nodes:
1498
        if not result[to_node]:
1499
          logger.Error("copy of file %s to node %s failed" %
1500
                       (fname, to_node))
1501

    
1502
    to_copy = ss.GetFileList()
1503
    for fname in to_copy:
1504
      if not ssh.CopyFileToNode(node, fname):
1505
        logger.Error("could not copy file %s to node %s" % (fname, node))
1506

    
1507
    logger.Info("adding node %s to cluster.conf" % node)
1508
    self.cfg.AddNode(new_node)
1509

    
1510

    
1511
class LUMasterFailover(LogicalUnit):
1512
  """Failover the master node to the current node.
1513

1514
  This is a special LU in that it must run on a non-master node.
1515

1516
  """
1517
  HPATH = "master-failover"
1518
  HTYPE = constants.HTYPE_CLUSTER
1519
  REQ_MASTER = False
1520
  _OP_REQP = []
1521

    
1522
  def BuildHooksEnv(self):
1523
    """Build hooks env.
1524

1525
    This will run on the new master only in the pre phase, and on all
1526
    the nodes in the post phase.
1527

1528
    """
1529
    env = {
1530
      "NEW_MASTER": self.new_master,
1531
      "OLD_MASTER": self.old_master,
1532
      }
1533
    return env, [self.new_master], self.cfg.GetNodeList()
1534

    
1535
  def CheckPrereq(self):
1536
    """Check prerequisites.
1537

1538
    This checks that we are not already the master.
1539

1540
    """
1541
    self.new_master = socket.gethostname()
1542

    
1543
    self.old_master = self.sstore.GetMasterNode()
1544

    
1545
    if self.old_master == self.new_master:
1546
      raise errors.OpPrereqError("This commands must be run on the node"
1547
                                 " where you want the new master to be.\n"
1548
                                 "%s is already the master" %
1549
                                 self.old_master)
1550

    
1551
  def Exec(self, feedback_fn):
1552
    """Failover the master node.
1553

1554
    This command, when run on a non-master node, will cause the current
1555
    master to cease being master, and the non-master to become new
1556
    master.
1557

1558
    """
1559
    #TODO: do not rely on gethostname returning the FQDN
1560
    logger.Info("setting master to %s, old master: %s" %
1561
                (self.new_master, self.old_master))
1562

    
1563
    if not rpc.call_node_stop_master(self.old_master):
1564
      logger.Error("could disable the master role on the old master"
1565
                   " %s, please disable manually" % self.old_master)
1566

    
1567
    ss = self.sstore
1568
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1569
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
1570
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
1571
      logger.Error("could not distribute the new simple store master file"
1572
                   " to the other nodes, please check.")
1573

    
1574
    if not rpc.call_node_start_master(self.new_master):
1575
      logger.Error("could not start the master role on the new master"
1576
                   " %s, please check" % self.new_master)
1577
      feedback_fn("Error in activating the master IP on the new master,\n"
1578
                  "please fix manually.")


class LUQueryClusterInfo(NoHooksLU):
1583
  """Query cluster configuration.
1584

1585
  """
1586
  _OP_REQP = []
1587
  REQ_MASTER = False
1588

    
1589
  def CheckPrereq(self):
1590
    """No prerequsites needed for this LU.
1591

1592
    """
1593
    pass
1594

    
1595
  def Exec(self, feedback_fn):
1596
    """Return cluster config.
1597

1598
    """
1599
    result = {
1600
      "name": self.sstore.GetClusterName(),
1601
      "software_version": constants.RELEASE_VERSION,
1602
      "protocol_version": constants.PROTOCOL_VERSION,
1603
      "config_version": constants.CONFIG_VERSION,
1604
      "os_api_version": constants.OS_API_VERSION,
1605
      "export_version": constants.EXPORT_VERSION,
1606
      "master": self.sstore.GetMasterNode(),
1607
      "architecture": (platform.architecture()[0], platform.machine()),
1608
      }
1609

    
1610
    return result


class LUClusterCopyFile(NoHooksLU):
1614
  """Copy file to cluster.
1615

1616
  """
1617
  _OP_REQP = ["nodes", "filename"]
1618

    
1619
  def CheckPrereq(self):
1620
    """Check prerequisites.
1621

1622
    It should check that the named file exists and that the given list
1623
    of nodes is valid.
1624

1625
    """
1626
    if not os.path.exists(self.op.filename):
1627
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1628

    
1629
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1630

    
1631
  def Exec(self, feedback_fn):
1632
    """Copy a file from master to some nodes.
1633

1634
    Args (taken from the opcode):
      filename - the file to copy to the target nodes
      nodes - list of target node names; if empty, all nodes are used
1639

1640
    """
1641
    filename = self.op.filename
1642

    
1643
    myname = socket.gethostname()
1644

    
1645
    for node in self.nodes:
1646
      if node == myname:
1647
        continue
1648
      if not ssh.CopyFileToNode(node, filename):
1649
        logger.Error("Copy of file %s to node %s failed" % (filename, node))


class LUDumpClusterConfig(NoHooksLU):
1653
  """Return a text-representation of the cluster-config.
1654

1655
  """
1656
  _OP_REQP = []
1657

    
1658
  def CheckPrereq(self):
1659
    """No prerequisites.
1660

1661
    """
1662
    pass
1663

    
1664
  def Exec(self, feedback_fn):
1665
    """Dump a representation of the cluster config to the standard output.
1666

1667
    """
1668
    return self.cfg.DumpConfig()


class LURunClusterCommand(NoHooksLU):
1672
  """Run a command on some nodes.
1673

1674
  """
1675
  _OP_REQP = ["command", "nodes"]
1676

    
1677
  def CheckPrereq(self):
1678
    """Check prerequisites.
1679

1680
    It checks that the given list of nodes is valid.
1681

1682
    """
1683
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1684

    
1685
  def Exec(self, feedback_fn):
1686
    """Run a command on some nodes.
1687

1688
    """
1689
    data = []
1690
    for node in self.nodes:
1691
      result = ssh.SSHCall(node, "root", self.op.command)
1692
      data.append((node, result.output, result.exit_code))
1693

    
1694
    return data


class LUActivateInstanceDisks(NoHooksLU):
1698
  """Bring up an instance's disks.
1699

1700
  """
1701
  _OP_REQP = ["instance_name"]
1702

    
1703
  def CheckPrereq(self):
1704
    """Check prerequisites.
1705

1706
    This checks that the instance is in the cluster.
1707

1708
    """
1709
    instance = self.cfg.GetInstanceInfo(
1710
      self.cfg.ExpandInstanceName(self.op.instance_name))
1711
    if instance is None:
1712
      raise errors.OpPrereqError("Instance '%s' not known" %
1713
                                 self.op.instance_name)
1714
    self.instance = instance
1715

    
1716

    
1717
  def Exec(self, feedback_fn):
1718
    """Activate the disks.
1719

1720
    """
1721
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1722
    if not disks_ok:
1723
      raise errors.OpExecError("Cannot activate block devices")
1724

    
1725
    return disks_info


def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1729
  """Prepare the block devices for an instance.
1730

1731
  This sets up the block devices on all nodes.
1732

1733
  Args:
1734
    instance: a ganeti.objects.Instance object
1735
    ignore_secondaries: if true, errors on secondary nodes won't result
1736
                        in an error return from the function
1737

1738
  Returns:
1739
    false if the operation failed
1740
    list of (host, instance_visible_name, node_visible_name) if the operation
1741
         succeeded with the mapping from node devices to instance devices
1742
  """
1743
  device_info = []
1744
  disks_ok = True
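  # Assemble each disk on every node it lives on.  A failure on the primary
  # node always marks the run as failed; failures on secondaries are only
  # fatal when ignore_secondaries is False.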
1745
  for inst_disk in instance.disks:
1746
    master_result = None
1747
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1748
      cfg.SetDiskID(node_disk, node)
1749
      is_primary = node == instance.primary_node
1750
      result = rpc.call_blockdev_assemble(node, node_disk, is_primary)
1751
      if not result:
1752
        logger.Error("could not prepare block device %s on node %s (is_pri"
1753
                     "mary=%s)" % (inst_disk.iv_name, node, is_primary))
1754
        if is_primary or not ignore_secondaries:
1755
          disks_ok = False
1756
      if is_primary:
1757
        master_result = result
1758
    device_info.append((instance.primary_node, inst_disk.iv_name,
1759
                        master_result))
1760

    
1761
  return disks_ok, device_info


def _StartInstanceDisks(cfg, instance, force):
1765
  """Start the disks of an instance.
1766

1767
  """
1768
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1769
                                           ignore_secondaries=force)
1770
  if not disks_ok:
1771
    _ShutdownInstanceDisks(instance, cfg)
1772
    if force is not None and not force:
1773
      logger.Error("If the message above refers to a secondary node,"
1774
                   " you can retry the operation using '--force'.")
1775
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
1779
  """Shutdown an instance's disks.
1780

1781
  """
1782
  _OP_REQP = ["instance_name"]
1783

    
1784
  def CheckPrereq(self):
1785
    """Check prerequisites.
1786

1787
    This checks that the instance is in the cluster.
1788

1789
    """
1790
    instance = self.cfg.GetInstanceInfo(
1791
      self.cfg.ExpandInstanceName(self.op.instance_name))
1792
    if instance is None:
1793
      raise errors.OpPrereqError("Instance '%s' not known" %
1794
                                 self.op.instance_name)
1795
    self.instance = instance
1796

    
1797
  def Exec(self, feedback_fn):
1798
    """Deactivate the disks
1799

1800
    """
1801
    instance = self.instance
1802
    ins_l = rpc.call_instance_list([instance.primary_node])
1803
    ins_l = ins_l[instance.primary_node]
1804
    if not isinstance(ins_l, list):
1805
      raise errors.OpExecError("Can't contact node '%s'" %
1806
                               instance.primary_node)
1807

    
1808
    if self.instance.name in ins_l:
1809
      raise errors.OpExecError("Instance is running, can't shutdown"
1810
                               " block devices.")
1811

    
1812
    _ShutdownInstanceDisks(instance, self.cfg)


def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1816
  """Shutdown block devices of an instance.
1817

1818
  This does the shutdown on all nodes of the instance.
1819

1820
  If ignore_primary is true, errors on the primary node are
1821
  ignored.
1822

1823
  """
1824
  result = True
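  # Errors are only reflected in the boolean result, never raised.  With
  # ignore_primary set, a failed shutdown on the primary node does not make
  # the whole operation count as failed.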
1825
  for disk in instance.disks:
1826
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1827
      cfg.SetDiskID(top_disk, node)
1828
      if not rpc.call_blockdev_shutdown(node, top_disk):
1829
        logger.Error("could not shutdown block device %s on node %s" %
1830
                     (disk.iv_name, node))
1831
        if not ignore_primary or node != instance.primary_node:
1832
          result = False
1833
  return result


class LUStartupInstance(LogicalUnit):
1837
  """Starts an instance.
1838

1839
  """
1840
  HPATH = "instance-start"
1841
  HTYPE = constants.HTYPE_INSTANCE
1842
  _OP_REQP = ["instance_name", "force"]
1843

    
1844
  def BuildHooksEnv(self):
1845
    """Build hooks env.
1846

1847
    This runs on master, primary and secondary nodes of the instance.
1848

1849
    """
1850
    env = {
1851
      "FORCE": self.op.force,
1852
      }
1853
    env.update(_BuildInstanceHookEnvByObject(self.instance))
1854
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1855
          list(self.instance.secondary_nodes))
1856
    return env, nl, nl
1857

    
1858
  def CheckPrereq(self):
1859
    """Check prerequisites.
1860

1861
    This checks that the instance is in the cluster.
1862

1863
    """
1864
    instance = self.cfg.GetInstanceInfo(
1865
      self.cfg.ExpandInstanceName(self.op.instance_name))
1866
    if instance is None:
1867
      raise errors.OpPrereqError("Instance '%s' not known" %
1868
                                 self.op.instance_name)
1869

    
1870
    # check bridges existence
1871
    brlist = [nic.bridge for nic in instance.nics]
1872
    if not rpc.call_bridges_exist(instance.primary_node, brlist):
1873
      raise errors.OpPrereqError("one or more target bridges %s does not"
1874
                                 " exist on destination node '%s'" %
1875
                                 (brlist, instance.primary_node))
1876

    
1877
    self.instance = instance
1878
    self.op.instance_name = instance.name
1879

    
1880
  def Exec(self, feedback_fn):
1881
    """Start the instance.
1882

1883
    """
1884
    instance = self.instance
1885
    force = self.op.force
1886
    extra_args = getattr(self.op, "extra_args", "")
1887

    
1888
    node_current = instance.primary_node
1889

    
1890
    nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1891
    if not nodeinfo:
1892
      raise errors.OpExecError("Could not contact node %s for infos" %
1893
                               (node_current))
1894

    
1895
    freememory = nodeinfo[node_current]['memory_free']
1896
    memory = instance.memory
1897
    if memory > freememory:
1898
      raise errors.OpExecError("Not enough memory to start instance"
1899
                               " %s on node %s"
1900
                               " needed %s MiB, available %s MiB" %
1901
                               (instance.name, node_current, memory,
1902
                                freememory))
1903

    
1904
    _StartInstanceDisks(self.cfg, instance, force)
1905

    
1906
    if not rpc.call_instance_start(node_current, instance, extra_args):
1907
      _ShutdownInstanceDisks(instance, self.cfg)
1908
      raise errors.OpExecError("Could not start instance")
1909

    
1910
    self.cfg.MarkInstanceUp(instance.name)


class LUShutdownInstance(LogicalUnit):
1914
  """Shutdown an instance.
1915

1916
  """
1917
  HPATH = "instance-stop"
1918
  HTYPE = constants.HTYPE_INSTANCE
1919
  _OP_REQP = ["instance_name"]
1920

    
1921
  def BuildHooksEnv(self):
1922
    """Build hooks env.
1923

1924
    This runs on master, primary and secondary nodes of the instance.
1925

1926
    """
1927
    env = _BuildInstanceHookEnvByObject(self.instance)
1928
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1929
          list(self.instance.secondary_nodes))
1930
    return env, nl, nl
1931

    
1932
  def CheckPrereq(self):
1933
    """Check prerequisites.
1934

1935
    This checks that the instance is in the cluster.
1936

1937
    """
1938
    instance = self.cfg.GetInstanceInfo(
1939
      self.cfg.ExpandInstanceName(self.op.instance_name))
1940
    if instance is None:
1941
      raise errors.OpPrereqError("Instance '%s' not known" %
1942
                                 self.op.instance_name)
1943
    self.instance = instance
1944

    
1945
  def Exec(self, feedback_fn):
1946
    """Shutdown the instance.
1947

1948
    """
1949
    instance = self.instance
1950
    node_current = instance.primary_node
1951
    if not rpc.call_instance_shutdown(node_current, instance):
1952
      logger.Error("could not shutdown instance")
1953

    
1954
    self.cfg.MarkInstanceDown(instance.name)
1955
    _ShutdownInstanceDisks(instance, self.cfg)


class LUReinstallInstance(LogicalUnit):
1959
  """Reinstall an instance.
1960

1961
  """
1962
  HPATH = "instance-reinstall"
1963
  HTYPE = constants.HTYPE_INSTANCE
1964
  _OP_REQP = ["instance_name"]
1965

    
1966
  def BuildHooksEnv(self):
1967
    """Build hooks env.
1968

1969
    This runs on master, primary and secondary nodes of the instance.
1970

1971
    """
1972
    env = _BuildInstanceHookEnvByObject(self.instance)
1973
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1974
          list(self.instance.secondary_nodes))
1975
    return env, nl, nl
1976

    
1977
  def CheckPrereq(self):
1978
    """Check prerequisites.
1979

1980
    This checks that the instance is in the cluster and is not running.
1981

1982
    """
1983
    instance = self.cfg.GetInstanceInfo(
1984
      self.cfg.ExpandInstanceName(self.op.instance_name))
1985
    if instance is None:
1986
      raise errors.OpPrereqError("Instance '%s' not known" %
1987
                                 self.op.instance_name)
1988
    if instance.disk_template == constants.DT_DISKLESS:
1989
      raise errors.OpPrereqError("Instance '%s' has no disks" %
1990
                                 self.op.instance_name)
1991
    if instance.status != "down":
1992
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
1993
                                 self.op.instance_name)
1994
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
1995
    if remote_info:
1996
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
1997
                                 (self.op.instance_name,
1998
                                  instance.primary_node))
1999

    
2000
    self.op.os_type = getattr(self.op, "os_type", None)
2001
    if self.op.os_type is not None:
2002
      # OS verification
2003
      pnode = self.cfg.GetNodeInfo(
2004
        self.cfg.ExpandNodeName(instance.primary_node))
2005
      if pnode is None:
2006
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2007
                                   self.op.pnode)
2008
      os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2009
      if not isinstance(os_obj, objects.OS):
2010
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2011
                                   " primary node"  % self.op.os_type)
2012

    
2013
    self.instance = instance
2014

    
2015
  def Exec(self, feedback_fn):
2016
    """Reinstall the instance.
2017

2018
    """
2019
    inst = self.instance
2020

    
2021
    if self.op.os_type is not None:
2022
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2023
      inst.os = self.op.os_type
2024
      self.cfg.AddInstance(inst)
2025

    
2026
    _StartInstanceDisks(self.cfg, inst, None)
2027
    try:
2028
      feedback_fn("Running the instance OS create scripts...")
2029
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2030
        raise errors.OpExecError("Could not install OS for instance %s "
2031
                                 "on node %s" %
2032
                                 (inst.name, inst.primary_node))
2033
    finally:
2034
      _ShutdownInstanceDisks(inst, self.cfg)


class LURemoveInstance(LogicalUnit):
2038
  """Remove an instance.
2039

2040
  """
2041
  HPATH = "instance-remove"
2042
  HTYPE = constants.HTYPE_INSTANCE
2043
  _OP_REQP = ["instance_name"]
2044

    
2045
  def BuildHooksEnv(self):
2046
    """Build hooks env.
2047

2048
    This runs on master, primary and secondary nodes of the instance.
2049

2050
    """
2051
    env = _BuildInstanceHookEnvByObject(self.instance)
2052
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2053
          list(self.instance.secondary_nodes))
2054
    return env, nl, nl
2055

    
2056
  def CheckPrereq(self):
2057
    """Check prerequisites.
2058

2059
    This checks that the instance is in the cluster.
2060

2061
    """
2062
    instance = self.cfg.GetInstanceInfo(
2063
      self.cfg.ExpandInstanceName(self.op.instance_name))
2064
    if instance is None:
2065
      raise errors.OpPrereqError("Instance '%s' not known" %
2066
                                 self.op.instance_name)
2067
    self.instance = instance
2068

    
2069
  def Exec(self, feedback_fn):
2070
    """Remove the instance.
2071

2072
    """
2073
    instance = self.instance
2074
    logger.Info("shutting down instance %s on node %s" %
2075
                (instance.name, instance.primary_node))
2076

    
2077
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2078
      raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2079
                               (instance.name, instance.primary_node))
2080

    
2081
    logger.Info("removing block devices for instance %s" % instance.name)
2082

    
2083
    _RemoveDisks(instance, self.cfg)
2084

    
2085
    logger.Info("removing instance %s out of cluster config" % instance.name)
2086

    
2087
    self.cfg.RemoveInstance(instance.name)


class LUQueryInstances(NoHooksLU):
2091
  """Logical unit for querying instances.
2092

2093
  """
2094
  _OP_REQP = ["output_fields", "names"]
2095

    
2096
  def CheckPrereq(self):
2097
    """Check prerequisites.
2098

2099
    This checks that the fields required are valid output fields.
2100

2101
    """
2102
    self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2103
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2104
                               "admin_state", "admin_ram",
2105
                               "disk_template", "ip", "mac", "bridge",
2106
                               "sda_size", "sdb_size"],
2107
                       dynamic=self.dynamic_fields,
2108
                       selected=self.op.output_fields)
2109

    
2110
    self.wanted = _GetWantedInstances(self, self.op.names)
2111

    
2112
  def Exec(self, feedback_fn):
2113
    """Computes the list of nodes and their attributes.
2114

2115
    """
2116
    instance_names = self.wanted
2117
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2118
                     in instance_names]
2119

    
2120
    # begin data gathering
2121

    
2122
    nodes = frozenset([inst.primary_node for inst in instance_list])
2123

    
2124
    bad_nodes = []
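    # live_data maps instance name -> runtime info gathered from its primary
    # node; nodes that could not be contacted end up in bad_nodes so their
    # dynamic fields can be reported as unknown (None) further down.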
2125
    if self.dynamic_fields.intersection(self.op.output_fields):
2126
      live_data = {}
2127
      node_data = rpc.call_all_instances_info(nodes)
2128
      for name in nodes:
2129
        result = node_data[name]
2130
        if result:
2131
          live_data.update(result)
2132
        elif result == False:
2133
          bad_nodes.append(name)
2134
        # else no instance is alive
2135
    else:
2136
      live_data = dict([(name, {}) for name in instance_names])
2137

    
2138
    # end data gathering
2139

    
2140
    output = []
2141
    for instance in instance_list:
2142
      iout = []
2143
      for field in self.op.output_fields:
2144
        if field == "name":
2145
          val = instance.name
2146
        elif field == "os":
2147
          val = instance.os
2148
        elif field == "pnode":
2149
          val = instance.primary_node
2150
        elif field == "snodes":
2151
          val = list(instance.secondary_nodes)
2152
        elif field == "admin_state":
2153
          val = (instance.status != "down")
2154
        elif field == "oper_state":
2155
          if instance.primary_node in bad_nodes:
2156
            val = None
2157
          else:
2158
            val = bool(live_data.get(instance.name))
2159
        elif field == "admin_ram":
2160
          val = instance.memory
2161
        elif field == "oper_ram":
2162
          if instance.primary_node in bad_nodes:
2163
            val = None
2164
          elif instance.name in live_data:
2165
            val = live_data[instance.name].get("memory", "?")
2166
          else:
2167
            val = "-"
2168
        elif field == "disk_template":
2169
          val = instance.disk_template
2170
        elif field == "ip":
2171
          val = instance.nics[0].ip
2172
        elif field == "bridge":
2173
          val = instance.nics[0].bridge
2174
        elif field == "mac":
2175
          val = instance.nics[0].mac
2176
        elif field == "sda_size" or field == "sdb_size":
2177
          disk = instance.FindDisk(field[:3])
2178
          if disk is None:
2179
            val = None
2180
          else:
2181
            val = disk.size
2182
        else:
2183
          raise errors.ParameterError(field)
2184
        iout.append(val)
2185
      output.append(iout)
2186

    
2187
    return output


class LUFailoverInstance(LogicalUnit):
2191
  """Failover an instance.
2192

2193
  """
2194
  HPATH = "instance-failover"
2195
  HTYPE = constants.HTYPE_INSTANCE
2196
  _OP_REQP = ["instance_name", "ignore_consistency"]
2197

    
2198
  def BuildHooksEnv(self):
2199
    """Build hooks env.
2200

2201
    This runs on master, primary and secondary nodes of the instance.
2202

2203
    """
2204
    env = {
2205
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2206
      }
2207
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2208
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2209
    return env, nl, nl
2210

    
2211
  def CheckPrereq(self):
2212
    """Check prerequisites.
2213

2214
    This checks that the instance is in the cluster.
2215

2216
    """
2217
    instance = self.cfg.GetInstanceInfo(
2218
      self.cfg.ExpandInstanceName(self.op.instance_name))
2219
    if instance is None:
2220
      raise errors.OpPrereqError("Instance '%s' not known" %
2221
                                 self.op.instance_name)
2222

    
2223
    if instance.disk_template != constants.DT_REMOTE_RAID1:
2224
      raise errors.OpPrereqError("Instance's disk layout is not"
2225
                                 " remote_raid1.")
2226

    
2227
    secondary_nodes = instance.secondary_nodes
2228
    if not secondary_nodes:
2229
      raise errors.ProgrammerError("no secondary node but using "
2230
                                   "DT_REMOTE_RAID1 template")
2231

    
2232
    # check memory requirements on the secondary node
2233
    target_node = secondary_nodes[0]
2234
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2235
    info = nodeinfo.get(target_node, None)
2236
    if not info:
2237
      raise errors.OpPrereqError("Cannot get current information"
2238
                                 " from node '%s'" % nodeinfo)
2239
    if instance.memory > info['memory_free']:
2240
      raise errors.OpPrereqError("Not enough memory on target node %s."
2241
                                 " %d MB available, %d MB required" %
2242
                                 (target_node, info['memory_free'],
2243
                                  instance.memory))
2244

    
2245
    # check bridge existence
2246
    brlist = [nic.bridge for nic in instance.nics]
2247
    if not rpc.call_bridges_exist(instance.primary_node, brlist):
2248
      raise errors.OpPrereqError("One or more target bridges %s does not"
2249
                                 " exist on destination node '%s'" %
2250
                                 (brlist, instance.primary_node))
2251

    
2252
    self.instance = instance
2253

    
2254
  def Exec(self, feedback_fn):
2255
    """Failover an instance.
2256

2257
    The failover is done by shutting it down on its present node and
2258
    starting it on the secondary.
2259

2260
    """
2261
    instance = self.instance
2262

    
2263
    source_node = instance.primary_node
2264
    target_node = instance.secondary_nodes[0]
2265

    
2266
    feedback_fn("* checking disk consistency between source and target")
2267
    for dev in instance.disks:
2268
      # for remote_raid1, these are md over drbd
2269
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2270
        if not self.op.ignore_consistency:
2271
          raise errors.OpExecError("Disk %s is degraded on target node,"
2272
                                   " aborting failover." % dev.iv_name)
2273

    
2274
    feedback_fn("* checking target node resource availability")
2275
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2276

    
2277
    if not nodeinfo:
2278
      raise errors.OpExecError("Could not contact target node %s." %
2279
                               target_node)
2280

    
2281
    free_memory = int(nodeinfo[target_node]['memory_free'])
2282
    memory = instance.memory
2283
    if memory > free_memory:
2284
      raise errors.OpExecError("Not enough memory to create instance %s on"
2285
                               " node %s. needed %s MiB, available %s MiB" %
2286
                               (instance.name, target_node, memory,
2287
                                free_memory))
2288

    
2289
    feedback_fn("* shutting down instance on source node")
2290
    logger.Info("Shutting down instance %s on node %s" %
2291
                (instance.name, source_node))
2292

    
2293
    if not rpc.call_instance_shutdown(source_node, instance):
2294
      logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2295
                   " anyway. Please make sure node %s is down"  %
2296
                   (instance.name, source_node, source_node))
2297

    
2298
    feedback_fn("* deactivating the instance's disks on source node")
2299
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2300
      raise errors.OpExecError("Can't shut down the instance's disks.")
2301

    
2302
    instance.primary_node = target_node
2303
    # distribute new instance config to the other nodes
2304
    self.cfg.AddInstance(instance)
2305

    
2306
    feedback_fn("* activating the instance's disks on target node")
2307
    logger.Info("Starting instance %s on node %s" %
2308
                (instance.name, target_node))
2309

    
2310
    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2311
                                             ignore_secondaries=True)
2312
    if not disks_ok:
2313
      _ShutdownInstanceDisks(instance, self.cfg)
2314
      raise errors.OpExecError("Can't activate the instance's disks")
2315

    
2316
    feedback_fn("* starting the instance on the target node")
2317
    if not rpc.call_instance_start(target_node, instance, None):
2318
      _ShutdownInstanceDisks(instance, self.cfg)
2319
      raise errors.OpExecError("Could not start instance %s on node %s." %
2320
                               (instance.name, target_node))


def _CreateBlockDevOnPrimary(cfg, node, device, info):
2324
  """Create a tree of block devices on the primary node.
2325

2326
  This always creates all devices.
2327

2328
  """
2329
  if device.children:
2330
    for child in device.children:
2331
      if not _CreateBlockDevOnPrimary(cfg, node, child, info):
2332
        return False
2333

    
2334
  cfg.SetDiskID(device, node)
2335
  new_id = rpc.call_blockdev_create(node, device, device.size, True, info)
2336
  if not new_id:
2337
    return False
2338
  if device.physical_id is None:
2339
    device.physical_id = new_id
2340
  return True


def _CreateBlockDevOnSecondary(cfg, node, device, force, info):
2344
  """Create a tree of block devices on a secondary node.
2345

2346
  If this device type has to be created on secondaries, create it and
2347
  all its children.
2348

2349
  If not, just recurse to children keeping the same 'force' value.
2350

2351
  """
2352
  if device.CreateOnSecondary():
2353
    force = True
2354
  if device.children:
2355
    for child in device.children:
2356
      if not _CreateBlockDevOnSecondary(cfg, node, child, force, info):
2357
        return False
2358

    
2359
  if not force:
2360
    return True
2361
  cfg.SetDiskID(device, node)
2362
  new_id = rpc.call_blockdev_create(node, device, device.size, False, info)
2363
  if not new_id:
2364
    return False
2365
  if device.physical_id is None:
2366
    device.physical_id = new_id
2367
  return True


def _GenerateUniqueNames(cfg, exts):
2371
  """Generate a suitable LV name.
2372

2373
  This will generate a logical volume name for the given instance.
2374

2375
  """
2376
  results = []
2377
  for val in exts:
2378
    new_id = cfg.GenerateUniqueID()
2379
    results.append("%s%s" % (new_id, val))
2380
  return results


def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2384
  """Generate a drbd device complete with its children.
2385

2386
  """
2387
  port = cfg.AllocatePort()
2388
  vgname = cfg.GetVGName()
2389
  dev_data = objects.Disk(dev_type="lvm", size=size,
2390
                          logical_id=(vgname, names[0]))
2391
  dev_meta = objects.Disk(dev_type="lvm", size=128,
2392
                          logical_id=(vgname, names[1]))
2393
  drbd_dev = objects.Disk(dev_type="drbd", size=size,
2394
                          logical_id = (primary, secondary, port),
2395
                          children = [dev_data, dev_meta])
2396
  return drbd_dev


def _GenerateDiskTemplate(cfg, template_name,
2400
                          instance_name, primary_node,
2401
                          secondary_nodes, disk_sz, swap_sz):
2402
  """Generate the entire disk layout for a given template type.
2403

2404
  """
2405
  #TODO: compute space requirements
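  # Summary of the layouts built below (sda = data disk, sdb = swap):
  #   diskless     - no disks at all
  #   plain        - one LV per disk on the primary node
  #   local_raid1  - md_raid1 over two LVs, all on the primary node
  #   remote_raid1 - md_raid1 over a DRBD device mirrored to the secondary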
2406

    
2407
  vgname = cfg.GetVGName()
2408
  if template_name == "diskless":
2409
    disks = []
2410
  elif template_name == "plain":
2411
    if len(secondary_nodes) != 0:
2412
      raise errors.ProgrammerError("Wrong template configuration")
2413

    
2414
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2415
    sda_dev = objects.Disk(dev_type="lvm", size=disk_sz,
2416
                           logical_id=(vgname, names[0]),
2417
                           iv_name = "sda")
2418
    sdb_dev = objects.Disk(dev_type="lvm", size=swap_sz,
2419
                           logical_id=(vgname, names[1]),
2420
                           iv_name = "sdb")
2421
    disks = [sda_dev, sdb_dev]
2422
  elif template_name == "local_raid1":
2423
    if len(secondary_nodes) != 0:
2424
      raise errors.ProgrammerError("Wrong template configuration")
2425

    
2426

    
2427
    names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2428
                                       ".sdb_m1", ".sdb_m2"])
2429
    sda_dev_m1 = objects.Disk(dev_type="lvm", size=disk_sz,
2430
                              logical_id=(vgname, names[0]))
2431
    sda_dev_m2 = objects.Disk(dev_type="lvm", size=disk_sz,
2432
                              logical_id=(vgname, names[1]))
2433
    md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name = "sda",
2434
                              size=disk_sz,
2435
                              children = [sda_dev_m1, sda_dev_m2])
2436
    sdb_dev_m1 = objects.Disk(dev_type="lvm", size=swap_sz,
2437
                              logical_id=(vgname, names[2]))
2438
    sdb_dev_m2 = objects.Disk(dev_type="lvm", size=swap_sz,
2439
                              logical_id=(vgname, names[3]))
2440
    md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name = "sdb",
2441
                              size=swap_sz,
2442
                              children = [sdb_dev_m1, sdb_dev_m2])
2443
    disks = [md_sda_dev, md_sdb_dev]
2444
  elif template_name == constants.DT_REMOTE_RAID1:
2445
    if len(secondary_nodes) != 1:
2446
      raise errors.ProgrammerError("Wrong template configuration")
2447
    remote_node = secondary_nodes[0]
2448
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2449
                                       ".sdb_data", ".sdb_meta"])
2450
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2451
                                         disk_sz, names[0:2])
2452
    md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name="sda",
2453
                              children = [drbd_sda_dev], size=disk_sz)
2454
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2455
                                         swap_sz, names[2:4])
2456
    md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name="sdb",
2457
                              children = [drbd_sdb_dev], size=swap_sz)
2458
    disks = [md_sda_dev, md_sdb_dev]
2459
  else:
2460
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2461
  return disks


def _GetInstanceInfoText(instance):
2465
  """Compute that text that should be added to the disk's metadata.
2466

2467
  """
2468
  return "originstname+%s" % instance.name


def _CreateDisks(cfg, instance):
2472
  """Create all disks for an instance.
2473

2474
  This abstracts away some work from AddInstance.
2475

2476
  Args:
2477
    instance: the instance object
2478

2479
  Returns:
2480
    True or False showing the success of the creation process
2481

2482
  """
2483
  info = _GetInstanceInfoText(instance)
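  # Each disk is created on the secondary nodes first and on the primary
  # last; the first failure aborts the whole creation and returns False.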
2484

    
2485
  for device in instance.disks:
2486
    logger.Info("creating volume %s for instance %s" %
2487
              (device.iv_name, instance.name))
2488
    #HARDCODE
2489
    for secondary_node in instance.secondary_nodes:
2490
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, device, False,
2491
                                        info):
2492
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2493
                     (device.iv_name, device, secondary_node))
2494
        return False
2495
    #HARDCODE
2496
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, device, info):
2497
      logger.Error("failed to create volume %s on primary!" %
2498
                   device.iv_name)
2499
      return False
2500
  return True


def _RemoveDisks(instance, cfg):
2504
  """Remove all disks for an instance.
2505

2506
  This abstracts away some work from `AddInstance()` and
2507
  `RemoveInstance()`. Note that in case some of the devices couldn't
2508
  be removed, the removal will continue with the other ones (compare
2509
  with `_CreateDisks()`).
2510

2511
  Args:
2512
    instance: the instance object
2513

2514
  Returns:
2515
    True or False showing the success of the removal process
2516

2517
  """
2518
  logger.Info("removing block devices for instance %s" % instance.name)
2519

    
2520
  result = True
2521
  for device in instance.disks:
2522
    for node, disk in device.ComputeNodeTree(instance.primary_node):
2523
      cfg.SetDiskID(disk, node)
2524
      if not rpc.call_blockdev_remove(node, disk):
2525
        logger.Error("could not remove block device %s on node %s,"
2526
                     " continuing anyway" %
2527
                     (device.iv_name, node))
2528
        result = False
2529
  return result


class LUCreateInstance(LogicalUnit):
2533
  """Create an instance.
2534

2535
  """
2536
  HPATH = "instance-add"
2537
  HTYPE = constants.HTYPE_INSTANCE
2538
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2539
              "disk_template", "swap_size", "mode", "start", "vcpus",
2540
              "wait_for_sync"]
2541

    
2542
  def BuildHooksEnv(self):
2543
    """Build hooks env.
2544

2545
    This runs on master, primary and secondary nodes of the instance.
2546

2547
    """
2548
    env = {
2549
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2550
      "INSTANCE_DISK_SIZE": self.op.disk_size,
2551
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
2552
      "INSTANCE_ADD_MODE": self.op.mode,
2553
      }
2554
    if self.op.mode == constants.INSTANCE_IMPORT:
2555
      env["INSTANCE_SRC_NODE"] = self.op.src_node
2556
      env["INSTANCE_SRC_PATH"] = self.op.src_path
2557
      env["INSTANCE_SRC_IMAGE"] = self.src_image
2558

    
2559
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2560
      primary_node=self.op.pnode,
2561
      secondary_nodes=self.secondaries,
2562
      status=self.instance_status,
2563
      os_type=self.op.os_type,
2564
      memory=self.op.mem_size,
2565
      vcpus=self.op.vcpus,
2566
      nics=[(self.inst_ip, self.op.bridge)],
2567
    ))
2568

    
2569
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2570
          self.secondaries)
2571
    return env, nl, nl
2572

    
2573

    
2574
  def CheckPrereq(self):
2575
    """Check prerequisites.
2576

2577
    """
2578
    if self.op.mode not in (constants.INSTANCE_CREATE,
2579
                            constants.INSTANCE_IMPORT):
2580
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2581
                                 self.op.mode)
2582

    
2583
    if self.op.mode == constants.INSTANCE_IMPORT:
2584
      src_node = getattr(self.op, "src_node", None)
2585
      src_path = getattr(self.op, "src_path", None)
2586
      if src_node is None or src_path is None:
2587
        raise errors.OpPrereqError("Importing an instance requires source"
2588
                                   " node and path options")
2589
      src_node_full = self.cfg.ExpandNodeName(src_node)
2590
      if src_node_full is None:
2591
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2592
      self.op.src_node = src_node = src_node_full
2593

    
2594
      if not os.path.isabs(src_path):
2595
        raise errors.OpPrereqError("The source path must be absolute")
2596

    
2597
      export_info = rpc.call_export_info(src_node, src_path)
2598

    
2599
      if not export_info:
2600
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
2601

    
2602
      if not export_info.has_section(constants.INISECT_EXP):
2603
        raise errors.ProgrammerError("Corrupted export config")
2604

    
2605
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
2606
      if (int(ei_version) != constants.EXPORT_VERSION):
2607
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2608
                                   (ei_version, constants.EXPORT_VERSION))
2609

    
2610
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2611
        raise errors.OpPrereqError("Can't import instance with more than"
2612
                                   " one data disk")
2613

    
2614
      # FIXME: are the old os-es, disk sizes, etc. useful?
2615
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2616
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2617
                                                         'disk0_dump'))
2618
      self.src_image = diskimage
2619
    else: # INSTANCE_CREATE
2620
      if getattr(self.op, "os_type", None) is None:
2621
        raise errors.OpPrereqError("No guest OS specified")
2622

    
2623
    # check primary node
2624
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2625
    if pnode is None:
2626
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
2627
                                 self.op.pnode)
2628
    self.op.pnode = pnode.name
2629
    self.pnode = pnode
2630
    self.secondaries = []
2631
    # disk template and mirror node verification
2632
    if self.op.disk_template not in constants.DISK_TEMPLATES:
2633
      raise errors.OpPrereqError("Invalid disk template name")
2634

    
2635
    if self.op.disk_template == constants.DT_REMOTE_RAID1:
2636
      if getattr(self.op, "snode", None) is None:
2637
        raise errors.OpPrereqError("The 'remote_raid1' disk template needs"
2638
                                   " a mirror node")
2639

    
2640
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
2641
      if snode_name is None:
2642
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
2643
                                   self.op.snode)
2644
      elif snode_name == pnode.name:
2645
        raise errors.OpPrereqError("The secondary node cannot be"
2646
                                   " the primary node.")
2647
      self.secondaries.append(snode_name)
2648

    
2649
    # Check lv size requirements
2650
    nodenames = [pnode.name] + self.secondaries
2651
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2652

    
2653
    # Required free disk space as a function of disk and swap space
2654
    req_size_dict = {
2655
      constants.DT_DISKLESS: 0,
2656
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2657
      constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2658
      # 256 MB are added for drbd metadata, 128MB for each drbd device
2659
      constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2660
    }
2661

    
2662
    if self.op.disk_template not in req_size_dict:
2663
      raise errors.ProgrammerError("Disk template '%s' size requirement"
2664
                                   " is unknown" %  self.op.disk_template)
2665

    
2666
    req_size = req_size_dict[self.op.disk_template]
2667

    
2668
    for node in nodenames:
2669
      info = nodeinfo.get(node, None)
2670
      if not info:
2671
        raise errors.OpPrereqError("Cannot get current information"
2672
                                   " from node '%s'" % nodeinfo)
2673
      if req_size > info['vg_free']:
2674
        raise errors.OpPrereqError("Not enough disk space on target node %s."
2675
                                   " %d MB available, %d MB required" %
2676
                                   (node, info['vg_free'], req_size))
2677

    
2678
    # os verification
2679
    os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2680
    if not isinstance(os_obj, objects.OS):
2681
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
2682
                                 " primary node"  % self.op.os_type)
2683

    
2684
    # instance verification
2685
    hostname1 = utils.LookupHostname(self.op.instance_name)
2686
    if not hostname1:
2687
      raise errors.OpPrereqError("Instance name '%s' not found in dns" %
2688
                                 self.op.instance_name)
2689

    
2690
    self.op.instance_name = instance_name = hostname1['hostname']
2691
    instance_list = self.cfg.GetInstanceList()
2692
    if instance_name in instance_list:
2693
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2694
                                 instance_name)
2695

    
2696
    ip = getattr(self.op, "ip", None)
2697
    if ip is None or ip.lower() == "none":
2698
      inst_ip = None
2699
    elif ip.lower() == "auto":
2700
      inst_ip = hostname1['ip']
2701
    else:
2702
      if not utils.IsValidIP(ip):
2703
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
2704
                                   " like a valid IP" % ip)
2705
      inst_ip = ip
2706
    self.inst_ip = inst_ip
2707

    
2708
    command = ["fping", "-q", hostname1['ip']]
2709
    result = utils.RunCmd(command)
2710
    if not result.failed:
2711
      raise errors.OpPrereqError("IP %s of instance %s already in use" %
2712
                                 (hostname1['ip'], instance_name))
2713

    
2714
    # bridge verification
2715
    bridge = getattr(self.op, "bridge", None)
2716
    if bridge is None:
2717
      self.op.bridge = self.cfg.GetDefBridge()
2718
    else:
2719
      self.op.bridge = bridge
2720

    
2721
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
2722
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
2723
                                 " destination node '%s'" %
2724
                                 (self.op.bridge, pnode.name))
2725

    
2726
    if self.op.start:
2727
      self.instance_status = 'up'
2728
    else:
2729
      self.instance_status = 'down'
2730

    
2731
  def Exec(self, feedback_fn):
2732
    """Create and add the instance to the cluster.
2733

2734
    """
2735
    instance = self.op.instance_name
2736
    pnode_name = self.pnode.name
2737

    
2738
    nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
2739
    if self.inst_ip is not None:
2740
      nic.ip = self.inst_ip
2741

    
2742
    disks = _GenerateDiskTemplate(self.cfg,
2743
                                  self.op.disk_template,
2744
                                  instance, pnode_name,
2745
                                  self.secondaries, self.op.disk_size,
2746
                                  self.op.swap_size)
2747

    
2748
    iobj = objects.Instance(name=instance, os=self.op.os_type,
2749
                            primary_node=pnode_name,
2750
                            memory=self.op.mem_size,
2751
                            vcpus=self.op.vcpus,
2752
                            nics=[nic], disks=disks,
2753
                            disk_template=self.op.disk_template,
2754
                            status=self.instance_status,
2755
                            )
2756

    
2757
    feedback_fn("* creating instance disks...")
2758
    if not _CreateDisks(self.cfg, iobj):
2759
      _RemoveDisks(iobj, self.cfg)
2760
      raise errors.OpExecError("Device creation failed, reverting...")
2761

    
2762
    feedback_fn("adding instance %s to cluster config" % instance)
2763

    
2764
    self.cfg.AddInstance(iobj)
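    # If wait_for_sync was not requested we still make sure remote_raid1
    # mirrors are not outright degraded (a mirror that is merely still
    # syncing is fine); a failed check removes the new disks and the
    # instance again.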
2765

    
2766
    if self.op.wait_for_sync:
2767
      disk_abort = not _WaitForSync(self.cfg, iobj)
2768
    elif iobj.disk_template == constants.DT_REMOTE_RAID1:
2769
      # make sure the disks are not degraded (still sync-ing is ok)
2770
      time.sleep(15)
2771
      feedback_fn("* checking mirrors status")
2772
      disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
2773
    else:
2774
      disk_abort = False
2775

    
2776
    if disk_abort:
2777
      _RemoveDisks(iobj, self.cfg)
2778
      self.cfg.RemoveInstance(iobj.name)
2779
      raise errors.OpExecError("There are some degraded disks for"
2780
                               " this instance")
2781

    
2782
    feedback_fn("creating os for instance %s on node %s" %
2783
                (instance, pnode_name))
2784

    
2785
    if iobj.disk_template != constants.DT_DISKLESS:
2786
      if self.op.mode == constants.INSTANCE_CREATE:
2787
        feedback_fn("* running the instance OS create scripts...")
2788
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
2789
          raise errors.OpExecError("could not add os for instance %s"
2790
                                   " on node %s" %
2791
                                   (instance, pnode_name))
2792

    
2793
      elif self.op.mode == constants.INSTANCE_IMPORT:
2794
        feedback_fn("* running the instance OS import scripts...")
2795
        src_node = self.op.src_node
2796
        src_image = self.src_image
2797
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
2798
                                                src_node, src_image):
2799
          raise errors.OpExecError("Could not import os for instance"
2800
                                   " %s on node %s" %
2801
                                   (instance, pnode_name))
2802
      else:
2803
        # also checked in the prereq part
2804
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
2805
                                     % self.op.mode)
2806

    
2807
    if self.op.start:
2808
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
2809
      feedback_fn("* starting instance...")
2810
      if not rpc.call_instance_start(pnode_name, iobj, None):
2811
        raise errors.OpExecError("Could not start instance")


class LUConnectConsole(NoHooksLU):
2815
  """Connect to an instance's console.
2816

2817
  This is somewhat special in that it returns the command line that
2818
  you need to run on the master node in order to connect to the
2819
  console.
2820

2821
  """
2822
  _OP_REQP = ["instance_name"]
2823

    
2824
  def CheckPrereq(self):
2825
    """Check prerequisites.
2826

2827
    This checks that the instance is in the cluster.
2828

2829
    """
2830
    instance = self.cfg.GetInstanceInfo(
2831
      self.cfg.ExpandInstanceName(self.op.instance_name))
2832
    if instance is None:
2833
      raise errors.OpPrereqError("Instance '%s' not known" %
2834
                                 self.op.instance_name)
2835
    self.instance = instance
2836

    
2837
  def Exec(self, feedback_fn):
2838
    """Connect to the console of an instance
2839

2840
    """
2841
    instance = self.instance
2842
    node = instance.primary_node
2843

    
2844
    node_insts = rpc.call_instance_list([node])[node]
2845
    if node_insts is False:
2846
      raise errors.OpExecError("Can't connect to node %s." % node)
2847

    
2848
    if instance.name not in node_insts:
2849
      raise errors.OpExecError("Instance %s is not running." % instance.name)
2850

    
2851
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
2852

    
2853
    hyper = hypervisor.GetHypervisor()
2854
    console_cmd = hyper.GetShellCommandForConsole(instance.name)
2855
    # build ssh cmdline
2856
    argv = ["ssh", "-q", "-t"]
2857
    argv.extend(ssh.KNOWN_HOSTS_OPTS)
2858
    argv.extend(ssh.BATCH_MODE_OPTS)
2859
    argv.append(node)
2860
    argv.append(console_cmd)
2861
    return "ssh", argv


class LUAddMDDRBDComponent(LogicalUnit):
2865
  """Adda new mirror member to an instance's disk.
2866

2867
  """
2868
  HPATH = "mirror-add"
2869
  HTYPE = constants.HTYPE_INSTANCE
2870
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]
2871

    
2872
  def BuildHooksEnv(self):
2873
    """Build hooks env.
2874

2875
    This runs on the master, the primary and all the secondaries.
2876

2877
    """
2878
    env = {
2879
      "NEW_SECONDARY": self.op.remote_node,
2880
      "DISK_NAME": self.op.disk_name,
2881
      }
2882
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2883
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
2884
          self.op.remote_node,] + list(self.instance.secondary_nodes)
2885
    return env, nl, nl
2886

    
2887
  def CheckPrereq(self):
2888
    """Check prerequisites.
2889

2890
    This checks that the instance is in the cluster.
2891

2892
    """
2893
    instance = self.cfg.GetInstanceInfo(
2894
      self.cfg.ExpandInstanceName(self.op.instance_name))
2895
    if instance is None:
2896
      raise errors.OpPrereqError("Instance '%s' not known" %
2897
                                 self.op.instance_name)
2898
    self.instance = instance
2899

    
2900
    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
2901
    if remote_node is None:
2902
      raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
2903
    self.remote_node = remote_node
2904

    
2905
    if remote_node == instance.primary_node:
2906
      raise errors.OpPrereqError("The specified node is the primary node of"
2907
                                 " the instance.")
2908

    
2909
    if instance.disk_template != constants.DT_REMOTE_RAID1:
2910
      raise errors.OpPrereqError("Instance's disk layout is not"
2911
                                 " remote_raid1.")
2912
    for disk in instance.disks:
2913
      if disk.iv_name == self.op.disk_name:
2914
        break
2915
    else:
2916
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
2917
                                 " instance." % self.op.disk_name)
2918
    if len(disk.children) > 1:
2919
      raise errors.OpPrereqError("The device already has two slave"
2920
                                 " devices.\n"
2921
                                 "This would create a 3-disk raid1"
2922
                                 " which we don't allow.")
2923
    self.disk = disk
2924

    
2925
  def Exec(self, feedback_fn):
2926
    """Add the mirror component
2927

2928
    """
2929
    disk = self.disk
2930
    instance = self.instance
2931

    
2932
    remote_node = self.remote_node
2933
    lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
2934
    names = _GenerateUniqueNames(self.cfg, lv_names)
2935
    new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
2936
                                     remote_node, disk.size, names)
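    # The new branch is created on the secondary first, then on the primary;
    # if attaching it to the md device fails, the partially created devices
    # are removed again before the error is raised.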
2937

    
2938
    logger.Info("adding new mirror component on secondary")
2939
    #HARDCODE
2940
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, new_drbd, False,
2941
                                      _GetInstanceInfoText(instance)):
2942
      raise errors.OpExecError("Failed to create new component on secondary"
2943
                               " node %s" % remote_node)
2944

    
2945
    logger.Info("adding new mirror component on primary")
2946
    #HARDCODE
2947
    if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node, new_drbd,
2948
                                    _GetInstanceInfoText(instance)):
2949
      # remove secondary dev
2950
      self.cfg.SetDiskID(new_drbd, remote_node)
2951
      rpc.call_blockdev_remove(remote_node, new_drbd)
2952
      raise errors.OpExecError("Failed to create volume on primary")
2953

    
2954
    # the device exists now
2955
    # call the primary node to add the mirror to md
2956
    logger.Info("adding new mirror component to md")
2957
    if not rpc.call_blockdev_addchild(instance.primary_node,
2958
                                           disk, new_drbd):
2959
      logger.Error("Can't add mirror compoment to md!")
2960
      self.cfg.SetDiskID(new_drbd, remote_node)
2961
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
2962
        logger.Error("Can't rollback on secondary")
2963
      self.cfg.SetDiskID(new_drbd, instance.primary_node)
2964
      if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
2965
        logger.Error("Can't rollback on primary")
2966
      raise errors.OpExecError("Can't add mirror component to md array")
2967

    
2968
    disk.children.append(new_drbd)
2969

    
2970
    self.cfg.AddInstance(instance)
2971

    
2972
    _WaitForSync(self.cfg, instance)
2973

    
2974
    return 0


class LURemoveMDDRBDComponent(LogicalUnit):
2978
  """Remove a component from a remote_raid1 disk.
2979

2980
  """
2981
  HPATH = "mirror-remove"
2982
  HTYPE = constants.HTYPE_INSTANCE
2983
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]
2984

    
2985
  def BuildHooksEnv(self):
2986
    """Build hooks env.
2987

2988
    This runs on the master, the primary and all the secondaries.
2989

2990
    """
2991
    env = {
2992
      "DISK_NAME": self.op.disk_name,
2993
      "DISK_ID": self.op.disk_id,
2994
      "OLD_SECONDARY": self.old_secondary,
2995
      }
2996
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2997
    nl = [self.sstore.GetMasterNode(),
2998
          self.instance.primary_node] + list(self.instance.secondary_nodes)
2999
    return env, nl, nl
3000

    
3001
  def CheckPrereq(self):
3002
    """Check prerequisites.
3003

3004
    This checks that the instance is in the cluster.
3005

3006
    """
3007
    instance = self.cfg.GetInstanceInfo(
3008
      self.cfg.ExpandInstanceName(self.op.instance_name))
3009
    if instance is None:
3010
      raise errors.OpPrereqError("Instance '%s' not known" %
3011
                                 self.op.instance_name)
3012
    self.instance = instance
3013

    
3014
    if instance.disk_template != constants.DT_REMOTE_RAID1:
3015
      raise errors.OpPrereqError("Instance's disk layout is not"
3016
                                 " remote_raid1.")
3017
    for disk in instance.disks:
3018
      if disk.iv_name == self.op.disk_name:
3019
        break
3020
    else:
3021
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
3022
                                 " instance." % self.op.disk_name)
3023
    for child in disk.children:
3024
      if child.dev_type == "drbd" and child.logical_id[2] == self.op.disk_id:
3025
        break
3026
    else:
3027
      raise errors.OpPrereqError("Can't find the device with this port.")
3028

    
3029
    if len(disk.children) < 2:
3030
      raise errors.OpPrereqError("Cannot remove the last component from"
3031
                                 " a mirror.")
3032
    self.disk = disk
3033
    self.child = child
3034
    if self.child.logical_id[0] == instance.primary_node:
3035
      oid = 1
3036
    else:
3037
      oid = 0
3038
    self.old_secondary = self.child.logical_id[oid]
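
  # Note: for the drbd children handled here, logical_id appears to be
  # (node_a, node_b, port): logical_id[2] is matched against op.disk_id above
  # and the other two entries are the peer nodes, so old_secondary is simply
  # whichever of the two is not the instance's primary node.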

  def Exec(self, feedback_fn):
    """Remove the mirror component

    """
    instance = self.instance
    disk = self.disk
    child = self.child
    logger.Info("remove mirror component")
    self.cfg.SetDiskID(disk, instance.primary_node)
    if not rpc.call_blockdev_removechild(instance.primary_node,
                                         disk, child):
      raise errors.OpExecError("Can't remove child from mirror.")

    for node in child.logical_id[:2]:
      self.cfg.SetDiskID(child, node)
      if not rpc.call_blockdev_remove(node, child):
        logger.Error("Warning: failed to remove device from node %s,"
                     " continuing operation." % node)

    disk.children.remove(child)
    self.cfg.AddInstance(instance)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    remote_node = getattr(self.op, "remote_node", None)
    if remote_node is None:
      remote_node = instance.secondary_nodes[0]
    else:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    self.op.remote_node = remote_node
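
  # Note: when remote_node is omitted it defaults to the current secondary
  # (secondary_nodes[0] above), so this LU can either move the mirrors to a
  # new secondary or rebuild them in place on the existing one.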

  def Exec(self, feedback_fn):
    """Replace the disks of an instance.

    """
    instance = self.instance
    iv_names = {}
    # start of work
    remote_node = self.op.remote_node
    cfg = self.cfg
    for dev in instance.disks:
      size = dev.size
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
                                       remote_node, size, names)
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
      logger.Info("adding new mirror component on secondary for %s" %
                  dev.iv_name)
      #HARDCODE
      if not _CreateBlockDevOnSecondary(cfg, remote_node, new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new component on"
                                 " secondary node %s\n"
                                 "Full abort, cleanup manually!" %
                                 remote_node)

      logger.Info("adding new mirror component on primary")
      #HARDCODE
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, new_drbd,
                                      _GetInstanceInfoText(instance)):
        # remove secondary dev
        cfg.SetDiskID(new_drbd, remote_node)
        rpc.call_blockdev_remove(remote_node, new_drbd)
        raise errors.OpExecError("Failed to create volume on primary!\n"
                                 "Full abort, cleanup manually!!")

      # the device exists now
      # call the primary node to add the mirror to md
      logger.Info("adding new mirror component to md")
      if not rpc.call_blockdev_addchild(instance.primary_node, dev,
                                        new_drbd):
        logger.Error("Can't add mirror component to md!")
        cfg.SetDiskID(new_drbd, remote_node)
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
          logger.Error("Can't rollback on secondary")
        cfg.SetDiskID(new_drbd, instance.primary_node)
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
          logger.Error("Can't rollback on primary")
        raise errors.OpExecError("Full abort, cleanup manually!!")

      dev.children.append(new_drbd)
      cfg.AddInstance(instance)

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    _WaitForSync(cfg, instance, unlock=True)

    # so check manually all the devices
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("MD device %s is degraded!" % name)
      cfg.SetDiskID(new_drbd, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
      if is_degr:
        raise errors.OpExecError("New drbd device %s is degraded!" % name)

    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      logger.Info("remove mirror %s component" % name)
      cfg.SetDiskID(dev, instance.primary_node)
      if not rpc.call_blockdev_removechild(instance.primary_node,
                                           dev, child):
        logger.Error("Can't remove child from mirror, aborting"
                     " *this device cleanup*.\nYou need to cleanup manually!!")
        continue

      for node in child.logical_id[:2]:
        logger.Info("remove child device on %s" % node)
        cfg.SetDiskID(child, node)
        if not rpc.call_blockdev_remove(node, child):
          logger.Error("Warning: failed to remove device from node %s,"
                       " continuing operation." % node)

      dev.children.remove(child)

      cfg.AddInstance(instance)
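
  # In short, the loop above builds a fresh DRBD mirror towards remote_node
  # for every disk and attaches it as an extra MD child (with rollback on
  # failure); after that the code waits for sync, verifies that neither the
  # MD device nor the new DRBD device is degraded, and only then detaches and
  # removes the old children recorded in iv_names.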


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]
    return


  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type == "drbd":
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data
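
  # Note: each device is therefore reported as a nested dict of the form
  #   {"iv_name": ..., "dev_type": ..., "logical_id": ..., "physical_id": ...,
  #    "pstatus": <blockdev_find result on the primary>,
  #    "sstatus": <blockdev_find result on the secondary, or None>,
  #    "children": [<same structure for each child>]}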

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                           instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        }

      result[instance.name] = idict

    return result
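
  # Note: a minimal sketch of the returned mapping (instance name
  # hypothetical):
  #   {"inst1.example.com": {"name": "inst1.example.com", "config_state": "up",
  #                          "run_state": "down", "pnode": ..., "snodes": [...],
  #                          "os": ..., "memory": ..., "nics": [...],
  #                          "disks": [...]}}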


class LUSetInstanceParms(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      args['nics'] = [(ip, bridge)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.bridge = getattr(self.op, "bridge", None)
    if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("No such instance name '%s'" %
                                 self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance
    return
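
  # Note: to summarize the checks above: at least one of mem/vcpus/ip/bridge
  # must be given, mem and vcpus must convert cleanly to int, and ip may be
  # the literal string "none" to clear the address.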

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))

    self.cfg.AddInstance(instance)

    return result
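
  # Note: the return value is a list of (parameter, new_value) pairs, e.g. a
  # memory-only change would yield [("mem", 512)] (the value is hypothetical).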


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)
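
  # Note: a minimal sketch of the result, with hypothetical names:
  #   {"node1.example.com": ["inst1.example.com", "inst2.example.com"]}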


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance name is a valid one.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    # shutdown the instance, unless requested not to do so
    if self.op.shutdown:
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
      self.processor.ChainOpCode(op, feedback_fn)

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type="lvm", size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown:
        op = opcodes.OpStartupInstance(instance_name=instance.name,
                                       force=False)
        self.processor.ChainOpCode(op, feedback_fn)

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                      instance):
        logger.Error("could not export block device %s from node"
                     " %s to node %s" %
                     (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from"
                     " node %s" % (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.processor.ChainOpCode(op, feedback_fn)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))
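
  # Note: the tag LUs below therefore expect op.kind to be one of
  # constants.TAG_CLUSTER, constants.TAG_NODE or constants.TAG_INSTANCE, with
  # op.name naming the node or instance; self.target becomes the object that
  # actually carries the tags.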


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()


class LUAddTag(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tag"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    objects.TaggableObject.ValidateTag(self.op.tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      self.target.AddTag(self.op.tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTag(TagsLU):
  """Delete a tag from a given object.

  """
  _OP_REQP = ["kind", "name", "tag"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    objects.TaggableObject.ValidateTag(self.op.tag)
    if self.op.tag not in self.target.GetTags():
      raise errors.OpPrereqError("Tag not found")

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    self.target.RemoveTag(self.op.tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")