
root / lib / cmdlib.py @ decd5f45


1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import socket
30
import time
31
import tempfile
32
import re
33
import platform
34

    
35
from ganeti import rpc
36
from ganeti import ssh
37
from ganeti import logger
38
from ganeti import utils
39
from ganeti import errors
40
from ganeti import hypervisor
41
from ganeti import config
42
from ganeti import constants
43
from ganeti import objects
44
from ganeti import opcodes
45
from ganeti import ssconf
46

    
47
class LogicalUnit(object):
48
  """Logical Unit base class.
49

50
  Subclasses must follow these rules:
51
    - implement CheckPrereq which also fills in the opcode instance
52
      with all the fields (even if as None)
53
    - implement Exec
54
    - implement BuildHooksEnv
55
    - redefine HPATH and HTYPE
56
    - optionally redefine their run requirements (REQ_CLUSTER,
57
      REQ_MASTER); note that all commands require root permissions
58

59
  """
60
  HPATH = None
61
  HTYPE = None
62
  _OP_REQP = []
63
  REQ_CLUSTER = True
64
  REQ_MASTER = True
65

    
66
  def __init__(self, processor, op, cfg, sstore):
67
    """Constructor for LogicalUnit.
68

69
    This needs to be overridden in derived classes in order to check op
70
    validity.
71

72
    """
73
    self.processor = processor
74
    self.op = op
75
    self.cfg = cfg
76
    self.sstore = sstore
77
    for attr_name in self._OP_REQP:
78
      attr_val = getattr(op, attr_name, None)
79
      if attr_val is None:
80
        raise errors.OpPrereqError("Required parameter '%s' missing" %
81
                                   attr_name)
82
    if self.REQ_CLUSTER:
83
      if not cfg.IsCluster():
84
        raise errors.OpPrereqError("Cluster not initialized yet,"
85
                                   " use 'gnt-cluster init' first.")
86
      if self.REQ_MASTER:
87
        master = sstore.GetMasterNode()
88
        if master != socket.gethostname():
89
          raise errors.OpPrereqError("Commands must be run on the master"
90
                                     " node %s" % master)
91

    
92
  def CheckPrereq(self):
93
    """Check prerequisites for this LU.
94

95
    This method should check that the prerequisites for the execution
96
    of this LU are fulfilled. It can do internode communication, but
97
    it should be idempotent - no cluster or system changes are
98
    allowed.
99

100
    The method should raise errors.OpPrereqError in case something is
101
    not fulfilled. Its return value is ignored.
102

103
    This method should also update all the parameters of the opcode to
104
    their canonical form; e.g. a short node name must be fully
105
    expanded after this method has successfully completed (so that
106
    hooks, logging, etc. work correctly).
107

108
    """
109
    raise NotImplementedError
110

    
111
  def Exec(self, feedback_fn):
112
    """Execute the LU.
113

114
    This method should implement the actual work. It should raise
115
    errors.OpExecError for failures that are somewhat dealt with in
116
    code, or expected.
117

118
    """
119
    raise NotImplementedError
120

    
121
  def BuildHooksEnv(self):
122
    """Build hooks environment for this LU.
123

124
    This method should return a three-node tuple consisting of: a dict
125
    containing the environment that will be used for running the
126
    specific hook for this LU, a list of node names on which the hook
127
    should run before the execution, and a list of node names on which
128
    the hook should run after the execution.
129

130
    The keys of the dict must not have 'GANETI_' prefixed as this will
131
    be handled in the hooks runner. Also note additional keys will be
132
    added by the hooks runner. If the LU doesn't define any
133
    environment, an empty dict (and not None) should be returned.
134

135
    As for the node lists, the master should not be included in the
136
    them, as it will be added by the hooks runner in case this LU
137
    requires a cluster to run on (otherwise we don't have a node
138
    list). No nodes should be returned as an empty list (and not
139
    None).
140

141
    Note that if the HPATH for a LU class is None, this function will
142
    not be called.
143

144
    """
145
    raise NotImplementedError
146

    
147

    
148
class NoHooksLU(LogicalUnit):
149
  """Simple LU which runs no hooks.
150

151
  This LU is intended as a parent for other LogicalUnits which will
152
  run no hooks, in order to reduce duplicate code.
153

154
  """
155
  HPATH = None
156
  HTYPE = None
157

    
158
  def BuildHooksEnv(self):
159
    """Build hooks env.
160

161
    This is a no-op, since we don't run hooks.
162

163
    """
164
    return
165

    
166

    
167
def _GetWantedNodes(lu, nodes):
168
  """Returns list of checked and expanded node names.
169

170
  Args:
171
    nodes: List of node names (strings); an empty list means all nodes
172

173
  """
174
  if not isinstance(nodes, list):
175
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
176

    
177
  if nodes:
178
    wanted = []
179

    
180
    for name in nodes:
181
      node = lu.cfg.ExpandNodeName(name)
182
      if node is None:
183
        raise errors.OpPrereqError("No such node name '%s'" % name)
184
      wanted.append(node)
185

    
186
  else:
187
    wanted = lu.cfg.GetNodeList()
188
  return utils.NiceSort(wanted)
189

    
190

    
191
def _GetWantedInstances(lu, instances):
192
  """Returns list of checked and expanded instance names.
193

194
  Args:
195
    instances: List of instance names (strings); an empty list means all instances
196

197
  """
198
  if not isinstance(instances, list):
199
    raise errors.OpPrereqError("Invalid argument type 'instances'")
200

    
201
  if instances:
202
    wanted = []
203

    
204
    for name in instances:
205
      instance = lu.cfg.ExpandInstanceName(name)
206
      if instance is None:
207
        raise errors.OpPrereqError("No such instance name '%s'" % name)
208
      wanted.append(instance)
209

    
210
  else:
211
    wanted = lu.cfg.GetInstanceList()
212
  return utils.NiceSort(wanted)
213

    
214

    
215
def _CheckOutputFields(static, dynamic, selected):
216
  """Checks whether all selected fields are valid.
217

218
  Args:
219
    static: Static fields
220
    dynamic: Dynamic fields
221

222
  """
223
  static_fields = frozenset(static)
224
  dynamic_fields = frozenset(dynamic)
225

    
226
  all_fields = static_fields | dynamic_fields
227

    
228
  if not all_fields.issuperset(selected):
229
    raise errors.OpPrereqError("Unknown output fields selected: %s"
230
                               % ",".join(frozenset(selected).
231
                                          difference(all_fields)))
232

    
233

    
234
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
235
                          memory, vcpus, nics):
236
  """Builds instance related env variables for hooks from single variables.
237

238
  Args:
239
    secondary_nodes: List of secondary nodes as strings
240
  """
241
  env = {
242
    "INSTANCE_NAME": name,
243
    "INSTANCE_PRIMARY": primary_node,
244
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
245
    "INSTANCE_OS_TYPE": os_type,
246
    "INSTANCE_STATUS": status,
247
    "INSTANCE_MEMORY": memory,
248
    "INSTANCE_VCPUS": vcpus,
249
  }
250

    
251
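  # Each NIC is exported as INSTANCE_NIC<idx>_IP / INSTANCE_NIC<idx>_BRIDGE,
  # with INSTANCE_NIC_COUNT telling hooks how many NICs to expect.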
  if nics:
252
    nic_count = len(nics)
253
    for idx, (ip, bridge) in enumerate(nics):
254
      if ip is None:
255
        ip = ""
256
      env["INSTANCE_NIC%d_IP" % idx] = ip
257
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
258
  else:
259
    nic_count = 0
260

    
261
  env["INSTANCE_NIC_COUNT"] = nic_count
262

    
263
  return env
264

    
265

    
266
def _BuildInstanceHookEnvByObject(instance, override=None):
267
  """Builds instance related env variables for hooks from an object.
268

269
  Args:
270
    instance: objects.Instance object of instance
271
    override: dict of values to override
272
  """
273
  args = {
274
    'name': instance.name,
275
    'primary_node': instance.primary_node,
276
    'secondary_nodes': instance.secondary_nodes,
277
    'os_type': instance.os,
278
    'status': instance.status,
279
    'memory': instance.memory,
280
    'vcpus': instance.vcpus,
281
    'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
282
  }
283
  if override:
284
    args.update(override)
285
  return _BuildInstanceHookEnv(**args)
286

    
287

    
288
def _UpdateEtcHosts(fullnode, ip):
289
  """Ensure a node has a correct entry in /etc/hosts.
290

291
  Args:
292
    fullnode - Fully qualified domain name of host. (str)
293
    ip       - IPv4 address of host (str)
294

295
  """
296
  node = fullnode.split(".", 1)[0]
297

    
298
  f = open('/etc/hosts', 'r+')
299

    
300
  inthere = False
301

    
302
  save_lines = []
303
  add_lines = []
304
  removed = False
305

    
306
  while True:
307
    rawline = f.readline()
308

    
309
    if not rawline:
310
      # End of file
311
      break
312

    
313
    line = rawline.split('\n')[0]
314

    
315
    # Strip off comments
316
    line = line.split('#')[0]
317

    
318
    if not line:
319
      # Entire line was comment, skip
320
      save_lines.append(rawline)
321
      continue
322

    
323
    fields = line.split()
324

    
325
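    # Classify the entry: 'haveall' means the IP, FQDN and short name are all
    # present, 'havesome' means at least one of them is; partial matches are
    # treated as stale entries and removed below.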
    haveall = True
326
    havesome = False
327
    for spec in [ ip, fullnode, node ]:
328
      if spec not in fields:
329
        haveall = False
330
      if spec in fields:
331
        havesome = True
332

    
333
    if haveall:
334
      inthere = True
335
      save_lines.append(rawline)
336
      continue
337

    
338
    if havesome and not haveall:
339
      # Line (old, or manual?) which is missing some.  Remove.
340
      removed = True
341
      continue
342

    
343
    save_lines.append(rawline)
344

    
345
  if not inthere:
346
    add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))
347

    
348
  if removed:
349
    if add_lines:
350
      save_lines = save_lines + add_lines
351

    
352
    # We removed a line, write a new file and replace old.
353
    fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
354
    newfile = os.fdopen(fd, 'w')
355
    newfile.write(''.join(save_lines))
356
    newfile.close()
357
    os.rename(tmpname, '/etc/hosts')
358

    
359
  elif add_lines:
360
    # Simply appending a new line will do the trick.
361
    f.seek(0, 2)
362
    for add in add_lines:
363
      f.write(add)
364

    
365
  f.close()
366

    
367

    
368
def _UpdateKnownHosts(fullnode, ip, pubkey):
369
  """Ensure a node has a correct known_hosts entry.
370

371
  Args:
372
    fullnode - Fully qualified domain name of host. (str)
373
    ip       - IPv4 address of host (str)
374
    pubkey   - the public key of the cluster
375

376
  """
377
  if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
378
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
379
  else:
380
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
381

    
382
  inthere = False
383

    
384
  save_lines = []
385
  add_lines = []
386
  removed = False
387

    
388
  while True:
389
    rawline = f.readline()
390
    logger.Debug('read %s' % (repr(rawline),))
391

    
392
    if not rawline:
393
      # End of file
394
      break
395

    
396
    line = rawline.split('\n')[0]
397

    
398
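    # known_hosts entries have the form "host1,host2 ssh-rsa <key>": the
    # first field is the comma-separated host list, the third is the key.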
    parts = line.split(' ')
399
    fields = parts[0].split(',')
400
    key = parts[2]
401

    
402
    haveall = True
403
    havesome = False
404
    for spec in [ ip, fullnode ]:
405
      if spec not in fields:
406
        haveall = False
407
      if spec in fields:
408
        havesome = True
409

    
410
    logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
411
    if haveall and key == pubkey:
412
      inthere = True
413
      save_lines.append(rawline)
414
      logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
415
      continue
416

    
417
    if havesome and (not haveall or key != pubkey):
418
      removed = True
419
      logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
420
      continue
421

    
422
    save_lines.append(rawline)
423

    
424
  if not inthere:
425
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
426
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
427

    
428
  if removed:
429
    save_lines = save_lines + add_lines
430

    
431
    # Write a new file and replace old.
432
    fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
433
                                   constants.DATA_DIR)
434
    newfile = os.fdopen(fd, 'w')
435
    try:
436
      newfile.write(''.join(save_lines))
437
    finally:
438
      newfile.close()
439
    logger.Debug("Wrote new known_hosts.")
440
    os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
441

    
442
  elif add_lines:
443
    # Simply appending a new line will do the trick.
444
    f.seek(0, 2)
445
    for add in add_lines:
446
      f.write(add)
447

    
448
  f.close()
449

    
450

    
451
def _HasValidVG(vglist, vgname):
452
  """Checks if the volume group list is valid.
453

454
  A non-None return value means there's an error, and the return value
455
  is the error message.
456

457
  """
458
  vgsize = vglist.get(vgname, None)
459
  if vgsize is None:
460
    return "volume group '%s' missing" % vgname
461
  elif vgsize < 20480:
462
    return ("volume group '%s' too small (20480MiB required, %dMib found)" %
463
            (vgname, vgsize))
464
  return None
465

    
466

    
467
def _InitSSHSetup(node):
468
  """Setup the SSH configuration for the cluster.
469

470

471
  This generates a dsa keypair for root, adds the pub key to the
472
  permitted hosts and adds the hostkey to its own known hosts.
473

474
  Args:
475
    node: the name of this host as a fqdn
476

477
  """
478
  if os.path.exists('/root/.ssh/id_dsa'):
479
    utils.CreateBackup('/root/.ssh/id_dsa')
480
  if os.path.exists('/root/.ssh/id_dsa.pub'):
481
    utils.CreateBackup('/root/.ssh/id_dsa.pub')
482

    
483
  utils.RemoveFile('/root/.ssh/id_dsa')
484
  utils.RemoveFile('/root/.ssh/id_dsa.pub')
485

    
486
  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
487
                         "-f", "/root/.ssh/id_dsa",
488
                         "-q", "-N", ""])
489
  if result.failed:
490
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
491
                             result.output)
492

    
493
  f = open('/root/.ssh/id_dsa.pub', 'r')
494
  try:
495
    utils.AddAuthorizedKey('/root/.ssh/authorized_keys', f.read(8192))
496
  finally:
497
    f.close()
498

    
499

    
500
def _InitGanetiServerSetup(ss):
501
  """Setup the necessary configuration for the initial node daemon.
502

503
  This creates the nodepass file containing the shared password for
504
  the cluster and also generates the SSL certificate.
505

506
  """
507
  # Create pseudo random password
508
  randpass = sha.new(os.urandom(64)).hexdigest()
509
  # and write it into sstore
510
  ss.SetKey(ss.SS_NODED_PASS, randpass)
511

    
512
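  # Generate a self-signed certificate valid for five years; the private key
  # and the certificate end up in the same file (SSL_CERT_FILE).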
  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
513
                         "-days", str(365*5), "-nodes", "-x509",
514
                         "-keyout", constants.SSL_CERT_FILE,
515
                         "-out", constants.SSL_CERT_FILE, "-batch"])
516
  if result.failed:
517
    raise errors.OpExecError("could not generate server ssl cert, command"
518
                             " %s had exitcode %s and error message %s" %
519
                             (result.cmd, result.exit_code, result.output))
520

    
521
  os.chmod(constants.SSL_CERT_FILE, 0400)
522

    
523
  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
524

    
525
  if result.failed:
526
    raise errors.OpExecError("Could not start the node daemon, command %s"
527
                             " had exitcode %s and error %s" %
528
                             (result.cmd, result.exit_code, result.output))
529

    
530

    
531
class LUInitCluster(LogicalUnit):
532
  """Initialise the cluster.
533

534
  """
535
  HPATH = "cluster-init"
536
  HTYPE = constants.HTYPE_CLUSTER
537
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
538
              "def_bridge", "master_netdev"]
539
  REQ_CLUSTER = False
540

    
541
  def BuildHooksEnv(self):
542
    """Build hooks env.
543

544
    Notes: Since we don't require a cluster, we must manually add
545
    ourselves in the post-run node list.
546

547
    """
548
    env = {
549
      "CLUSTER": self.op.cluster_name,
550
      "MASTER": self.hostname['hostname_full'],
551
      }
552
    return env, [], [self.hostname['hostname_full']]
553

    
554
  def CheckPrereq(self):
555
    """Verify that the passed name is a valid one.
556

557
    """
558
    if config.ConfigWriter.IsCluster():
559
      raise errors.OpPrereqError("Cluster is already initialised")
560

    
561
    hostname_local = socket.gethostname()
562
    self.hostname = hostname = utils.LookupHostname(hostname_local)
563
    if not hostname:
564
      raise errors.OpPrereqError("Cannot resolve my own hostname ('%s')" %
565
                                 hostname_local)
566

    
567
    if hostname["hostname_full"] != hostname_local:
568
      raise errors.OpPrereqError("My own hostname (%s) does not match the"
569
                                 " resolver (%s): probably not using FQDN"
570
                                 " for hostname." %
571
                                 (hostname_local, hostname["hostname_full"]))
572

    
573
    if hostname["ip"].startswith("127."):
574
      raise errors.OpPrereqError("This host's IP resolves to the private"
575
                                 " range (%s). Please fix DNS or /etc/hosts." %
576
                                 (hostname["ip"],))
577

    
578
    self.clustername = clustername = utils.LookupHostname(self.op.cluster_name)
579
    if not clustername:
580
      raise errors.OpPrereqError("Cannot resolve given cluster name ('%s')"
581
                                 % self.op.cluster_name)
582

    
583
    result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", hostname['ip']])
584
    if result.failed:
585
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
586
                                 " to %s,\nbut this ip address does not"
587
                                 " belong to this host."
588
                                 " Aborting." % hostname['ip'])
589

    
590
    secondary_ip = getattr(self.op, "secondary_ip", None)
591
    if secondary_ip and not utils.IsValidIP(secondary_ip):
592
      raise errors.OpPrereqError("Invalid secondary ip given")
593
    if secondary_ip and secondary_ip != hostname['ip']:
594
      result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", secondary_ip])
595
      if result.failed:
596
        raise errors.OpPrereqError("You gave %s as secondary IP,\n"
597
                                   "but it does not belong to this host." %
598
                                   secondary_ip)
599
    self.secondary_ip = secondary_ip
600

    
601
    # checks presence of the volume group given
602
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
603

    
604
    if vgstatus:
605
      raise errors.OpPrereqError("Error: %s" % vgstatus)
606

    
607
    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
608
                    self.op.mac_prefix):
609
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
610
                                 self.op.mac_prefix)
611

    
612
    if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
613
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
614
                                 self.op.hypervisor_type)
615

    
616
    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
617
    if result.failed:
618
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
619
                                 (self.op.master_netdev,
620
                                  result.output.strip()))
621

    
622
  def Exec(self, feedback_fn):
623
    """Initialize the cluster.
624

625
    """
626
    clustername = self.clustername
627
    hostname = self.hostname
628

    
629
    # set up the simple store
630
    ss = ssconf.SimpleStore()
631
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
632
    ss.SetKey(ss.SS_MASTER_NODE, hostname['hostname_full'])
633
    ss.SetKey(ss.SS_MASTER_IP, clustername['ip'])
634
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
635
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername['hostname'])
636

    
637
    # set up the inter-node password and certificate
638
    _InitGanetiServerSetup(ss)
639

    
640
    # start the master ip
641
    rpc.call_node_start_master(hostname['hostname_full'])
642

    
643
    # set up ssh config and /etc/hosts
644
    f = open('/etc/ssh/ssh_host_rsa_key.pub', 'r')
645
    try:
646
      sshline = f.read()
647
    finally:
648
      f.close()
649
    sshkey = sshline.split(" ")[1]
650

    
651
    _UpdateEtcHosts(hostname['hostname_full'],
652
                    hostname['ip'],
653
                    )
654

    
655
    _UpdateKnownHosts(hostname['hostname_full'],
656
                      hostname['ip'],
657
                      sshkey,
658
                      )
659

    
660
    _InitSSHSetup(hostname['hostname'])
661

    
662
    # init of cluster config file
663
    cfgw = config.ConfigWriter()
664
    cfgw.InitConfig(hostname['hostname'], hostname['ip'], self.secondary_ip,
665
                    sshkey, self.op.mac_prefix,
666
                    self.op.vg_name, self.op.def_bridge)
667

    
668

    
669
class LUDestroyCluster(NoHooksLU):
670
  """Logical unit for destroying the cluster.
671

672
  """
673
  _OP_REQP = []
674

    
675
  def CheckPrereq(self):
676
    """Check prerequisites.
677

678
    This checks whether the cluster is empty.
679

680
    Any errors are signalled by raising errors.OpPrereqError.
681

682
    """
683
    master = self.sstore.GetMasterNode()
684

    
685
    nodelist = self.cfg.GetNodeList()
686
    if len(nodelist) != 1 or nodelist[0] != master:
687
      raise errors.OpPrereqError("There are still %d node(s) in"
688
                                 " this cluster." % (len(nodelist) - 1))
689
    instancelist = self.cfg.GetInstanceList()
690
    if instancelist:
691
      raise errors.OpPrereqError("There are still %d instance(s) in"
692
                                 " this cluster." % len(instancelist))
693

    
694
  def Exec(self, feedback_fn):
695
    """Destroys the cluster.
696

697
    """
698
    utils.CreateBackup('/root/.ssh/id_dsa')
699
    utils.CreateBackup('/root/.ssh/id_dsa.pub')
700
    rpc.call_node_leave_cluster(self.sstore.GetMasterNode())
701

    
702

    
703
class LUVerifyCluster(NoHooksLU):
704
  """Verifies the cluster status.
705

706
  """
707
  _OP_REQP = []
708

    
709
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
710
                  remote_version, feedback_fn):
711
    """Run multiple tests against a node.
712

713
    Test list:
714
      - compares ganeti version
715
      - checks vg existence and size > 20G
716
      - checks config file checksum
717
      - checks ssh to other nodes
718

719
    Args:
720
      node: name of the node to check
721
      file_list: required list of files
722
      local_cksum: dictionary of local files and their checksums
723

724
    """
725
    # compares ganeti version
726
    local_version = constants.PROTOCOL_VERSION
727
    if not remote_version:
728
      feedback_fn(" - ERROR: connection to %s failed" % (node))
729
      return True
730

    
731
    if local_version != remote_version:
732
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
733
                      (local_version, node, remote_version))
734
      return True
735

    
736
    # checks vg existence and size > 20G
737

    
738
    bad = False
739
    if not vglist:
740
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
741
                      (node,))
742
      bad = True
743
    else:
744
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
745
      if vgstatus:
746
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
747
        bad = True
748

    
749
    # checks config file checksum
750
    # checks ssh to any
751

    
752
    if 'filelist' not in node_result:
753
      bad = True
754
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
755
    else:
756
      remote_cksum = node_result['filelist']
757
      for file_name in file_list:
758
        if file_name not in remote_cksum:
759
          bad = True
760
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
761
        elif remote_cksum[file_name] != local_cksum[file_name]:
762
          bad = True
763
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
764

    
765
    if 'nodelist' not in node_result:
766
      bad = True
767
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
768
    else:
769
      if node_result['nodelist']:
770
        bad = True
771
        for node in node_result['nodelist']:
772
          feedback_fn("  - ERROR: communication with node '%s': %s" %
773
                          (node, node_result['nodelist'][node]))
774
    hyp_result = node_result.get('hypervisor', None)
775
    if hyp_result is not None:
776
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
777
    return bad
778

    
779
  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
780
    """Verify an instance.
781

782
    This function checks to see if the required block devices are
783
    available on the instance's node.
784

785
    """
786
    bad = False
787

    
788
    instancelist = self.cfg.GetInstanceList()
789
    if not instance in instancelist:
790
      feedback_fn("  - ERROR: instance %s not in instance list %s" %
791
                      (instance, instancelist))
792
      bad = True
793

    
794
    instanceconfig = self.cfg.GetInstanceInfo(instance)
795
    node_current = instanceconfig.primary_node
796

    
797
    node_vol_should = {}
798
    instanceconfig.MapLVsByNode(node_vol_should)
799

    
800
    for node in node_vol_should:
801
      for volume in node_vol_should[node]:
802
        if node not in node_vol_is or volume not in node_vol_is[node]:
803
          feedback_fn("  - ERROR: volume %s missing on node %s" %
804
                          (volume, node))
805
          bad = True
806

    
807
    if not instanceconfig.status == 'down':
808
      if not instance in node_instance[node_current]:
809
        feedback_fn("  - ERROR: instance %s not running on node %s" %
810
                        (instance, node_current))
811
        bad = True
812

    
813
    for node in node_instance:
814
      if (not node == node_current):
815
        if instance in node_instance[node]:
816
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
817
                          (instance, node))
818
          bad = True
819

    
820
    return bad
821

    
822
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
823
    """Verify if there are any unknown volumes in the cluster.
824

825
    The .os, .swap and backup volumes are ignored. All other volumes are
826
    reported as unknown.
827

828
    """
829
    bad = False
830

    
831
    for node in node_vol_is:
832
      for volume in node_vol_is[node]:
833
        if node not in node_vol_should or volume not in node_vol_should[node]:
834
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
835
                      (volume, node))
836
          bad = True
837
    return bad
838

    
839
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
840
    """Verify the list of running instances.
841

842
    This checks what instances are running but unknown to the cluster.
843

844
    """
845
    bad = False
846
    for node in node_instance:
847
      for runninginstance in node_instance[node]:
848
        if runninginstance not in instancelist:
849
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
850
                          (runninginstance, node))
851
          bad = True
852
    return bad
853

    
854
  def CheckPrereq(self):
855
    """Check prerequisites.
856

857
    This has no prerequisites.
858

859
    """
860
    pass
861

    
862
  def Exec(self, feedback_fn):
863
    """Verify integrity of cluster, performing various test on nodes.
864

865
    """
866
    bad = False
867
    feedback_fn("* Verifying global settings")
868
    self.cfg.VerifyConfig()
869

    
870
    master = self.sstore.GetMasterNode()
871
    vg_name = self.cfg.GetVGName()
872
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
873
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
874
    node_volume = {}
875
    node_instance = {}
876

    
877
    # FIXME: verify OS list
878
    # do local checksums
879
    file_names = list(self.sstore.GetFileList())
880
    file_names.append(constants.SSL_CERT_FILE)
881
    file_names.append(constants.CLUSTER_CONF_FILE)
882
    local_checksums = utils.FingerprintFiles(file_names)
883

    
884
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
885
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
886
    all_instanceinfo = rpc.call_instance_list(nodelist)
887
    all_vglist = rpc.call_vg_list(nodelist)
888
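    # Ask each node to checksum the listed files, to check connectivity to
    # the other nodes and to run a hypervisor self-check.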
    node_verify_param = {
889
      'filelist': file_names,
890
      'nodelist': nodelist,
891
      'hypervisor': None,
892
      }
893
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
894
    all_rversion = rpc.call_version(nodelist)
895

    
896
    for node in nodelist:
897
      feedback_fn("* Verifying node %s" % node)
898
      result = self._VerifyNode(node, file_names, local_checksums,
899
                                all_vglist[node], all_nvinfo[node],
900
                                all_rversion[node], feedback_fn)
901
      bad = bad or result
902

    
903
      # node_volume
904
      volumeinfo = all_volumeinfo[node]
905

    
906
      if type(volumeinfo) != dict:
907
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
908
        bad = True
909
        continue
910

    
911
      node_volume[node] = volumeinfo
912

    
913
      # node_instance
914
      nodeinstance = all_instanceinfo[node]
915
      if type(nodeinstance) != list:
916
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
917
        bad = True
918
        continue
919

    
920
      node_instance[node] = nodeinstance
921

    
922
    node_vol_should = {}
923

    
924
    for instance in instancelist:
925
      feedback_fn("* Verifying instance %s" % instance)
926
      result =  self._VerifyInstance(instance, node_volume, node_instance,
927
                                     feedback_fn)
928
      bad = bad or result
929

    
930
      inst_config = self.cfg.GetInstanceInfo(instance)
931

    
932
      inst_config.MapLVsByNode(node_vol_should)
933

    
934
    feedback_fn("* Verifying orphan volumes")
935
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
936
                                       feedback_fn)
937
    bad = bad or result
938

    
939
    feedback_fn("* Verifying remaining instances")
940
    result = self._VerifyOrphanInstances(instancelist, node_instance,
941
                                         feedback_fn)
942
    bad = bad or result
943

    
944
    return int(bad)
945

    
946

    
947
def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
948
  """Sleep and poll for an instance's disk to sync.
949

950
  """
951
  if not instance.disks:
952
    return True
953

    
954
  if not oneshot:
955
    logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)
956

    
957
  node = instance.primary_node
958

    
959
  for dev in instance.disks:
960
    cfgw.SetDiskID(dev, node)
961

    
962
  retries = 0
963
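  # Poll the primary node until all mirrors report as synced; each rstats
  # entry is either None (device not readable) or a (percent_done,
  # estimated_time, is_degraded) tuple.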
  while True:
964
    max_time = 0
965
    done = True
966
    cumul_degraded = False
967
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
968
    if not rstats:
969
      logger.ToStderr("Can't get any data from node %s" % node)
970
      retries += 1
971
      if retries >= 10:
972
        raise errors.RemoteError("Can't contact node %s for mirror data,"
973
                                 " aborting." % node)
974
      time.sleep(6)
975
      continue
976
    retries = 0
977
    for i in range(len(rstats)):
978
      mstat = rstats[i]
979
      if mstat is None:
980
        logger.ToStderr("Can't compute data for node %s/%s" %
981
                        (node, instance.disks[i].iv_name))
982
        continue
983
      perc_done, est_time, is_degraded = mstat
984
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
985
      if perc_done is not None:
986
        done = False
987
        if est_time is not None:
988
          rem_time = "%d estimated seconds remaining" % est_time
989
          max_time = est_time
990
        else:
991
          rem_time = "no time estimate"
992
        logger.ToStdout("- device %s: %5.2f%% done, %s" %
993
                        (instance.disks[i].iv_name, perc_done, rem_time))
994
    if done or oneshot:
995
      break
996

    
997
    if unlock:
998
      utils.Unlock('cmd')
999
    try:
1000
      time.sleep(min(60, max_time))
1001
    finally:
1002
      if unlock:
1003
        utils.Lock('cmd')
1004

    
1005
  if done:
1006
    logger.ToStdout("Instance %s's disks are in sync." % instance.name)
1007
  return not cumul_degraded
1008

    
1009

    
1010
def _CheckDiskConsistency(cfgw, dev, node, on_primary):
1011
  """Check that mirrors are not degraded.
1012

1013
  """
1014
  cfgw.SetDiskID(dev, node)
1015

    
1016
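  # blockdev_find returns a status tuple whose sixth element (index 5) is
  # the degraded flag; children are checked recursively so a degraded child
  # marks the whole device as inconsistent.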
  result = True
1017
  if on_primary or dev.AssembleOnSecondary():
1018
    rstats = rpc.call_blockdev_find(node, dev)
1019
    if not rstats:
1020
      logger.ToStderr("Can't get any data from node %s" % node)
1021
      result = False
1022
    else:
1023
      result = result and (not rstats[5])
1024
  if dev.children:
1025
    for child in dev.children:
1026
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1027

    
1028
  return result
1029

    
1030

    
1031
class LUDiagnoseOS(NoHooksLU):
1032
  """Logical unit for OS diagnose/query.
1033

1034
  """
1035
  _OP_REQP = []
1036

    
1037
  def CheckPrereq(self):
1038
    """Check prerequisites.
1039

1040
    This always succeeds, since this is a pure query LU.
1041

1042
    """
1043
    return
1044

    
1045
  def Exec(self, feedback_fn):
1046
    """Compute the list of OSes.
1047

1048
    """
1049
    node_list = self.cfg.GetNodeList()
1050
    node_data = rpc.call_os_diagnose(node_list)
1051
    if node_data == False:
1052
      raise errors.OpExecError("Can't gather the list of OSes")
1053
    return node_data
1054

    
1055

    
1056
class LURemoveNode(LogicalUnit):
1057
  """Logical unit for removing a node.
1058

1059
  """
1060
  HPATH = "node-remove"
1061
  HTYPE = constants.HTYPE_NODE
1062
  _OP_REQP = ["node_name"]
1063

    
1064
  def BuildHooksEnv(self):
1065
    """Build hooks env.
1066

1067
    This doesn't run on the target node in the pre phase as a failed
1068
    node would not allow itself to run.
1069

1070
    """
1071
    env = {
1072
      "NODE_NAME": self.op.node_name,
1073
      }
1074
    all_nodes = self.cfg.GetNodeList()
1075
    all_nodes.remove(self.op.node_name)
1076
    return env, all_nodes, all_nodes
1077

    
1078
  def CheckPrereq(self):
1079
    """Check prerequisites.
1080

1081
    This checks:
1082
     - the node exists in the configuration
1083
     - it does not have primary or secondary instances
1084
     - it's not the master
1085

1086
    Any errors are signalled by raising errors.OpPrereqError.
1087

1088
    """
1089
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1090
    if node is None:
1091
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1092

    
1093
    instance_list = self.cfg.GetInstanceList()
1094

    
1095
    masternode = self.sstore.GetMasterNode()
1096
    if node.name == masternode:
1097
      raise errors.OpPrereqError("Node is the master node,"
1098
                                 " you need to failover first.")
1099

    
1100
    for instance_name in instance_list:
1101
      instance = self.cfg.GetInstanceInfo(instance_name)
1102
      if node.name == instance.primary_node:
1103
        raise errors.OpPrereqError("Instance %s still running on the node,"
1104
                                   " please remove first." % instance_name)
1105
      if node.name in instance.secondary_nodes:
1106
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1107
                                   " please remove first." % instance_name)
1108
    self.op.node_name = node.name
1109
    self.node = node
1110

    
1111
  def Exec(self, feedback_fn):
1112
    """Removes the node from the cluster.
1113

1114
    """
1115
    node = self.node
1116
    logger.Info("stopping the node daemon and removing configs from node %s" %
1117
                node.name)
1118

    
1119
    rpc.call_node_leave_cluster(node.name)
1120

    
1121
    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1122

    
1123
    logger.Info("Removing node %s from config" % node.name)
1124

    
1125
    self.cfg.RemoveNode(node.name)
1126

    
1127

    
1128
class LUQueryNodes(NoHooksLU):
1129
  """Logical unit for querying nodes.
1130

1131
  """
1132
  _OP_REQP = ["output_fields", "names"]
1133

    
1134
  def CheckPrereq(self):
1135
    """Check prerequisites.
1136

1137
    This checks that the fields required are valid output fields.
1138

1139
    """
1140
    self.dynamic_fields = frozenset(["dtotal", "dfree",
1141
                                     "mtotal", "mnode", "mfree"])
1142

    
1143
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1144
                               "pinst_list", "sinst_list",
1145
                               "pip", "sip"],
1146
                       dynamic=self.dynamic_fields,
1147
                       selected=self.op.output_fields)
1148

    
1149
    self.wanted = _GetWantedNodes(self, self.op.names)
1150

    
1151
  def Exec(self, feedback_fn):
1152
    """Computes the list of nodes and their attributes.
1153

1154
    """
1155
    nodenames = self.wanted
1156
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1157

    
1158
    # begin data gathering
1159

    
1160
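    # Only issue the node_info RPC if at least one requested field is
    # dynamic (live memory/disk data); otherwise use empty placeholders.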
    if self.dynamic_fields.intersection(self.op.output_fields):
1161
      live_data = {}
1162
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1163
      for name in nodenames:
1164
        nodeinfo = node_data.get(name, None)
1165
        if nodeinfo:
1166
          live_data[name] = {
1167
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1168
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1169
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1170
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1171
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1172
            }
1173
        else:
1174
          live_data[name] = {}
1175
    else:
1176
      live_data = dict.fromkeys(nodenames, {})
1177

    
1178
    node_to_primary = dict([(name, set()) for name in nodenames])
1179
    node_to_secondary = dict([(name, set()) for name in nodenames])
1180

    
1181
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1182
                             "sinst_cnt", "sinst_list"))
1183
    if inst_fields & frozenset(self.op.output_fields):
1184
      instancelist = self.cfg.GetInstanceList()
1185

    
1186
      for instance_name in instancelist:
1187
        inst = self.cfg.GetInstanceInfo(instance_name)
1188
        if inst.primary_node in node_to_primary:
1189
          node_to_primary[inst.primary_node].add(inst.name)
1190
        for secnode in inst.secondary_nodes:
1191
          if secnode in node_to_secondary:
1192
            node_to_secondary[secnode].add(inst.name)
1193

    
1194
    # end data gathering
1195

    
1196
    output = []
1197
    for node in nodelist:
1198
      node_output = []
1199
      for field in self.op.output_fields:
1200
        if field == "name":
1201
          val = node.name
1202
        elif field == "pinst_list":
1203
          val = list(node_to_primary[node.name])
1204
        elif field == "sinst_list":
1205
          val = list(node_to_secondary[node.name])
1206
        elif field == "pinst_cnt":
1207
          val = len(node_to_primary[node.name])
1208
        elif field == "sinst_cnt":
1209
          val = len(node_to_secondary[node.name])
1210
        elif field == "pip":
1211
          val = node.primary_ip
1212
        elif field == "sip":
1213
          val = node.secondary_ip
1214
        elif field in self.dynamic_fields:
1215
          val = live_data[node.name].get(field, None)
1216
        else:
1217
          raise errors.ParameterError(field)
1218
        node_output.append(val)
1219
      output.append(node_output)
1220

    
1221
    return output
1222

    
1223

    
1224
class LUQueryNodeVolumes(NoHooksLU):
1225
  """Logical unit for getting volumes on node(s).
1226

1227
  """
1228
  _OP_REQP = ["nodes", "output_fields"]
1229

    
1230
  def CheckPrereq(self):
1231
    """Check prerequisites.
1232

1233
    This checks that the fields required are valid output fields.
1234

1235
    """
1236
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1237

    
1238
    _CheckOutputFields(static=["node"],
1239
                       dynamic=["phys", "vg", "name", "size", "instance"],
1240
                       selected=self.op.output_fields)
1241

    
1242

    
1243
  def Exec(self, feedback_fn):
1244
    """Computes the list of nodes and their attributes.
1245

1246
    """
1247
    nodenames = self.nodes
1248
    volumes = rpc.call_node_volumes(nodenames)
1249

    
1250
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1251
             in self.cfg.GetInstanceList()]
1252

    
1253
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1254

    
1255
    output = []
1256
    for node in nodenames:
1257
      if node not in volumes or not volumes[node]:
1258
        continue
1259

    
1260
      node_vols = volumes[node][:]
1261
      node_vols.sort(key=lambda vol: vol['dev'])
1262

    
1263
      for vol in node_vols:
1264
        node_output = []
1265
        for field in self.op.output_fields:
1266
          if field == "node":
1267
            val = node
1268
          elif field == "phys":
1269
            val = vol['dev']
1270
          elif field == "vg":
1271
            val = vol['vg']
1272
          elif field == "name":
1273
            val = vol['name']
1274
          elif field == "size":
1275
            val = int(float(vol['size']))
1276
          elif field == "instance":
1277
            for inst in ilist:
1278
              if node not in lv_by_node[inst]:
1279
                continue
1280
              if vol['name'] in lv_by_node[inst][node]:
1281
                val = inst.name
1282
                break
1283
            else:
1284
              val = '-'
1285
          else:
1286
            raise errors.ParameterError(field)
1287
          node_output.append(str(val))
1288

    
1289
        output.append(node_output)
1290

    
1291
    return output
1292

    
1293

    
1294
class LUAddNode(LogicalUnit):
1295
  """Logical unit for adding node to the cluster.
1296

1297
  """
1298
  HPATH = "node-add"
1299
  HTYPE = constants.HTYPE_NODE
1300
  _OP_REQP = ["node_name"]
1301

    
1302
  def BuildHooksEnv(self):
1303
    """Build hooks env.
1304

1305
    This will run on all nodes before, and on all nodes + the new node after.
1306

1307
    """
1308
    env = {
1309
      "NODE_NAME": self.op.node_name,
1310
      "NODE_PIP": self.op.primary_ip,
1311
      "NODE_SIP": self.op.secondary_ip,
1312
      }
1313
    nodes_0 = self.cfg.GetNodeList()
1314
    nodes_1 = nodes_0 + [self.op.node_name, ]
1315
    return env, nodes_0, nodes_1
1316

    
1317
  def CheckPrereq(self):
1318
    """Check prerequisites.
1319

1320
    This checks:
1321
     - the new node is not already in the config
1322
     - it is resolvable
1323
     - its parameters (single/dual homed) match the cluster
1324

1325
    Any errors are signalled by raising errors.OpPrereqError.
1326

1327
    """
1328
    node_name = self.op.node_name
1329
    cfg = self.cfg
1330

    
1331
    dns_data = utils.LookupHostname(node_name)
1332
    if not dns_data:
1333
      raise errors.OpPrereqError("Node %s is not resolvable" % node_name)
1334

    
1335
    node = dns_data['hostname']
1336
    primary_ip = self.op.primary_ip = dns_data['ip']
1337
    secondary_ip = getattr(self.op, "secondary_ip", None)
1338
    if secondary_ip is None:
1339
      secondary_ip = primary_ip
1340
    if not utils.IsValidIP(secondary_ip):
1341
      raise errors.OpPrereqError("Invalid secondary IP given")
1342
    self.op.secondary_ip = secondary_ip
1343
    node_list = cfg.GetNodeList()
1344
    if node in node_list:
1345
      raise errors.OpPrereqError("Node %s is already in the configuration"
1346
                                 % node)
1347

    
1348
    for existing_node_name in node_list:
1349
      existing_node = cfg.GetNodeInfo(existing_node_name)
1350
      if (existing_node.primary_ip == primary_ip or
1351
          existing_node.secondary_ip == primary_ip or
1352
          existing_node.primary_ip == secondary_ip or
1353
          existing_node.secondary_ip == secondary_ip):
1354
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1355
                                   " existing node %s" % existing_node.name)
1356

    
1357
    # check that the type of the node (single versus dual homed) is the
1358
    # same as for the master
1359
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1360
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1361
    newbie_singlehomed = secondary_ip == primary_ip
1362
    if master_singlehomed != newbie_singlehomed:
1363
      if master_singlehomed:
1364
        raise errors.OpPrereqError("The master has no private ip but the"
1365
                                   " new node has one")
1366
      else:
1367
        raise errors.OpPrereqError("The master has a private ip but the"
1368
                                   " new node doesn't have one")
1369

    
1370
    # checks reachability
1371
    command = ["fping", "-q", primary_ip]
1372
    result = utils.RunCmd(command)
1373
    if result.failed:
1374
      raise errors.OpPrereqError("Node not reachable by ping")
1375

    
1376
    if not newbie_singlehomed:
1377
      # check reachability from my secondary ip to newbie's secondary ip
1378
      command = ["fping", "-S%s" % myself.secondary_ip, "-q", secondary_ip]
1379
      result = utils.RunCmd(command)
1380
      if result.failed:
1381
        raise errors.OpPrereqError("Node secondary ip not reachable by ping")
1382

    
1383
    self.new_node = objects.Node(name=node,
1384
                                 primary_ip=primary_ip,
1385
                                 secondary_ip=secondary_ip)
1386

    
1387
  def Exec(self, feedback_fn):
1388
    """Adds the new node to the cluster.
1389

1390
    """
1391
    new_node = self.new_node
1392
    node = new_node.name
1393

    
1394
    # set up inter-node password and certificate and restart the node daemon
1395
    gntpass = self.sstore.GetNodeDaemonPassword()
1396
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1397
      raise errors.OpExecError("ganeti password corruption detected")
1398
    f = open(constants.SSL_CERT_FILE)
1399
    try:
1400
      gntpem = f.read(8192)
1401
    finally:
1402
      f.close()
1403
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1404
    # so we use this to detect an invalid certificate; as long as the
1405
    # cert doesn't contain this, the here-document will be correctly
1406
    # parsed by the shell sequence below
1407
    if re.search(r'^!EOF\.', gntpem, re.MULTILINE):
1408
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1409
    if not gntpem.endswith("\n"):
1410
      raise errors.OpExecError("PEM must end with newline")
1411
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1412

    
1413
    # and then connect with ssh to set password and start ganeti-noded
1414
    # note that all the below variables are sanitized at this point,
1415
    # either by being constants or by the checks above
1416
    ss = self.sstore
1417
    mycommand = ("umask 077 && "
1418
                 "echo '%s' > '%s' && "
1419
                 "cat > '%s' << '!EOF.' && \n"
1420
                 "%s!EOF.\n%s restart" %
1421
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1422
                  constants.SSL_CERT_FILE, gntpem,
1423
                  constants.NODE_INITD_SCRIPT))
1424

    
1425
    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1426
    if result.failed:
1427
      raise errors.OpExecError("Remote command on node %s, error: %s,"
1428
                               " output: %s" %
1429
                               (node, result.fail_reason, result.output))
1430

    
1431
    # check connectivity
1432
    time.sleep(4)
1433

    
1434
    result = rpc.call_version([node])[node]
1435
    if result:
1436
      if constants.PROTOCOL_VERSION == result:
1437
        logger.Info("communication to node %s fine, sw version %s match" %
1438
                    (node, result))
1439
      else:
1440
        raise errors.OpExecError("Version mismatch master version %s,"
1441
                                 " node version %s" %
1442
                                 (constants.PROTOCOL_VERSION, result))
1443
    else:
1444
      raise errors.OpExecError("Cannot get version from the new node")
1445

    
1446
    # setup ssh on node
1447
    logger.Info("copy ssh key to node %s" % node)
1448
    keyarray = []
1449
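    # Push the master's SSH host keys and root's key pair to the new node,
    # so all cluster nodes present the same SSH identity.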
    keyfiles = ["/etc/ssh/ssh_host_dsa_key", "/etc/ssh/ssh_host_dsa_key.pub",
1450
                "/etc/ssh/ssh_host_rsa_key", "/etc/ssh/ssh_host_rsa_key.pub",
1451
                "/root/.ssh/id_dsa", "/root/.ssh/id_dsa.pub"]
1452

    
1453
    for i in keyfiles:
1454
      f = open(i, 'r')
1455
      try:
1456
        keyarray.append(f.read())
1457
      finally:
1458
        f.close()
1459

    
1460
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1461
                               keyarray[3], keyarray[4], keyarray[5])
1462

    
1463
    if not result:
1464
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1465

    
1466
    # Add node to our /etc/hosts, and add key to known_hosts
1467
    _UpdateEtcHosts(new_node.name, new_node.primary_ip)
1468
    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1469
                      self.cfg.GetHostKey())
1470

    
1471
    if new_node.secondary_ip != new_node.primary_ip:
1472
      result = ssh.SSHCall(node, "root",
1473
                           "fping -S 127.0.0.1 -q %s" % new_node.secondary_ip)
1474
      if result.failed:
1475
        raise errors.OpExecError("Node claims it doesn't have the"
1476
                                 " secondary ip you gave (%s).\n"
1477
                                 "Please fix and re-run this command." %
1478
                                 new_node.secondary_ip)
1479

    
1480
    success, msg = ssh.VerifyNodeHostname(node)
1481
    if not success:
1482
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
1483
                               " than the one the resolver gives: %s.\n"
1484
                               "Please fix and re-run this command." %
1485
                               (node, msg))
1486

    
1487
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1488
    # including the node just added
1489
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1490
    dist_nodes = self.cfg.GetNodeList() + [node]
1491
    if myself.name in dist_nodes:
1492
      dist_nodes.remove(myself.name)
1493

    
1494
    logger.Debug("Copying hosts and known_hosts to all nodes")
1495
    for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
1496
      result = rpc.call_upload_file(dist_nodes, fname)
1497
      for to_node in dist_nodes:
1498
        if not result[to_node]:
1499
          logger.Error("copy of file %s to node %s failed" %
1500
                       (fname, to_node))
1501

    
1502
    to_copy = ss.GetFileList()
1503
    for fname in to_copy:
1504
      if not ssh.CopyFileToNode(node, fname):
1505
        logger.Error("could not copy file %s to node %s" % (fname, node))
1506

    
1507
    logger.Info("adding node %s to cluster.conf" % node)
1508
    self.cfg.AddNode(new_node)
1509

    
1510

    
1511
class LUMasterFailover(LogicalUnit):
1512
  """Failover the master node to the current node.
1513

1514
  This is a special LU in that it must run on a non-master node.
1515

1516
  """
1517
  HPATH = "master-failover"
1518
  HTYPE = constants.HTYPE_CLUSTER
1519
  REQ_MASTER = False
1520
  _OP_REQP = []
1521

    
1522
  def BuildHooksEnv(self):
1523
    """Build hooks env.
1524

1525
    This will run on the new master only in the pre phase, and on all
1526
    the nodes in the post phase.
1527

1528
    """
1529
    env = {
1530
      "NEW_MASTER": self.new_master,
1531
      "OLD_MASTER": self.old_master,
1532
      }
1533
    return env, [self.new_master], self.cfg.GetNodeList()
1534

    
1535
  def CheckPrereq(self):
1536
    """Check prerequisites.
1537

1538
    This checks that we are not already the master.
1539

1540
    """
1541
    self.new_master = socket.gethostname()
1542

    
1543
    self.old_master = self.sstore.GetMasterNode()
1544

    
1545
    if self.old_master == self.new_master:
1546
      raise errors.OpPrereqError("This commands must be run on the node"
1547
                                 " where you want the new master to be.\n"
1548
                                 "%s is already the master" %
1549
                                 self.old_master)
1550

    
1551
  def Exec(self, feedback_fn):
1552
    """Failover the master node.
1553

1554
    This command, when run on a non-master node, will cause the current
1555
    master to cease being master, and the non-master to become new
1556
    master.
1557

1558
    """
1559
    #TODO: do not rely on gethostname returning the FQDN
1560
    logger.Info("setting master to %s, old master: %s" %
1561
                (self.new_master, self.old_master))
1562

    
1563
    if not rpc.call_node_stop_master(self.old_master):
1564
      logger.Error("could disable the master role on the old master"
1565
                   " %s, please disable manually" % self.old_master)
1566

    
1567
    ss = self.sstore
1568
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1569
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
1570
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
1571
      logger.Error("could not distribute the new simple store master file"
1572
                   " to the other nodes, please check.")
1573

    
1574
    if not rpc.call_node_start_master(self.new_master):
1575
      logger.Error("could not start the master role on the new master"
1576
                   " %s, please check" % self.new_master)
1577
      feedback_fn("Error in activating the master IP on the new master,\n"
1578
                  "please fix manually.")
1579

    
1580

    
1581

    
1582
class LUQueryClusterInfo(NoHooksLU):
1583
  """Query cluster configuration.
1584

1585
  """
1586
  _OP_REQP = []
1587
  REQ_MASTER = False
1588

    
1589
  def CheckPrereq(self):
1590
    """No prerequsites needed for this LU.
1591

1592
    """
1593
    pass
1594

    
1595
  def Exec(self, feedback_fn):
1596
    """Return cluster config.
1597

1598
    """
1599
    result = {
1600
      "name": self.sstore.GetClusterName(),
1601
      "software_version": constants.RELEASE_VERSION,
1602
      "protocol_version": constants.PROTOCOL_VERSION,
1603
      "config_version": constants.CONFIG_VERSION,
1604
      "os_api_version": constants.OS_API_VERSION,
1605
      "export_version": constants.EXPORT_VERSION,
1606
      "master": self.sstore.GetMasterNode(),
1607
      "architecture": (platform.architecture()[0], platform.machine()),
1608
      }
1609

    
1610
    return result
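    # Illustrative example of the returned mapping (the values below are
    # made up; the real ones come from constants.py and the simple store):
    #   {"name": "cluster1.example.com", "software_version": "1.2.0",
    #    "protocol_version": 11, "config_version": 3, "os_api_version": 5,
    #    "export_version": 0, "master": "node1.example.com",
    #    "architecture": ("64bit", "x86_64")}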
1611

    
1612

    
1613
class LUClusterCopyFile(NoHooksLU):
1614
  """Copy file to cluster.
1615

1616
  """
1617
  _OP_REQP = ["nodes", "filename"]
1618

    
1619
  def CheckPrereq(self):
1620
    """Check prerequisites.
1621

1622
    It should check that the named file exists and that the given list
1623
    of nodes is valid.
1624

1625
    """
1626
    if not os.path.exists(self.op.filename):
1627
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1628

    
1629
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1630

    
1631
  def Exec(self, feedback_fn):
1632
    """Copy a file from master to some nodes.
1633

1634
    Uses the following opcode parameters (validated in CheckPrereq):
1635
      filename - the path of the file to copy
1636
      nodes - list containing the names of the target nodes;
1637
              if empty, all nodes
1638

1639

1640
    """
1641
    filename = self.op.filename
1642

    
1643
    myname = socket.gethostname()
1644

    
1645
    for node in self.nodes:
1646
      if node == myname:
1647
        continue
1648
      if not ssh.CopyFileToNode(node, filename):
1649
        logger.Error("Copy of file %s to node %s failed" % (filename, node))
1650

    
1651

    
1652
class LUDumpClusterConfig(NoHooksLU):
1653
  """Return a text-representation of the cluster-config.
1654

1655
  """
1656
  _OP_REQP = []
1657

    
1658
  def CheckPrereq(self):
1659
    """No prerequisites.
1660

1661
    """
1662
    pass
1663

    
1664
  def Exec(self, feedback_fn):
1665
    """Dump a representation of the cluster config to the standard output.
1666

1667
    """
1668
    return self.cfg.DumpConfig()
1669

    
1670

    
1671
class LURunClusterCommand(NoHooksLU):
1672
  """Run a command on some nodes.
1673

1674
  """
1675
  _OP_REQP = ["command", "nodes"]
1676

    
1677
  def CheckPrereq(self):
1678
    """Check prerequisites.
1679

1680
    It checks that the given list of nodes is valid.
1681

1682
    """
1683
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1684

    
1685
  def Exec(self, feedback_fn):
1686
    """Run a command on some nodes.
1687

1688
    """
1689
    data = []
1690
    for node in self.nodes:
1691
      result = ssh.SSHCall(node, "root", self.op.command)
1692
      data.append((node, result.output, result.exit_code))
1693

    
1694
    return data
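    # Example of the returned structure (hypothetical nodes and output):
    #   [("node1.example.com", "Linux node1 2.6.x\n", 0),
    #    ("node2.example.com", "", 255)]
    # i.e. one (node, command output, exit code) tuple per target node.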
1695

    
1696

    
1697
class LUActivateInstanceDisks(NoHooksLU):
1698
  """Bring up an instance's disks.
1699

1700
  """
1701
  _OP_REQP = ["instance_name"]
1702

    
1703
  def CheckPrereq(self):
1704
    """Check prerequisites.
1705

1706
    This checks that the instance is in the cluster.
1707

1708
    """
1709
    instance = self.cfg.GetInstanceInfo(
1710
      self.cfg.ExpandInstanceName(self.op.instance_name))
1711
    if instance is None:
1712
      raise errors.OpPrereqError("Instance '%s' not known" %
1713
                                 self.op.instance_name)
1714
    self.instance = instance
1715

    
1716

    
1717
  def Exec(self, feedback_fn):
1718
    """Activate the disks.
1719

1720
    """
1721
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1722
    if not disks_ok:
1723
      raise errors.OpExecError("Cannot activate block devices")
1724

    
1725
    return disks_info
1726

    
1727

    
1728
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1729
  """Prepare the block devices for an instance.
1730

1731
  This sets up the block devices on all nodes.
1732

1733
  Args:
1734
    instance: a ganeti.objects.Instance object
1735
    ignore_secondaries: if true, errors on secondary nodes won't result
1736
                        in an error return from the function
1737

1738
  Returns:
1739
    a tuple (disks_ok, device_info); disks_ok is False if the operation
1740
    failed, and device_info is a list of (host, instance_visible_name,
1741
    assemble_result) tuples mapping node devices to instance devices
1742
  """
1743
  device_info = []
1744
  disks_ok = True
1745
  for inst_disk in instance.disks:
1746
    master_result = None
1747
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1748
      cfg.SetDiskID(node_disk, node)
1749
      is_primary = node == instance.primary_node
1750
      result = rpc.call_blockdev_assemble(node, node_disk, is_primary)
1751
      if not result:
1752
        logger.Error("could not prepare block device %s on node %s (is_pri"
1753
                     "mary=%s)" % (inst_disk.iv_name, node, is_primary))
1754
        if is_primary or not ignore_secondaries:
1755
          disks_ok = False
1756
      if is_primary:
1757
        master_result = result
1758
    device_info.append((instance.primary_node, inst_disk.iv_name,
1759
                        master_result))
1760

    
1761
  return disks_ok, device_info
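  # Sketch of a possible return value for a single-disk plain (lvm)
  # instance, assuming the assemble call on the primary returned the
  # device path (names are made up):
  #   (True, [("node1.example.com", "sda", "/dev/xenvg/aaaa-1111.sda")])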
1762

    
1763

    
1764
def _StartInstanceDisks(cfg, instance, force):
1765
  """Start the disks of an instance.
1766

1767
  """
1768
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1769
                                           ignore_secondaries=force)
1770
  if not disks_ok:
1771
    _ShutdownInstanceDisks(instance, cfg)
1772
    if force is not None and not force:
1773
      logger.Error("If the message above refers to a secondary node,"
1774
                   " you can retry the operation using '--force'.")
1775
    raise errors.OpExecError("Disk consistency error")
1776

    
1777

    
1778
class LUDeactivateInstanceDisks(NoHooksLU):
1779
  """Shutdown an instance's disks.
1780

1781
  """
1782
  _OP_REQP = ["instance_name"]
1783

    
1784
  def CheckPrereq(self):
1785
    """Check prerequisites.
1786

1787
    This checks that the instance is in the cluster.
1788

1789
    """
1790
    instance = self.cfg.GetInstanceInfo(
1791
      self.cfg.ExpandInstanceName(self.op.instance_name))
1792
    if instance is None:
1793
      raise errors.OpPrereqError("Instance '%s' not known" %
1794
                                 self.op.instance_name)
1795
    self.instance = instance
1796

    
1797
  def Exec(self, feedback_fn):
1798
    """Deactivate the disks
1799

1800
    """
1801
    instance = self.instance
1802
    ins_l = rpc.call_instance_list([instance.primary_node])
1803
    ins_l = ins_l[instance.primary_node]
1804
    if not isinstance(ins_l, list):
1805
      raise errors.OpExecError("Can't contact node '%s'" %
1806
                               instance.primary_node)
1807

    
1808
    if self.instance.name in ins_l:
1809
      raise errors.OpExecError("Instance is running, can't shutdown"
1810
                               " block devices.")
1811

    
1812
    _ShutdownInstanceDisks(instance, self.cfg)
1813

    
1814

    
1815
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1816
  """Shutdown block devices of an instance.
1817

1818
  This does the shutdown on all nodes of the instance.
1819

1820
  If ignore_primary is true, errors on the primary node are
1821
  ignored.
1822

1823
  """
1824
  result = True
1825
  for disk in instance.disks:
1826
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1827
      cfg.SetDiskID(top_disk, node)
1828
      if not rpc.call_blockdev_shutdown(node, top_disk):
1829
        logger.Error("could not shutdown block device %s on node %s" %
1830
                     (disk.iv_name, node))
1831
        if not ignore_primary or node != instance.primary_node:
1832
          result = False
1833
  return result
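  # Typical usage sketch (as in the reinstall/rename LUs below): assemble
  # the disks, do the work, and always shut them down again, e.g.:
  #   _StartInstanceDisks(cfg, inst, None)
  #   try:
  #     ...  # work on the activated disks
  #   finally:
  #     _ShutdownInstanceDisks(inst, cfg)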
1834

    
1835

    
1836
class LUStartupInstance(LogicalUnit):
1837
  """Starts an instance.
1838

1839
  """
1840
  HPATH = "instance-start"
1841
  HTYPE = constants.HTYPE_INSTANCE
1842
  _OP_REQP = ["instance_name", "force"]
1843

    
1844
  def BuildHooksEnv(self):
1845
    """Build hooks env.
1846

1847
    This runs on master, primary and secondary nodes of the instance.
1848

1849
    """
1850
    env = {
1851
      "FORCE": self.op.force,
1852
      }
1853
    env.update(_BuildInstanceHookEnvByObject(self.instance))
1854
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1855
          list(self.instance.secondary_nodes))
1856
    return env, nl, nl
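    # The hooks thus receive the FORCE flag plus the generic per-instance
    # variables built by _BuildInstanceHookEnvByObject (instance name,
    # primary/secondary nodes, etc.); both the pre and post node lists are
    # master + primary + secondaries.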
1857

    
1858
  def CheckPrereq(self):
1859
    """Check prerequisites.
1860

1861
    This checks that the instance is in the cluster.
1862

1863
    """
1864
    instance = self.cfg.GetInstanceInfo(
1865
      self.cfg.ExpandInstanceName(self.op.instance_name))
1866
    if instance is None:
1867
      raise errors.OpPrereqError("Instance '%s' not known" %
1868
                                 self.op.instance_name)
1869

    
1870
    # check bridge existence
1871
    brlist = [nic.bridge for nic in instance.nics]
1872
    if not rpc.call_bridges_exist(instance.primary_node, brlist):
1873
      raise errors.OpPrereqError("one or more target bridges %s does not"
1874
                                 " exist on destination node '%s'" %
1875
                                 (brlist, instance.primary_node))
1876

    
1877
    self.instance = instance
1878
    self.op.instance_name = instance.name
1879

    
1880
  def Exec(self, feedback_fn):
1881
    """Start the instance.
1882

1883
    """
1884
    instance = self.instance
1885
    force = self.op.force
1886
    extra_args = getattr(self.op, "extra_args", "")
1887

    
1888
    node_current = instance.primary_node
1889

    
1890
    nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1891
    if not nodeinfo:
1892
      raise errors.OpExecError("Could not contact node %s for infos" %
1893
                               (node_current))
1894

    
1895
    freememory = nodeinfo[node_current]['memory_free']
1896
    memory = instance.memory
1897
    if memory > freememory:
1898
      raise errors.OpExecError("Not enough memory to start instance"
1899
                               " %s on node %s"
1900
                               " needed %s MiB, available %s MiB" %
1901
                               (instance.name, node_current, memory,
1902
                                freememory))
1903

    
1904
    _StartInstanceDisks(self.cfg, instance, force)
1905

    
1906
    if not rpc.call_instance_start(node_current, instance, extra_args):
1907
      _ShutdownInstanceDisks(instance, self.cfg)
1908
      raise errors.OpExecError("Could not start instance")
1909

    
1910
    self.cfg.MarkInstanceUp(instance.name)
1911

    
1912

    
1913
class LUShutdownInstance(LogicalUnit):
1914
  """Shutdown an instance.
1915

1916
  """
1917
  HPATH = "instance-stop"
1918
  HTYPE = constants.HTYPE_INSTANCE
1919
  _OP_REQP = ["instance_name"]
1920

    
1921
  def BuildHooksEnv(self):
1922
    """Build hooks env.
1923

1924
    This runs on master, primary and secondary nodes of the instance.
1925

1926
    """
1927
    env = _BuildInstanceHookEnvByObject(self.instance)
1928
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1929
          list(self.instance.secondary_nodes))
1930
    return env, nl, nl
1931

    
1932
  def CheckPrereq(self):
1933
    """Check prerequisites.
1934

1935
    This checks that the instance is in the cluster.
1936

1937
    """
1938
    instance = self.cfg.GetInstanceInfo(
1939
      self.cfg.ExpandInstanceName(self.op.instance_name))
1940
    if instance is None:
1941
      raise errors.OpPrereqError("Instance '%s' not known" %
1942
                                 self.op.instance_name)
1943
    self.instance = instance
1944

    
1945
  def Exec(self, feedback_fn):
1946
    """Shutdown the instance.
1947

1948
    """
1949
    instance = self.instance
1950
    node_current = instance.primary_node
1951
    if not rpc.call_instance_shutdown(node_current, instance):
1952
      logger.Error("could not shutdown instance")
1953

    
1954
    self.cfg.MarkInstanceDown(instance.name)
1955
    _ShutdownInstanceDisks(instance, self.cfg)
1956

    
1957

    
1958
class LUReinstallInstance(LogicalUnit):
1959
  """Reinstall an instance.
1960

1961
  """
1962
  HPATH = "instance-reinstall"
1963
  HTYPE = constants.HTYPE_INSTANCE
1964
  _OP_REQP = ["instance_name"]
1965

    
1966
  def BuildHooksEnv(self):
1967
    """Build hooks env.
1968

1969
    This runs on master, primary and secondary nodes of the instance.
1970

1971
    """
1972
    env = _BuildInstanceHookEnvByObject(self.instance)
1973
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1974
          list(self.instance.secondary_nodes))
1975
    return env, nl, nl
1976

    
1977
  def CheckPrereq(self):
1978
    """Check prerequisites.
1979

1980
    This checks that the instance is in the cluster and is not running.
1981

1982
    """
1983
    instance = self.cfg.GetInstanceInfo(
1984
      self.cfg.ExpandInstanceName(self.op.instance_name))
1985
    if instance is None:
1986
      raise errors.OpPrereqError("Instance '%s' not known" %
1987
                                 self.op.instance_name)
1988
    if instance.disk_template == constants.DT_DISKLESS:
1989
      raise errors.OpPrereqError("Instance '%s' has no disks" %
1990
                                 self.op.instance_name)
1991
    if instance.status != "down":
1992
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
1993
                                 self.op.instance_name)
1994
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
1995
    if remote_info:
1996
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
1997
                                 (self.op.instance_name,
1998
                                  instance.primary_node))
1999

    
2000
    self.op.os_type = getattr(self.op, "os_type", None)
2001
    if self.op.os_type is not None:
2002
      # OS verification
2003
      pnode = self.cfg.GetNodeInfo(
2004
        self.cfg.ExpandNodeName(instance.primary_node))
2005
      if pnode is None:
2006
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2007
                                   instance.primary_node)
2008
      os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2009
      if not isinstance(os_obj, objects.OS):
2010
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2011
                                   " primary node"  % self.op.os_type)
2012

    
2013
    self.instance = instance
2014

    
2015
  def Exec(self, feedback_fn):
2016
    """Reinstall the instance.
2017

2018
    """
2019
    inst = self.instance
2020

    
2021
    if self.op.os_type is not None:
2022
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2023
      inst.os = self.op.os_type
2024
      self.cfg.AddInstance(inst)
2025

    
2026
    _StartInstanceDisks(self.cfg, inst, None)
2027
    try:
2028
      feedback_fn("Running the instance OS create scripts...")
2029
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2030
        raise errors.OpExecError("Could not install OS for instance %s "
2031
                                 "on node %s" %
2032
                                 (inst.name, inst.primary_node))
2033
    finally:
2034
      _ShutdownInstanceDisks(inst, self.cfg)
2035

    
2036

    
2037
class LURenameInstance(LogicalUnit):
2038
  """Rename an instance.
2039

2040
  """
2041
  HPATH = "instance-rename"
2042
  HTYPE = constants.HTYPE_INSTANCE
2043
  _OP_REQP = ["instance_name", "new_name"]
2044

    
2045
  def BuildHooksEnv(self):
2046
    """Build hooks env.
2047

2048
    This runs on master, primary and secondary nodes of the instance.
2049

2050
    """
2051
    env = _BuildInstanceHookEnvByObject(self.instance)
2052
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2053
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2054
          list(self.instance.secondary_nodes))
2055
    return env, nl, nl
2056

    
2057
  def CheckPrereq(self):
2058
    """Check prerequisites.
2059

2060
    This checks that the instance is in the cluster and is not running.
2061

2062
    """
2063
    instance = self.cfg.GetInstanceInfo(
2064
      self.cfg.ExpandInstanceName(self.op.instance_name))
2065
    if instance is None:
2066
      raise errors.OpPrereqError("Instance '%s' not known" %
2067
                                 self.op.instance_name)
2068
    if instance.status != "down":
2069
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2070
                                 self.op.instance_name)
2071
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2072
    if remote_info:
2073
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2074
                                 (self.op.instance_name,
2075
                                  instance.primary_node))
2076
    self.instance = instance
2077

    
2078
    # new name verification
2079
    hostname1 = utils.LookupHostname(self.op.new_name)
2080
    if not hostname1:
2081
      raise errors.OpPrereqError("New instance name '%s' not found in dns" %
2082
                                 self.op.new_name)
2083

    
2084
    self.op.new_name = new_name = hostname1['hostname']
2085
    if not getattr(self.op, "ignore_ip", False):
2086
      command = ["fping", "-q", hostname1['ip']]
2087
      result = utils.RunCmd(command)
2088
      if not result.failed:
2089
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2090
                                   (hostname1['ip'], new_name))
2091

    
2092

    
2093
  def Exec(self, feedback_fn):
2094
    """Reinstall the instance.
2095

2096
    """
2097
    inst = self.instance
2098
    old_name = inst.name
2099

    
2100
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2101

    
2102
    # re-read the instance from the configuration after rename
2103
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2104

    
2105
    _StartInstanceDisks(self.cfg, inst, None)
2106
    try:
2107
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2108
                                          "sda", "sdb"):
2109
        msg = ("Could run OS rename script for instance %s\n"
2110
               "on node %s\n"
2111
               "(but the instance has been renamed in Ganeti)" %
2112
               (inst.name, inst.primary_node))
2113
        logger.Error(msg)
2114
    finally:
2115
      _ShutdownInstanceDisks(inst, self.cfg)
2116

    
2117

    
2118
class LURemoveInstance(LogicalUnit):
2119
  """Remove an instance.
2120

2121
  """
2122
  HPATH = "instance-remove"
2123
  HTYPE = constants.HTYPE_INSTANCE
2124
  _OP_REQP = ["instance_name"]
2125

    
2126
  def BuildHooksEnv(self):
2127
    """Build hooks env.
2128

2129
    This runs on master, primary and secondary nodes of the instance.
2130

2131
    """
2132
    env = _BuildInstanceHookEnvByObject(self.instance)
2133
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2134
          list(self.instance.secondary_nodes))
2135
    return env, nl, nl
2136

    
2137
  def CheckPrereq(self):
2138
    """Check prerequisites.
2139

2140
    This checks that the instance is in the cluster.
2141

2142
    """
2143
    instance = self.cfg.GetInstanceInfo(
2144
      self.cfg.ExpandInstanceName(self.op.instance_name))
2145
    if instance is None:
2146
      raise errors.OpPrereqError("Instance '%s' not known" %
2147
                                 self.op.instance_name)
2148
    self.instance = instance
2149

    
2150
  def Exec(self, feedback_fn):
2151
    """Remove the instance.
2152

2153
    """
2154
    instance = self.instance
2155
    logger.Info("shutting down instance %s on node %s" %
2156
                (instance.name, instance.primary_node))
2157

    
2158
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2159
      raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2160
                               (instance.name, instance.primary_node))
2161

    
2162
    logger.Info("removing block devices for instance %s" % instance.name)
2163

    
2164
    _RemoveDisks(instance, self.cfg)
2165

    
2166
    logger.Info("removing instance %s out of cluster config" % instance.name)
2167

    
2168
    self.cfg.RemoveInstance(instance.name)
2169

    
2170

    
2171
class LUQueryInstances(NoHooksLU):
2172
  """Logical unit for querying instances.
2173

2174
  """
2175
  _OP_REQP = ["output_fields", "names"]
2176

    
2177
  def CheckPrereq(self):
2178
    """Check prerequisites.
2179

2180
    This checks that the fields required are valid output fields.
2181

2182
    """
2183
    self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2184
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2185
                               "admin_state", "admin_ram",
2186
                               "disk_template", "ip", "mac", "bridge",
2187
                               "sda_size", "sdb_size"],
2188
                       dynamic=self.dynamic_fields,
2189
                       selected=self.op.output_fields)
2190

    
2191
    self.wanted = _GetWantedInstances(self, self.op.names)
2192

    
2193
  def Exec(self, feedback_fn):
2194
    """Computes the list of nodes and their attributes.
2195

2196
    """
2197
    instance_names = self.wanted
2198
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2199
                     in instance_names]
2200

    
2201
    # begin data gathering
2202

    
2203
    nodes = frozenset([inst.primary_node for inst in instance_list])
2204

    
2205
    bad_nodes = []
2206
    if self.dynamic_fields.intersection(self.op.output_fields):
2207
      live_data = {}
2208
      node_data = rpc.call_all_instances_info(nodes)
2209
      for name in nodes:
2210
        result = node_data[name]
2211
        if result:
2212
          live_data.update(result)
2213
        elif result == False:
2214
          bad_nodes.append(name)
2215
        # else no instance is alive
2216
    else:
2217
      live_data = dict([(name, {}) for name in instance_names])
2218

    
2219
    # end data gathering
2220

    
2221
    output = []
2222
    for instance in instance_list:
2223
      iout = []
2224
      for field in self.op.output_fields:
2225
        if field == "name":
2226
          val = instance.name
2227
        elif field == "os":
2228
          val = instance.os
2229
        elif field == "pnode":
2230
          val = instance.primary_node
2231
        elif field == "snodes":
2232
          val = list(instance.secondary_nodes)
2233
        elif field == "admin_state":
2234
          val = (instance.status != "down")
2235
        elif field == "oper_state":
2236
          if instance.primary_node in bad_nodes:
2237
            val = None
2238
          else:
2239
            val = bool(live_data.get(instance.name))
2240
        elif field == "admin_ram":
2241
          val = instance.memory
2242
        elif field == "oper_ram":
2243
          if instance.primary_node in bad_nodes:
2244
            val = None
2245
          elif instance.name in live_data:
2246
            val = live_data[instance.name].get("memory", "?")
2247
          else:
2248
            val = "-"
2249
        elif field == "disk_template":
2250
          val = instance.disk_template
2251
        elif field == "ip":
2252
          val = instance.nics[0].ip
2253
        elif field == "bridge":
2254
          val = instance.nics[0].bridge
2255
        elif field == "mac":
2256
          val = instance.nics[0].mac
2257
        elif field == "sda_size" or field == "sdb_size":
2258
          disk = instance.FindDisk(field[:3])
2259
          if disk is None:
2260
            val = None
2261
          else:
2262
            val = disk.size
2263
        else:
2264
          raise errors.ParameterError(field)
2265
        iout.append(val)
2266
      output.append(iout)
2267

    
2268
    return output
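    # Example: with output_fields = ["name", "pnode", "oper_state"] the
    # result could look like (hypothetical instances):
    #   [["instance1.example.com", "node1.example.com", True],
    #    ["instance2.example.com", "node2.example.com", False]]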
2269

    
2270

    
2271
class LUFailoverInstance(LogicalUnit):
2272
  """Failover an instance.
2273

2274
  """
2275
  HPATH = "instance-failover"
2276
  HTYPE = constants.HTYPE_INSTANCE
2277
  _OP_REQP = ["instance_name", "ignore_consistency"]
2278

    
2279
  def BuildHooksEnv(self):
2280
    """Build hooks env.
2281

2282
    This runs on master, primary and secondary nodes of the instance.
2283

2284
    """
2285
    env = {
2286
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2287
      }
2288
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2289
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2290
    return env, nl, nl
2291

    
2292
  def CheckPrereq(self):
2293
    """Check prerequisites.
2294

2295
    This checks that the instance is in the cluster.
2296

2297
    """
2298
    instance = self.cfg.GetInstanceInfo(
2299
      self.cfg.ExpandInstanceName(self.op.instance_name))
2300
    if instance is None:
2301
      raise errors.OpPrereqError("Instance '%s' not known" %
2302
                                 self.op.instance_name)
2303

    
2304
    if instance.disk_template != constants.DT_REMOTE_RAID1:
2305
      raise errors.OpPrereqError("Instance's disk layout is not"
2306
                                 " remote_raid1.")
2307

    
2308
    secondary_nodes = instance.secondary_nodes
2309
    if not secondary_nodes:
2310
      raise errors.ProgrammerError("no secondary node but using "
2311
                                   "DT_REMOTE_RAID1 template")
2312

    
2313
    # check memory requirements on the secondary node
2314
    target_node = secondary_nodes[0]
2315
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2316
    info = nodeinfo.get(target_node, None)
2317
    if not info:
2318
      raise errors.OpPrereqError("Cannot get current information"
2319
                                 " from node '%s'" % nodeinfo)
2320
    if instance.memory > info['memory_free']:
2321
      raise errors.OpPrereqError("Not enough memory on target node %s."
2322
                                 " %d MB available, %d MB required" %
2323
                                 (target_node, info['memory_free'],
2324
                                  instance.memory))
2325

    
2326
    # check bridge existence
2327
    brlist = [nic.bridge for nic in instance.nics]
2328
    if not rpc.call_bridges_exist(instance.primary_node, brlist):
2329
      raise errors.OpPrereqError("One or more target bridges %s does not"
2330
                                 " exist on destination node '%s'" %
2331
                                 (brlist, instance.primary_node))
2332

    
2333
    self.instance = instance
2334

    
2335
  def Exec(self, feedback_fn):
2336
    """Failover an instance.
2337

2338
    The failover is done by shutting it down on its present node and
2339
    starting it on the secondary.
2340

2341
    """
2342
    instance = self.instance
2343

    
2344
    source_node = instance.primary_node
2345
    target_node = instance.secondary_nodes[0]
2346

    
2347
    feedback_fn("* checking disk consistency between source and target")
2348
    for dev in instance.disks:
2349
      # for remote_raid1, these are md over drbd
2350
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2351
        if not self.op.ignore_consistency:
2352
          raise errors.OpExecError("Disk %s is degraded on target node,"
2353
                                   " aborting failover." % dev.iv_name)
2354

    
2355
    feedback_fn("* checking target node resource availability")
2356
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2357

    
2358
    if not nodeinfo:
2359
      raise errors.OpExecError("Could not contact target node %s." %
2360
                               target_node)
2361

    
2362
    free_memory = int(nodeinfo[target_node]['memory_free'])
2363
    memory = instance.memory
2364
    if memory > free_memory:
2365
      raise errors.OpExecError("Not enough memory to create instance %s on"
2366
                               " node %s. needed %s MiB, available %s MiB" %
2367
                               (instance.name, target_node, memory,
2368
                                free_memory))
2369

    
2370
    feedback_fn("* shutting down instance on source node")
2371
    logger.Info("Shutting down instance %s on node %s" %
2372
                (instance.name, source_node))
2373

    
2374
    if not rpc.call_instance_shutdown(source_node, instance):
2375
      logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2376
                   " anyway. Please make sure node %s is down"  %
2377
                   (instance.name, source_node, source_node))
2378

    
2379
    feedback_fn("* deactivating the instance's disks on source node")
2380
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2381
      raise errors.OpExecError("Can't shut down the instance's disks.")
2382

    
2383
    instance.primary_node = target_node
2384
    # distribute new instance config to the other nodes
2385
    self.cfg.AddInstance(instance)
2386

    
2387
    feedback_fn("* activating the instance's disks on target node")
2388
    logger.Info("Starting instance %s on node %s" %
2389
                (instance.name, target_node))
2390

    
2391
    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2392
                                             ignore_secondaries=True)
2393
    if not disks_ok:
2394
      _ShutdownInstanceDisks(instance, self.cfg)
2395
      raise errors.OpExecError("Can't activate the instance's disks")
2396

    
2397
    feedback_fn("* starting the instance on the target node")
2398
    if not rpc.call_instance_start(target_node, instance, None):
2399
      _ShutdownInstanceDisks(instance, self.cfg)
2400
      raise errors.OpExecError("Could not start instance %s on node %s." %
2401
                               (instance.name, target_node))
2402

    
2403

    
2404
def _CreateBlockDevOnPrimary(cfg, node, device, info):
2405
  """Create a tree of block devices on the primary node.
2406

2407
  This always creates all devices.
2408

2409
  """
2410
  if device.children:
2411
    for child in device.children:
2412
      if not _CreateBlockDevOnPrimary(cfg, node, child, info):
2413
        return False
2414

    
2415
  cfg.SetDiskID(device, node)
2416
  new_id = rpc.call_blockdev_create(node, device, device.size, True, info)
2417
  if not new_id:
2418
    return False
2419
  if device.physical_id is None:
2420
    device.physical_id = new_id
2421
  return True
2422

    
2423

    
2424
def _CreateBlockDevOnSecondary(cfg, node, device, force, info):
2425
  """Create a tree of block devices on a secondary node.
2426

2427
  If this device type has to be created on secondaries, create it and
2428
  all its children.
2429

2430
  If not, just recurse to children keeping the same 'force' value.
2431

2432
  """
2433
  if device.CreateOnSecondary():
2434
    force = True
2435
  if device.children:
2436
    for child in device.children:
2437
      if not _CreateBlockDevOnSecondary(cfg, node, child, force, info):
2438
        return False
2439

    
2440
  if not force:
2441
    return True
2442
  cfg.SetDiskID(device, node)
2443
  new_id = rpc.call_blockdev_create(node, device, device.size, False, info)
2444
  if not new_id:
2445
    return False
2446
  if device.physical_id is None:
2447
    device.physical_id = new_id
2448
  return True
2449

    
2450

    
2451
def _GenerateUniqueNames(cfg, exts):
2452
  """Generate a suitable LV name.
2453

2454
  This will generate a logical volume name for the given instance.
2455

2456
  """
2457
  results = []
2458
  for val in exts:
2459
    new_id = cfg.GenerateUniqueID()
2460
    results.append("%s%s" % (new_id, val))
2461
  return results
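  # Illustrative call (UUIDs are made up and differ per name, since
  # GenerateUniqueID is called once per extension):
  #   _GenerateUniqueNames(cfg, [".sda", ".sdb"])
  #   -> ["aaaa-1111.sda", "bbbb-2222.sdb"]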
2462

    
2463

    
2464
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2465
  """Generate a drbd device complete with its children.
2466

2467
  """
2468
  port = cfg.AllocatePort()
2469
  vgname = cfg.GetVGName()
2470
  dev_data = objects.Disk(dev_type="lvm", size=size,
2471
                          logical_id=(vgname, names[0]))
2472
  dev_meta = objects.Disk(dev_type="lvm", size=128,
2473
                          logical_id=(vgname, names[1]))
2474
  drbd_dev = objects.Disk(dev_type="drbd", size=size,
2475
                          logical_id = (primary, secondary, port),
2476
                          children = [dev_data, dev_meta])
2477
  return drbd_dev
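  # The resulting (sub-)tree for a branch of the given size, roughly:
  #   drbd(primary, secondary, port)
  #     +- lvm data volume (names[0], `size` MB)
  #     +- lvm meta volume (names[1], 128 MB)
  # The callers then wrap one or more such branches in an md_raid1 device.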
2478

    
2479

    
2480
def _GenerateDiskTemplate(cfg, template_name,
2481
                          instance_name, primary_node,
2482
                          secondary_nodes, disk_sz, swap_sz):
2483
  """Generate the entire disk layout for a given template type.
2484

2485
  """
2486
  #TODO: compute space requirements
2487

    
2488
  vgname = cfg.GetVGName()
2489
  if template_name == "diskless":
2490
    disks = []
2491
  elif template_name == "plain":
2492
    if len(secondary_nodes) != 0:
2493
      raise errors.ProgrammerError("Wrong template configuration")
2494

    
2495
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2496
    sda_dev = objects.Disk(dev_type="lvm", size=disk_sz,
2497
                           logical_id=(vgname, names[0]),
2498
                           iv_name = "sda")
2499
    sdb_dev = objects.Disk(dev_type="lvm", size=swap_sz,
2500
                           logical_id=(vgname, names[1]),
2501
                           iv_name = "sdb")
2502
    disks = [sda_dev, sdb_dev]
2503
  elif template_name == "local_raid1":
2504
    if len(secondary_nodes) != 0:
2505
      raise errors.ProgrammerError("Wrong template configuration")
2506

    
2507

    
2508
    names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2509
                                       ".sdb_m1", ".sdb_m2"])
2510
    sda_dev_m1 = objects.Disk(dev_type="lvm", size=disk_sz,
2511
                              logical_id=(vgname, names[0]))
2512
    sda_dev_m2 = objects.Disk(dev_type="lvm", size=disk_sz,
2513
                              logical_id=(vgname, names[1]))
2514
    md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name = "sda",
2515
                              size=disk_sz,
2516
                              children = [sda_dev_m1, sda_dev_m2])
2517
    sdb_dev_m1 = objects.Disk(dev_type="lvm", size=swap_sz,
2518
                              logical_id=(vgname, names[2]))
2519
    sdb_dev_m2 = objects.Disk(dev_type="lvm", size=swap_sz,
2520
                              logical_id=(vgname, names[3]))
2521
    md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name = "sdb",
2522
                              size=swap_sz,
2523
                              children = [sdb_dev_m1, sdb_dev_m2])
2524
    disks = [md_sda_dev, md_sdb_dev]
2525
  elif template_name == constants.DT_REMOTE_RAID1:
2526
    if len(secondary_nodes) != 1:
2527
      raise errors.ProgrammerError("Wrong template configuration")
2528
    remote_node = secondary_nodes[0]
2529
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2530
                                       ".sdb_data", ".sdb_meta"])
2531
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2532
                                         disk_sz, names[0:2])
2533
    md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name="sda",
2534
                              children = [drbd_sda_dev], size=disk_sz)
2535
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2536
                                         swap_sz, names[2:4])
2537
    md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name="sdb",
2538
                              children = [drbd_sdb_dev], size=swap_sz)
2539
    disks = [md_sda_dev, md_sdb_dev]
2540
  else:
2541
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2542
  return disks
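  # Summary of the device trees built above (iv_name in parentheses):
  #   diskless:      []
  #   plain:         lvm (sda), lvm (sdb)
  #   local_raid1:   md_raid1 (sda) over two lvm, md_raid1 (sdb) over two lvm
  #   remote_raid1:  md_raid1 (sda) over one drbd branch, same for sdb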
2543

    
2544

    
2545
def _GetInstanceInfoText(instance):
2546
  """Compute that text that should be added to the disk's metadata.
2547

2548
  """
2549
  return "originstname+%s" % instance.name
2550

    
2551

    
2552
def _CreateDisks(cfg, instance):
2553
  """Create all disks for an instance.
2554

2555
  This abstracts away some work from AddInstance.
2556

2557
  Args:
2558
    instance: the instance object
2559

2560
  Returns:
2561
    True or False showing the success of the creation process
2562

2563
  """
2564
  info = _GetInstanceInfoText(instance)
2565

    
2566
  for device in instance.disks:
2567
    logger.Info("creating volume %s for instance %s" %
2568
              (device.iv_name, instance.name))
2569
    #HARDCODE
2570
    for secondary_node in instance.secondary_nodes:
2571
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, device, False,
2572
                                        info):
2573
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2574
                     (device.iv_name, device, secondary_node))
2575
        return False
2576
    #HARDCODE
2577
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, device, info):
2578
      logger.Error("failed to create volume %s on primary!" %
2579
                   device.iv_name)
2580
      return False
2581
  return True
2582

    
2583

    
2584
def _RemoveDisks(instance, cfg):
2585
  """Remove all disks for an instance.
2586

2587
  This abstracts away some work from `AddInstance()` and
2588
  `RemoveInstance()`. Note that in case some of the devices couldn't
2589
  be removed, the removal will continue with the other ones (compare
2590
  with `_CreateDisks()`).
2591

2592
  Args:
2593
    instance: the instance object
2594

2595
  Returns:
2596
    True or False showing the success of the removal process
2597

2598
  """
2599
  logger.Info("removing block devices for instance %s" % instance.name)
2600

    
2601
  result = True
2602
  for device in instance.disks:
2603
    for node, disk in device.ComputeNodeTree(instance.primary_node):
2604
      cfg.SetDiskID(disk, node)
2605
      if not rpc.call_blockdev_remove(node, disk):
2606
        logger.Error("could not remove block device %s on node %s,"
2607
                     " continuing anyway" %
2608
                     (device.iv_name, node))
2609
        result = False
2610
  return result
2611

    
2612

    
2613
class LUCreateInstance(LogicalUnit):
2614
  """Create an instance.
2615

2616
  """
2617
  HPATH = "instance-add"
2618
  HTYPE = constants.HTYPE_INSTANCE
2619
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2620
              "disk_template", "swap_size", "mode", "start", "vcpus",
2621
              "wait_for_sync"]
2622

    
2623
  def BuildHooksEnv(self):
2624
    """Build hooks env.
2625

2626
    This runs on master, primary and secondary nodes of the instance.
2627

2628
    """
2629
    env = {
2630
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2631
      "INSTANCE_DISK_SIZE": self.op.disk_size,
2632
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
2633
      "INSTANCE_ADD_MODE": self.op.mode,
2634
      }
2635
    if self.op.mode == constants.INSTANCE_IMPORT:
2636
      env["INSTANCE_SRC_NODE"] = self.op.src_node
2637
      env["INSTANCE_SRC_PATH"] = self.op.src_path
2638
      env["INSTANCE_SRC_IMAGE"] = self.src_image
2639

    
2640
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2641
      primary_node=self.op.pnode,
2642
      secondary_nodes=self.secondaries,
2643
      status=self.instance_status,
2644
      os_type=self.op.os_type,
2645
      memory=self.op.mem_size,
2646
      vcpus=self.op.vcpus,
2647
      nics=[(self.inst_ip, self.op.bridge)],
2648
    ))
2649

    
2650
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2651
          self.secondaries)
2652
    return env, nl, nl
2653

    
2654

    
2655
  def CheckPrereq(self):
2656
    """Check prerequisites.
2657

2658
    """
2659
    if self.op.mode not in (constants.INSTANCE_CREATE,
2660
                            constants.INSTANCE_IMPORT):
2661
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2662
                                 self.op.mode)
2663

    
2664
    if self.op.mode == constants.INSTANCE_IMPORT:
2665
      src_node = getattr(self.op, "src_node", None)
2666
      src_path = getattr(self.op, "src_path", None)
2667
      if src_node is None or src_path is None:
2668
        raise errors.OpPrereqError("Importing an instance requires source"
2669
                                   " node and path options")
2670
      src_node_full = self.cfg.ExpandNodeName(src_node)
2671
      if src_node_full is None:
2672
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2673
      self.op.src_node = src_node = src_node_full
2674

    
2675
      if not os.path.isabs(src_path):
2676
        raise errors.OpPrereqError("The source path must be absolute")
2677

    
2678
      export_info = rpc.call_export_info(src_node, src_path)
2679

    
2680
      if not export_info:
2681
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
2682

    
2683
      if not export_info.has_section(constants.INISECT_EXP):
2684
        raise errors.ProgrammerError("Corrupted export config")
2685

    
2686
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
2687
      if (int(ei_version) != constants.EXPORT_VERSION):
2688
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2689
                                   (ei_version, constants.EXPORT_VERSION))
2690

    
2691
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2692
        raise errors.OpPrereqError("Can't import instance with more than"
2693
                                   " one data disk")
2694

    
2695
      # FIXME: are the old os-es, disk sizes, etc. useful?
2696
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2697
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2698
                                                         'disk0_dump'))
2699
      self.src_image = diskimage
2700
    else: # INSTANCE_CREATE
2701
      if getattr(self.op, "os_type", None) is None:
2702
        raise errors.OpPrereqError("No guest OS specified")
2703

    
2704
    # check primary node
2705
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2706
    if pnode is None:
2707
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
2708
                                 self.op.pnode)
2709
    self.op.pnode = pnode.name
2710
    self.pnode = pnode
2711
    self.secondaries = []
2712
    # disk template and mirror node verification
2713
    if self.op.disk_template not in constants.DISK_TEMPLATES:
2714
      raise errors.OpPrereqError("Invalid disk template name")
2715

    
2716
    if self.op.disk_template == constants.DT_REMOTE_RAID1:
2717
      if getattr(self.op, "snode", None) is None:
2718
        raise errors.OpPrereqError("The 'remote_raid1' disk template needs"
2719
                                   " a mirror node")
2720

    
2721
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
2722
      if snode_name is None:
2723
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
2724
                                   self.op.snode)
2725
      elif snode_name == pnode.name:
2726
        raise errors.OpPrereqError("The secondary node cannot be"
2727
                                   " the primary node.")
2728
      self.secondaries.append(snode_name)
2729

    
2730
    # Check lv size requirements
2731
    nodenames = [pnode.name] + self.secondaries
2732
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2733

    
2734
    # Required free disk space as a function of disk and swap space
2735
    req_size_dict = {
2736
      constants.DT_DISKLESS: 0,
2737
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2738
      constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2739
      # 256 MB are added for drbd metadata (128 MB per drbd device)
2740
      constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2741
    }
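    # Worked example (hypothetical sizes): with disk_size=10240 MB and
    # swap_size=4096 MB, remote_raid1 needs 10240 + 4096 + 256 = 14592 MB
    # of free space in the volume group on each of the two nodes, while
    # local_raid1 needs (10240 + 4096) * 2 = 28672 MB on the single node.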
2742

    
2743
    if self.op.disk_template not in req_size_dict:
2744
      raise errors.ProgrammerError("Disk template '%s' size requirement"
2745
                                   " is unknown" %  self.op.disk_template)
2746

    
2747
    req_size = req_size_dict[self.op.disk_template]
2748

    
2749
    for node in nodenames:
2750
      info = nodeinfo.get(node, None)
2751
      if not info:
2752
        raise errors.OpPrereqError("Cannot get current information"
2753
                                   " from node '%s'" % nodeinfo)
2754
      if req_size > info['vg_free']:
2755
        raise errors.OpPrereqError("Not enough disk space on target node %s."
2756
                                   " %d MB available, %d MB required" %
2757
                                   (node, info['vg_free'], req_size))
2758

    
2759
    # os verification
2760
    os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2761
    if not isinstance(os_obj, objects.OS):
2762
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
2763
                                 " primary node"  % self.op.os_type)
2764

    
2765
    # instance verification
2766
    hostname1 = utils.LookupHostname(self.op.instance_name)
2767
    if not hostname1:
2768
      raise errors.OpPrereqError("Instance name '%s' not found in dns" %
2769
                                 self.op.instance_name)
2770

    
2771
    self.op.instance_name = instance_name = hostname1['hostname']
2772
    instance_list = self.cfg.GetInstanceList()
2773
    if instance_name in instance_list:
2774
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2775
                                 instance_name)
2776

    
2777
    ip = getattr(self.op, "ip", None)
2778
    if ip is None or ip.lower() == "none":
2779
      inst_ip = None
2780
    elif ip.lower() == "auto":
2781
      inst_ip = hostname1['ip']
2782
    else:
2783
      if not utils.IsValidIP(ip):
2784
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
2785
                                   " like a valid IP" % ip)
2786
      inst_ip = ip
2787
    self.inst_ip = inst_ip
2788

    
2789
    command = ["fping", "-q", hostname1['ip']]
2790
    result = utils.RunCmd(command)
2791
    if not result.failed:
2792
      raise errors.OpPrereqError("IP %s of instance %s already in use" %
2793
                                 (hostname1['ip'], instance_name))
2794

    
2795
    # bridge verification
2796
    bridge = getattr(self.op, "bridge", None)
2797
    if bridge is None:
2798
      self.op.bridge = self.cfg.GetDefBridge()
2799
    else:
2800
      self.op.bridge = bridge
2801

    
2802
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
2803
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
2804
                                 " destination node '%s'" %
2805
                                 (self.op.bridge, pnode.name))
2806

    
2807
    if self.op.start:
2808
      self.instance_status = 'up'
2809
    else:
2810
      self.instance_status = 'down'
2811

    
2812
  def Exec(self, feedback_fn):
2813
    """Create and add the instance to the cluster.
2814

2815
    """
2816
    instance = self.op.instance_name
2817
    pnode_name = self.pnode.name
2818

    
2819
    nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
2820
    if self.inst_ip is not None:
2821
      nic.ip = self.inst_ip
2822

    
2823
    disks = _GenerateDiskTemplate(self.cfg,
2824
                                  self.op.disk_template,
2825
                                  instance, pnode_name,
2826
                                  self.secondaries, self.op.disk_size,
2827
                                  self.op.swap_size)
2828

    
2829
    iobj = objects.Instance(name=instance, os=self.op.os_type,
2830
                            primary_node=pnode_name,
2831
                            memory=self.op.mem_size,
2832
                            vcpus=self.op.vcpus,
2833
                            nics=[nic], disks=disks,
2834
                            disk_template=self.op.disk_template,
2835
                            status=self.instance_status,
2836
                            )
2837

    
2838
    feedback_fn("* creating instance disks...")
2839
    if not _CreateDisks(self.cfg, iobj):
2840
      _RemoveDisks(iobj, self.cfg)
2841
      raise errors.OpExecError("Device creation failed, reverting...")
2842

    
2843
    feedback_fn("adding instance %s to cluster config" % instance)
2844

    
2845
    self.cfg.AddInstance(iobj)
2846

    
2847
    if self.op.wait_for_sync:
2848
      disk_abort = not _WaitForSync(self.cfg, iobj)
2849
    elif iobj.disk_template == constants.DT_REMOTE_RAID1:
2850
      # make sure the disks are not degraded (still sync-ing is ok)
2851
      time.sleep(15)
2852
      feedback_fn("* checking mirrors status")
2853
      disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
2854
    else:
2855
      disk_abort = False
2856

    
2857
    if disk_abort:
2858
      _RemoveDisks(iobj, self.cfg)
2859
      self.cfg.RemoveInstance(iobj.name)
2860
      raise errors.OpExecError("There are some degraded disks for"
2861
                               " this instance")
2862

    
2863
    feedback_fn("creating os for instance %s on node %s" %
2864
                (instance, pnode_name))
2865

    
2866
    if iobj.disk_template != constants.DT_DISKLESS:
2867
      if self.op.mode == constants.INSTANCE_CREATE:
2868
        feedback_fn("* running the instance OS create scripts...")
2869
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
2870
          raise errors.OpExecError("could not add os for instance %s"
2871
                                   " on node %s" %
2872
                                   (instance, pnode_name))
2873

    
2874
      elif self.op.mode == constants.INSTANCE_IMPORT:
2875
        feedback_fn("* running the instance OS import scripts...")
2876
        src_node = self.op.src_node
2877
        src_image = self.src_image
2878
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
2879
                                                src_node, src_image):
2880
          raise errors.OpExecError("Could not import os for instance"
2881
                                   " %s on node %s" %
2882
                                   (instance, pnode_name))
2883
      else:
2884
        # also checked in the prereq part
2885
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
2886
                                     % self.op.mode)
2887

    
2888
    if self.op.start:
2889
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
2890
      feedback_fn("* starting instance...")
2891
      if not rpc.call_instance_start(pnode_name, iobj, None):
2892
        raise errors.OpExecError("Could not start instance")
2893

    
2894

    
2895
class LUConnectConsole(NoHooksLU):
2896
  """Connect to an instance's console.
2897

2898
  This is somewhat special in that it returns the command line that
2899
  you need to run on the master node in order to connect to the
2900
  console.
2901

2902
  """
2903
  _OP_REQP = ["instance_name"]
2904

    
2905
  def CheckPrereq(self):
2906
    """Check prerequisites.
2907

2908
    This checks that the instance is in the cluster.
2909

2910
    """
2911
    instance = self.cfg.GetInstanceInfo(
2912
      self.cfg.ExpandInstanceName(self.op.instance_name))
2913
    if instance is None:
2914
      raise errors.OpPrereqError("Instance '%s' not known" %
2915
                                 self.op.instance_name)
2916
    self.instance = instance
2917

    
2918
  def Exec(self, feedback_fn):
2919
    """Connect to the console of an instance
2920

2921
    """
2922
    instance = self.instance
2923
    node = instance.primary_node
2924

    
2925
    node_insts = rpc.call_instance_list([node])[node]
2926
    if node_insts is False:
2927
      raise errors.OpExecError("Can't connect to node %s." % node)
2928

    
2929
    if instance.name not in node_insts:
2930
      raise errors.OpExecError("Instance %s is not running." % instance.name)
2931

    
2932
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
2933

    
2934
    hyper = hypervisor.GetHypervisor()
2935
    console_cmd = hyper.GetShellCommandForConsole(instance.name)
2936
    # build ssh cmdline
2937
    argv = ["ssh", "-q", "-t"]
2938
    argv.extend(ssh.KNOWN_HOSTS_OPTS)
2939
    argv.extend(ssh.BATCH_MODE_OPTS)
2940
    argv.append(node)
2941
    argv.append(console_cmd)
2942
    return "ssh", argv
2943

    
2944

    
2945
class LUAddMDDRBDComponent(LogicalUnit):
2946
  """Adda new mirror member to an instance's disk.
2947

2948
  """
2949
  HPATH = "mirror-add"
2950
  HTYPE = constants.HTYPE_INSTANCE
2951
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]
2952

    
2953
  def BuildHooksEnv(self):
2954
    """Build hooks env.
2955

2956
    This runs on the master, the primary and all the secondaries.
2957

2958
    """
2959
    env = {
2960
      "NEW_SECONDARY": self.op.remote_node,
2961
      "DISK_NAME": self.op.disk_name,
2962
      }
2963
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2964
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
2965
          self.op.remote_node,] + list(self.instance.secondary_nodes)
2966
    return env, nl, nl
2967

    
2968
  def CheckPrereq(self):
2969
    """Check prerequisites.
2970

2971
    This checks that the instance is in the cluster.
2972

2973
    """
2974
    instance = self.cfg.GetInstanceInfo(
2975
      self.cfg.ExpandInstanceName(self.op.instance_name))
2976
    if instance is None:
2977
      raise errors.OpPrereqError("Instance '%s' not known" %
2978
                                 self.op.instance_name)
2979
    self.instance = instance
2980

    
2981
    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
2982
    if remote_node is None:
2983
      raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
2984
    self.remote_node = remote_node
2985

    
2986
    if remote_node == instance.primary_node:
2987
      raise errors.OpPrereqError("The specified node is the primary node of"
2988
                                 " the instance.")
2989

    
2990
    if instance.disk_template != constants.DT_REMOTE_RAID1:
2991
      raise errors.OpPrereqError("Instance's disk layout is not"
2992
                                 " remote_raid1.")
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
                                 " instance." % self.op.disk_name)
    if len(disk.children) > 1:
      raise errors.OpPrereqError("The device already has two slave"
                                 " devices.\n"
                                 "This would create a 3-disk raid1"
                                 " which we don't allow.")
    self.disk = disk

  def Exec(self, feedback_fn):
    """Add the mirror component

    """
    disk = self.disk
    instance = self.instance

    remote_node = self.remote_node
    lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
    names = _GenerateUniqueNames(self.cfg, lv_names)
    new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
                                     remote_node, disk.size, names)

    logger.Info("adding new mirror component on secondary")
    #HARDCODE
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, new_drbd, False,
                                      _GetInstanceInfoText(instance)):
      raise errors.OpExecError("Failed to create new component on secondary"
                               " node %s" % remote_node)

    logger.Info("adding new mirror component on primary")
    #HARDCODE
    if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node, new_drbd,
                                    _GetInstanceInfoText(instance)):
      # remove secondary dev
      self.cfg.SetDiskID(new_drbd, remote_node)
      rpc.call_blockdev_remove(remote_node, new_drbd)
      raise errors.OpExecError("Failed to create volume on primary")

    # the device exists now
    # call the primary node to add the mirror to md
    logger.Info("adding new mirror component to md")
    if not rpc.call_blockdev_addchild(instance.primary_node,
                                           disk, new_drbd):
      logger.Error("Can't add mirror component to md!")
      self.cfg.SetDiskID(new_drbd, remote_node)
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
        logger.Error("Can't rollback on secondary")
      self.cfg.SetDiskID(new_drbd, instance.primary_node)
      if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
        logger.Error("Can't rollback on primary")
      raise errors.OpExecError("Can't add mirror component to md array")

    disk.children.append(new_drbd)

    self.cfg.AddInstance(instance)

    _WaitForSync(self.cfg, instance)

    return 0


class LURemoveMDDRBDComponent(LogicalUnit):
  """Remove a component from a remote_raid1 disk.

  """
  HPATH = "mirror-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK_NAME": self.op.disk_name,
      "DISK_ID": self.op.disk_id,
      "OLD_SECONDARY": self.old_secondary,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
                                 " instance." % self.op.disk_name)
    for child in disk.children:
      if child.dev_type == "drbd" and child.logical_id[2] == self.op.disk_id:
        break
    else:
      raise errors.OpPrereqError("Can't find the device with this port.")

    if len(disk.children) < 2:
      raise errors.OpPrereqError("Cannot remove the last component from"
                                 " a mirror.")
    self.disk = disk
    self.child = child
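    # the drbd logical_id appears to hold (node_a, node_b, port); record as
    # the old secondary whichever of the two nodes is not the primary node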
    if self.child.logical_id[0] == instance.primary_node:
      oid = 1
    else:
      oid = 0
    self.old_secondary = self.child.logical_id[oid]

  def Exec(self, feedback_fn):
    """Remove the mirror component

    """
    instance = self.instance
    disk = self.disk
    child = self.child
    logger.Info("remove mirror component")
    self.cfg.SetDiskID(disk, instance.primary_node)
    if not rpc.call_blockdev_removechild(instance.primary_node,
                                              disk, child):
      raise errors.OpExecError("Can't remove child from mirror.")

    for node in child.logical_id[:2]:
      self.cfg.SetDiskID(child, node)
      if not rpc.call_blockdev_remove(node, child):
        logger.Error("Warning: failed to remove device from node %s,"
                     " continuing operation." % node)

    disk.children.remove(child)
    self.cfg.AddInstance(instance)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    remote_node = getattr(self.op, "remote_node", None)
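    # when no replacement node is given, the mirrors are rebuilt on the
    # instance's current secondary node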
    if remote_node is None:
      remote_node = instance.secondary_nodes[0]
    else:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    self.op.remote_node = remote_node

  def Exec(self, feedback_fn):
    """Replace the disks of an instance.

    """
    instance = self.instance
    iv_names = {}
    # start of work
    remote_node = self.op.remote_node
    cfg = self.cfg
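    # for every disk: build a new drbd mirror towards the (possibly new)
    # secondary, attach it to the md device, wait for sync, and only then
    # detach and remove the old child devices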
    for dev in instance.disks:
      size = dev.size
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
                                       remote_node, size, names)
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
      logger.Info("adding new mirror component on secondary for %s" %
                  dev.iv_name)
      #HARDCODE
      if not _CreateBlockDevOnSecondary(cfg, remote_node, new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new component on"
                                 " secondary node %s\n"
                                 "Full abort, cleanup manually!" %
                                 remote_node)

      logger.Info("adding new mirror component on primary")
      #HARDCODE
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, new_drbd,
                                      _GetInstanceInfoText(instance)):
        # remove secondary dev
        cfg.SetDiskID(new_drbd, remote_node)
        rpc.call_blockdev_remove(remote_node, new_drbd)
        raise errors.OpExecError("Failed to create volume on primary!\n"
                                 "Full abort, cleanup manually!!")

      # the device exists now
      # call the primary node to add the mirror to md
      logger.Info("adding new mirror component to md")
      if not rpc.call_blockdev_addchild(instance.primary_node, dev,
                                        new_drbd):
        logger.Error("Can't add mirror component to md!")
        cfg.SetDiskID(new_drbd, remote_node)
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
          logger.Error("Can't rollback on secondary")
        cfg.SetDiskID(new_drbd, instance.primary_node)
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
          logger.Error("Can't rollback on primary")
        raise errors.OpExecError("Full abort, cleanup manually!!")

      dev.children.append(new_drbd)
      cfg.AddInstance(instance)

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    _WaitForSync(cfg, instance, unlock=True)

    # so check manually all the devices
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("MD device %s is degraded!" % name)
      cfg.SetDiskID(new_drbd, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
      if is_degr:
        raise errors.OpExecError("New drbd device %s is degraded!" % name)

    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      logger.Info("remove mirror %s component" % name)
      cfg.SetDiskID(dev, instance.primary_node)
      if not rpc.call_blockdev_removechild(instance.primary_node,
                                                dev, child):
        logger.Error("Can't remove child from mirror, aborting"
                     " *this device cleanup*.\nYou need to cleanup manually!!")
        continue

      for node in child.logical_id[:2]:
        logger.Info("remove child device on %s" % node)
        cfg.SetDiskID(child, node)
        if not rpc.call_blockdev_remove(node, child):
          logger.Error("Warning: failed to remove device from node %s,"
                       " continuing operation." % node)

      dev.children.remove(child)

      cfg.AddInstance(instance)


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]
    return


  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type == "drbd":
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
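      # recurse into the child devices so the whole device tree is reported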
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                                instance.name)
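      # the instance is reported as running if the node returned info
      # containing a "state" field for it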
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        }

      result[instance.name] = idict

    return result


class LUSetInstanceParms(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      args['nics'] = [(ip, bridge)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.bridge = getattr(self.op, "bridge", None)
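    # at least one of the four parameters has to be submitted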
    if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("No such instance name '%s'" %
                                 self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance
    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.
    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))

    self.cfg.AddInstance(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance name is a valid one.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    # shutdown the instance, unless requested not to do so
    if self.op.shutdown:
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
      self.processor.ChainOpCode(op, feedback_fn)

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type="lvm", size=disk.size,
                                      logical_id=(vgname, new_dev_name),
                                      physical_id=(vgname, new_dev_name),
                                      iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown:
        op = opcodes.OpStartupInstance(instance_name=instance.name,
                                       force=False)
        self.processor.ChainOpCode(op, feedback_fn)

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
3605
                                           instance):
3606
        logger.Error("could not export block device %s from node"
3607
                     " %s to node %s" %
3608
                     (dev.logical_id[1], src_node, dst_node.name))
3609
      if not rpc.call_blockdev_remove(src_node, dev):
3610
        logger.Error("could not remove snapshot block device %s from"
3611
                     " node %s" % (dev.logical_id[1], src_node))
3612

    
3613
    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
3614
      logger.Error("could not finalize export for instance %s on node %s" %
3615
                   (instance.name, dst_node.name))
3616

    
3617
    nodelist = self.cfg.GetNodeList()
3618
    nodelist.remove(dst_node.name)
3619

    
3620
    # on one-node clusters nodelist will be empty after the removal
3621
    # if we proceed the backup would be removed because OpQueryExports
3622
    # substitutes an empty list with the full cluster node list.
3623
    if nodelist:
3624
      op = opcodes.OpQueryExports(nodes=nodelist)
3625
      exportlist = self.processor.ChainOpCode(op, feedback_fn)
3626
      for node in exportlist:
3627
        if instance.name in exportlist[node]:
3628
          if not rpc.call_export_remove(node, instance.name):
3629
            logger.Error("could not remove older export for instance %s"
3630
                         " on node %s" % (instance.name, node))
3631

    
3632

    
3633
class TagsLU(NoHooksLU):
3634
  """Generic tags LU.
3635

3636
  This is an abstract class which is the parent of all the other tags LUs.
3637

3638
  """
3639
  def CheckPrereq(self):
3640
    """Check prerequisites.
3641

3642
    """
3643
    if self.op.kind == constants.TAG_CLUSTER:
3644
      self.target = self.cfg.GetClusterInfo()
3645
    elif self.op.kind == constants.TAG_NODE:
3646
      name = self.cfg.ExpandNodeName(self.op.name)
3647
      if name is None:
3648
        raise errors.OpPrereqError("Invalid node name (%s)" %
3649
                                   (self.op.name,))
3650
      self.op.name = name
3651
      self.target = self.cfg.GetNodeInfo(name)
3652
    elif self.op.kind == constants.TAG_INSTANCE:
3653
      name = self.cfg.ExpandInstanceName(name)
3654
      if name is None:
3655
        raise errors.OpPrereqError("Invalid instance name (%s)" %
3656
                                   (self.op.name,))
3657
      self.op.name = name
3658
      self.target = self.cfg.GetInstanceInfo(name)
3659
    else:
3660
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
3661
                                 str(self.op.kind))
3662

    
3663

    
3664
class LUGetTags(TagsLU):
3665
  """Returns the tags of a given object.
3666

3667
  """
3668
  _OP_REQP = ["kind", "name"]
3669

    
3670
  def Exec(self, feedback_fn):
3671
    """Returns the tag list.
3672

3673
    """
3674
    return self.target.GetTags()
3675

    
3676

    
3677
class LUAddTag(TagsLU):
3678
  """Sets a tag on a given object.
3679

3680
  """
3681
  _OP_REQP = ["kind", "name", "tag"]
3682

    
3683
  def CheckPrereq(self):
3684
    """Check prerequisites.
3685

3686
    This checks the type and length of the tag name and value.
3687

3688
    """
3689
    TagsLU.CheckPrereq(self)
3690
    objects.TaggableObject.ValidateTag(self.op.tag)
3691

    
3692
  def Exec(self, feedback_fn):
3693
    """Sets the tag.
3694

3695
    """
3696
    try:
3697
      self.target.AddTag(self.op.tag)
3698
    except errors.TagError, err:
3699
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTag(TagsLU):
  """Delete a tag from a given object.

  """
  _OP_REQP = ["kind", "name", "tag"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    objects.TaggableObject.ValidateTag(self.op.tag)
    if self.op.tag not in self.target.GetTags():
      raise errors.OpPrereqError("Tag not found")

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    self.target.RemoveTag(self.op.tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")