
root / lib / cmdlib.py @ 07bd8a51


1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import socket
30
import time
31
import tempfile
32
import re
33
import platform
34

    
35
from ganeti import rpc
36
from ganeti import ssh
37
from ganeti import logger
38
from ganeti import utils
39
from ganeti import errors
40
from ganeti import hypervisor
41
from ganeti import config
42
from ganeti import constants
43
from ganeti import objects
44
from ganeti import opcodes
45
from ganeti import ssconf
46

    
47
class LogicalUnit(object):
48
  """Logical Unit base class.
49

50
  Subclasses must follow these rules:
51
    - implement CheckPrereq which also fills in the opcode instance
52
      with all the fields (even if only as None)
53
    - implement Exec
54
    - implement BuildHooksEnv
55
    - redefine HPATH and HTYPE
56
    - optionally redefine their run requirements (REQ_CLUSTER,
57
      REQ_MASTER); note that all commands require root permissions
58

59
  """
60
  HPATH = None
61
  HTYPE = None
62
  _OP_REQP = []
63
  REQ_CLUSTER = True
64
  REQ_MASTER = True
65

    
66
  def __init__(self, processor, op, cfg, sstore):
67
    """Constructor for LogicalUnit.
68

69
    This needs to be overridden in derived classes in order to check op
70
    validity.
71

72
    """
73
    self.processor = processor
74
    self.op = op
75
    self.cfg = cfg
76
    self.sstore = sstore
77
    for attr_name in self._OP_REQP:
78
      attr_val = getattr(op, attr_name, None)
79
      if attr_val is None:
80
        raise errors.OpPrereqError("Required parameter '%s' missing" %
81
                                   attr_name)
82
    if self.REQ_CLUSTER:
83
      if not cfg.IsCluster():
84
        raise errors.OpPrereqError("Cluster not initialized yet,"
85
                                   " use 'gnt-cluster init' first.")
86
      if self.REQ_MASTER:
87
        master = sstore.GetMasterNode()
88
        if master != socket.gethostname():
89
          raise errors.OpPrereqError("Commands must be run on the master"
90
                                     " node %s" % master)
91

    
92
  def CheckPrereq(self):
93
    """Check prerequisites for this LU.
94

95
    This method should check that the prerequisites for the execution
96
    of this LU are fulfilled. It can do internode communication, but
97
    it should be idempotent - no cluster or system changes are
98
    allowed.
99

100
    The method should raise errors.OpPrereqError in case something is
101
    not fulfilled. Its return value is ignored.
102

103
    This method should also update all the parameters of the opcode to
104
    their canonical form; e.g. a short node name must be fully
105
    expanded after this method has successfully completed (so that
106
    hooks, logging, etc. work correctly).
107

108
    """
109
    raise NotImplementedError
110

    
111
  def Exec(self, feedback_fn):
112
    """Execute the LU.
113

114
    This method should implement the actual work. It should raise
115
    errors.OpExecError for failures that are somewhat dealt with in
116
    code, or expected.
117

118
    """
119
    raise NotImplementedError
120

    
121
  def BuildHooksEnv(self):
122
    """Build hooks environment for this LU.
123

124
    This method should return a three-element tuple consisting of: a dict
125
    containing the environment that will be used for running the
126
    specific hook for this LU, a list of node names on which the hook
127
    should run before the execution, and a list of node names on which
128
    the hook should run after the execution.
129

130
    The keys of the dict must not be prefixed with 'GANETI_', as this will
131
    be handled in the hooks runner. Also note additional keys will be
132
    added by the hooks runner. If the LU doesn't define any
133
    environment, an empty dict (and not None) should be returned.
134

135
    As for the node lists, the master should not be included in
136
    them, as it will be added by the hooks runner in case this LU
137
    requires a cluster to run on (otherwise we don't have a node
138
    list). When there are no nodes, an empty list (and not
139
    None) should be returned.
140

141
    Note that if the HPATH for a LU class is None, this function will
142
    not be called.
143

144
    """
145
    raise NotImplementedError
146

    
147
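# A minimal sketch of a LogicalUnit subclass (hypothetical, not used anywhere
# in this module), illustrating the contract from the docstring above:
# CheckPrereq canonicalises the opcode fields, Exec does the actual work and
# BuildHooksEnv describes the hook environment and node lists.
class _ExampleNoOpLU(LogicalUnit):
  """Hypothetical no-op LU, for illustration only."""
  HPATH = "example-noop"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["message"]

  def CheckPrereq(self):
    # canonicalise the (assumed) 'message' opcode field
    self.op.message = str(self.op.message)

  def BuildHooksEnv(self):
    env = {"MESSAGE": self.op.message}
    # no pre-hook nodes; run the post hook on the master only
    return env, [], [self.sstore.GetMasterNode()]

  def Exec(self, feedback_fn):
    feedback_fn("example message: %s" % self.op.message)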

    
148
class NoHooksLU(LogicalUnit):
149
  """Simple LU which runs no hooks.
150

151
  This LU is intended as a parent for other LogicalUnits which will
152
  run no hooks, in order to reduce duplicate code.
153

154
  """
155
  HPATH = None
156
  HTYPE = None
157

    
158
  def BuildHooksEnv(self):
159
    """Build hooks env.
160

161
    This is a no-op, since we don't run hooks.
162

163
    """
164
    return
165

    
166

    
167
def _GetWantedNodes(lu, nodes):
168
  """Returns list of checked and expanded node names.
169

170
  Args:
171
    nodes: List of nodes (strings) or None for all
172

173
  """
174
  if not isinstance(nodes, list):
175
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
176

    
177
  if nodes:
178
    wanted = []
179

    
180
    for name in nodes:
181
      node = lu.cfg.ExpandNodeName(name)
182
      if node is None:
183
        raise errors.OpPrereqError("No such node name '%s'" % name)
184
      wanted.append(node)
185

    
186
  else:
187
    wanted = lu.cfg.GetNodeList()
188
  return utils.NiceSort(wanted)
189

    
190

    
191
def _GetWantedInstances(lu, instances):
192
  """Returns list of checked and expanded instance names.
193

194
  Args:
195
    instances: List of instances (strings) or None for all
196

197
  """
198
  if not isinstance(instances, list):
199
    raise errors.OpPrereqError("Invalid argument type 'instances'")
200

    
201
  if instances:
202
    wanted = []
203

    
204
    for name in instances:
205
      instance = lu.cfg.ExpandInstanceName(name)
206
      if instance is None:
207
        raise errors.OpPrereqError("No such instance name '%s'" % name)
208
      wanted.append(instance)
209

    
210
  else:
211
    wanted = lu.cfg.GetInstanceList()
212
  return utils.NiceSort(wanted)
213

    
214

    
215
def _CheckOutputFields(static, dynamic, selected):
216
  """Checks whether all selected fields are valid.
217

218
  Args:
219
    static: Static fields
220
    dynamic: Dynamic fields
221

222
  """
223
  static_fields = frozenset(static)
224
  dynamic_fields = frozenset(dynamic)
225

    
226
  all_fields = static_fields | dynamic_fields
227

    
228
  if not all_fields.issuperset(selected):
229
    raise errors.OpPrereqError("Unknown output fields selected: %s"
230
                               % ",".join(frozenset(selected).
231
                                          difference(all_fields)))
232

    
233
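# Illustrative sketch of _CheckOutputFields usage (the field names below are
# hypothetical): a selected field that is neither static nor dynamic makes
# the function raise errors.OpPrereqError.
def _ExampleCheckOutputFields():
  """Hypothetical demonstration helper, not called by this module."""
  # accepted: every selected field is known
  _CheckOutputFields(static=["name"], dynamic=["mfree"],
                     selected=["name", "mfree"])
  # an unknown field would raise errors.OpPrereqError, e.g.:
  #   _CheckOutputFields(static=["name"], dynamic=["mfree"],
  #                      selected=["bogus"])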

    
234
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
235
                          memory, vcpus, nics):
236
  """Builds instance related env variables for hooks from single variables.
237

238
  Args:
239
    secondary_nodes: List of secondary nodes as strings
240
  """
241
  env = {
242
    "INSTANCE_NAME": name,
243
    "INSTANCE_PRIMARY": primary_node,
244
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
245
    "INSTANCE_OS_TYPE": os_type,
246
    "INSTANCE_STATUS": status,
247
    "INSTANCE_MEMORY": memory,
248
    "INSTANCE_VCPUS": vcpus,
249
  }
250

    
251
  if nics:
252
    nic_count = len(nics)
253
    for idx, (ip, bridge) in enumerate(nics):
254
      if ip is None:
255
        ip = ""
256
      env["INSTANCE_NIC%d_IP" % idx] = ip
257
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
258
  else:
259
    nic_count = 0
260

    
261
  env["INSTANCE_NIC_COUNT"] = nic_count
262

    
263
  return env
264

    
265
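# Illustrative sketch (hypothetical instance data) of the environment built
# by _BuildInstanceHookEnv; the hooks runner later prefixes every key with
# "GANETI_" (see LogicalUnit.BuildHooksEnv above).
def _ExampleInstanceHookEnv():
  """Hypothetical demonstration helper, not called by this module."""
  # yields INSTANCE_NAME, INSTANCE_PRIMARY, INSTANCE_SECONDARIES,
  # INSTANCE_OS_TYPE, INSTANCE_STATUS, INSTANCE_MEMORY, INSTANCE_VCPUS,
  # INSTANCE_NIC_COUNT plus INSTANCE_NIC0_IP and INSTANCE_NIC0_BRIDGE
  return _BuildInstanceHookEnv("inst1.example.com", "node1", ["node2"],
                               "debian-etch", "up", 512, 1,
                               [("192.0.2.10", "xen-br0")])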

    
266
def _BuildInstanceHookEnvByObject(instance, override=None):
267
  """Builds instance related env variables for hooks from an object.
268

269
  Args:
270
    instance: objects.Instance object of instance
271
    override: dict of values to override
272
  """
273
  args = {
274
    'name': instance.name,
275
    'primary_node': instance.primary_node,
276
    'secondary_nodes': instance.secondary_nodes,
277
    'os_type': instance.os,
278
    'status': instance.status,
279
    'memory': instance.memory,
280
    'vcpus': instance.vcpus,
281
    'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
282
  }
283
  if override:
284
    args.update(override)
285
  return _BuildInstanceHookEnv(**args)
286

    
287

    
288
def _UpdateEtcHosts(fullnode, ip):
289
  """Ensure a node has a correct entry in /etc/hosts.
290

291
  Args:
292
    fullnode - Fully qualified domain name of host. (str)
293
    ip       - IPv4 address of host (str)
294

295
  """
296
  node = fullnode.split(".", 1)[0]
297

    
298
  f = open('/etc/hosts', 'r+')
299

    
300
  inthere = False
301

    
302
  save_lines = []
303
  add_lines = []
304
  removed = False
305

    
306
  while True:
307
    rawline = f.readline()
308

    
309
    if not rawline:
310
      # End of file
311
      break
312

    
313
    line = rawline.split('\n')[0]
314

    
315
    # Strip off comments
316
    line = line.split('#')[0]
317

    
318
    if not line:
319
      # Entire line was comment, skip
320
      save_lines.append(rawline)
321
      continue
322

    
323
    fields = line.split()
324

    
325
    haveall = True
326
    havesome = False
327
    for spec in [ ip, fullnode, node ]:
328
      if spec not in fields:
329
        haveall = False
330
      if spec in fields:
331
        havesome = True
332

    
333
    if haveall:
334
      inthere = True
335
      save_lines.append(rawline)
336
      continue
337

    
338
    if havesome and not haveall:
339
      # Line (old, or manual?) which is missing some.  Remove.
340
      removed = True
341
      continue
342

    
343
    save_lines.append(rawline)
344

    
345
  if not inthere:
346
    add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))
347

    
348
  if removed:
349
    if add_lines:
350
      save_lines = save_lines + add_lines
351

    
352
    # We removed a line, write a new file and replace old.
353
    fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
354
    newfile = os.fdopen(fd, 'w')
355
    newfile.write(''.join(save_lines))
356
    newfile.close()
357
    os.rename(tmpname, '/etc/hosts')
358

    
359
  elif add_lines:
360
    # Simply appending a new line will do the trick.
361
    f.seek(0, 2)
362
    for add in add_lines:
363
      f.write(add)
364

    
365
  f.close()
366

    
367
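# Format sketch (hypothetical addresses): _UpdateEtcHosts keeps or adds a
# single line of the form "<ip>\t<fqdn> <shortname>", e.g.
#   "192.0.2.11\tnode1.example.com node1"
# lines matching only some of ip/fqdn/shortname are dropped.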

    
368
def _UpdateKnownHosts(fullnode, ip, pubkey):
369
  """Ensure a node has a correct known_hosts entry.
370

371
  Args:
372
    fullnode - Fully qualified domain name of host. (str)
373
    ip       - IPv4 address of host (str)
374
    pubkey   - the public key of the cluster
375

376
  """
377
  if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
378
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
379
  else:
380
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
381

    
382
  inthere = False
383

    
384
  save_lines = []
385
  add_lines = []
386
  removed = False
387

    
388
  while True:
389
    rawline = f.readline()
390
    logger.Debug('read %s' % (repr(rawline),))
391

    
392
    if not rawline:
393
      # End of file
394
      break
395

    
396
    line = rawline.split('\n')[0]
397

    
398
    parts = line.split(' ')
399
    fields = parts[0].split(',')
400
    key = parts[2]
401

    
402
    haveall = True
403
    havesome = False
404
    for spec in [ ip, fullnode ]:
405
      if spec not in fields:
406
        haveall = False
407
      if spec in fields:
408
        havesome = True
409

    
410
    logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
411
    if haveall and key == pubkey:
412
      inthere = True
413
      save_lines.append(rawline)
414
      logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
415
      continue
416

    
417
    if havesome and (not haveall or key != pubkey):
418
      removed = True
419
      logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
420
      continue
421

    
422
    save_lines.append(rawline)
423

    
424
  if not inthere:
425
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
426
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
427

    
428
  if removed:
429
    save_lines = save_lines + add_lines
430

    
431
    # Write a new file and replace old.
432
    fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
433
                                   constants.DATA_DIR)
434
    newfile = os.fdopen(fd, 'w')
435
    try:
436
      newfile.write(''.join(save_lines))
437
    finally:
438
      newfile.close()
439
    logger.Debug("Wrote new known_hosts.")
440
    os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
441

    
442
  elif add_lines:
443
    # Simply appending a new line will do the trick.
444
    f.seek(0, 2)
445
    for add in add_lines:
446
      f.write(add)
447

    
448
  f.close()
449

    
450

    
451
def _HasValidVG(vglist, vgname):
452
  """Checks if the volume group list is valid.
453

454
  A non-None return value means there's an error, and the return value
455
  is the error message.
456

457
  """
458
  vgsize = vglist.get(vgname, None)
459
  if vgsize is None:
460
    return "volume group '%s' missing" % vgname
461
  elif vgsize < 20480:
462
    return ("volume group '%s' too small (20480MiB required, %dMiB found)" %
463
            (vgname, vgsize))
464
  return None
465

    
466
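# Illustrative sketch (hypothetical volume group data): _HasValidVG returns
# None when the group exists and is at least 20 GiB, and an error string
# otherwise.
def _ExampleHasValidVG():
  """Hypothetical demonstration helper, not called by this module."""
  assert _HasValidVG({"xenvg": 102400}, "xenvg") is None
  assert _HasValidVG({"xenvg": 102400}, "other-vg") is not None  # missing
  assert _HasValidVG({"xenvg": 1024}, "xenvg") is not None       # too small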

    
467
def _InitSSHSetup(node):
468
  """Setup the SSH configuration for the cluster.
469

470

471
  This generates a dsa keypair for root, adds the pub key to the
472
  permitted hosts and adds the hostkey to its own known hosts.
473

474
  Args:
475
    node: the name of this host as a fqdn
476

477
  """
478
  if os.path.exists('/root/.ssh/id_dsa'):
479
    utils.CreateBackup('/root/.ssh/id_dsa')
480
  if os.path.exists('/root/.ssh/id_dsa.pub'):
481
    utils.CreateBackup('/root/.ssh/id_dsa.pub')
482

    
483
  utils.RemoveFile('/root/.ssh/id_dsa')
484
  utils.RemoveFile('/root/.ssh/id_dsa.pub')
485

    
486
  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
487
                         "-f", "/root/.ssh/id_dsa",
488
                         "-q", "-N", ""])
489
  if result.failed:
490
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
491
                             result.output)
492

    
493
  f = open('/root/.ssh/id_dsa.pub', 'r')
494
  try:
495
    utils.AddAuthorizedKey('/root/.ssh/authorized_keys', f.read(8192))
496
  finally:
497
    f.close()
498

    
499

    
500
def _InitGanetiServerSetup(ss):
501
  """Setup the necessary configuration for the initial node daemon.
502

503
  This creates the nodepass file containing the shared password for
504
  the cluster and also generates the SSL certificate.
505

506
  """
507
  # Create pseudo random password
508
  randpass = sha.new(os.urandom(64)).hexdigest()
509
  # and write it into sstore
510
  ss.SetKey(ss.SS_NODED_PASS, randpass)
511

    
512
  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
513
                         "-days", str(365*5), "-nodes", "-x509",
514
                         "-keyout", constants.SSL_CERT_FILE,
515
                         "-out", constants.SSL_CERT_FILE, "-batch"])
516
  if result.failed:
517
    raise errors.OpExecError("could not generate server ssl cert, command"
518
                             " %s had exitcode %s and error message %s" %
519
                             (result.cmd, result.exit_code, result.output))
520

    
521
  os.chmod(constants.SSL_CERT_FILE, 0400)
522

    
523
  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
524

    
525
  if result.failed:
526
    raise errors.OpExecError("Could not start the node daemon, command %s"
527
                             " had exitcode %s and error %s" %
528
                             (result.cmd, result.exit_code, result.output))
529

    
530

    
531
class LUInitCluster(LogicalUnit):
532
  """Initialise the cluster.
533

534
  """
535
  HPATH = "cluster-init"
536
  HTYPE = constants.HTYPE_CLUSTER
537
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
538
              "def_bridge", "master_netdev"]
539
  REQ_CLUSTER = False
540

    
541
  def BuildHooksEnv(self):
542
    """Build hooks env.
543

544
    Notes: Since we don't require a cluster, we must manually add
545
    ourselves in the post-run node list.
546

547
    """
548
    env = {
549
      "CLUSTER": self.op.cluster_name,
550
      "MASTER": self.hostname['hostname_full'],
551
      }
552
    return env, [], [self.hostname['hostname_full']]
553

    
554
  def CheckPrereq(self):
555
    """Verify that the passed name is a valid one.
556

557
    """
558
    if config.ConfigWriter.IsCluster():
559
      raise errors.OpPrereqError("Cluster is already initialised")
560

    
561
    hostname_local = socket.gethostname()
562
    self.hostname = hostname = utils.LookupHostname(hostname_local)
563
    if not hostname:
564
      raise errors.OpPrereqError("Cannot resolve my own hostname ('%s')" %
565
                                 hostname_local)
566

    
567
    if hostname["hostname_full"] != hostname_local:
568
      raise errors.OpPrereqError("My own hostname (%s) does not match the"
569
                                 " resolver (%s): probably not using FQDN"
570
                                 " for hostname." %
571
                                 (hostname_local, hostname["hostname_full"]))
572

    
573
    if hostname["ip"].startswith("127."):
574
      raise errors.OpPrereqError("This host's IP resolves to the loopback"
575
                                 " range (%s). Please fix DNS or /etc/hosts." %
576
                                 (hostname["ip"],))
577

    
578
    self.clustername = clustername = utils.LookupHostname(self.op.cluster_name)
579
    if not clustername:
580
      raise errors.OpPrereqError("Cannot resolve given cluster name ('%s')"
581
                                 % self.op.cluster_name)
582

    
583
    result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", hostname['ip']])
584
    if result.failed:
585
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
586
                                 " to %s,\nbut this ip address does not"
587
                                 " belong to this host."
588
                                 " Aborting." % hostname['ip'])
589

    
590
    secondary_ip = getattr(self.op, "secondary_ip", None)
591
    if secondary_ip and not utils.IsValidIP(secondary_ip):
592
      raise errors.OpPrereqError("Invalid secondary ip given")
593
    if secondary_ip and secondary_ip != hostname['ip']:
594
      result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", secondary_ip])
595
      if result.failed:
596
        raise errors.OpPrereqError("You gave %s as secondary IP,\n"
597
                                   "but it does not belong to this host." %
598
                                   secondary_ip)
599
    self.secondary_ip = secondary_ip
600

    
601
    # checks presence of the volume group given
602
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
603

    
604
    if vgstatus:
605
      raise errors.OpPrereqError("Error: %s" % vgstatus)
606

    
607
    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
608
                    self.op.mac_prefix):
609
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
610
                                 self.op.mac_prefix)
611

    
612
    if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
613
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
614
                                 self.op.hypervisor_type)
615

    
616
    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
617
    if result.failed:
618
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
619
                                 (self.op.master_netdev,
620
                                  result.output.strip()))
621

    
622
  def Exec(self, feedback_fn):
623
    """Initialize the cluster.
624

625
    """
626
    clustername = self.clustername
627
    hostname = self.hostname
628

    
629
    # set up the simple store
630
    ss = ssconf.SimpleStore()
631
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
632
    ss.SetKey(ss.SS_MASTER_NODE, hostname['hostname_full'])
633
    ss.SetKey(ss.SS_MASTER_IP, clustername['ip'])
634
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
635
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername['hostname'])
636

    
637
    # set up the inter-node password and certificate
638
    _InitGanetiServerSetup(ss)
639

    
640
    # start the master ip
641
    rpc.call_node_start_master(hostname['hostname_full'])
642

    
643
    # set up ssh config and /etc/hosts
644
    f = open('/etc/ssh/ssh_host_rsa_key.pub', 'r')
645
    try:
646
      sshline = f.read()
647
    finally:
648
      f.close()
649
    sshkey = sshline.split(" ")[1]
650

    
651
    _UpdateEtcHosts(hostname['hostname_full'],
652
                    hostname['ip'],
653
                    )
654

    
655
    _UpdateKnownHosts(hostname['hostname_full'],
656
                      hostname['ip'],
657
                      sshkey,
658
                      )
659

    
660
    _InitSSHSetup(hostname['hostname'])
661

    
662
    # init of cluster config file
663
    cfgw = config.ConfigWriter()
664
    cfgw.InitConfig(hostname['hostname'], hostname['ip'], self.secondary_ip,
665
                    sshkey, self.op.mac_prefix,
666
                    self.op.vg_name, self.op.def_bridge)
667

    
668

    
669
class LUDestroyCluster(NoHooksLU):
670
  """Logical unit for destroying the cluster.
671

672
  """
673
  _OP_REQP = []
674

    
675
  def CheckPrereq(self):
676
    """Check prerequisites.
677

678
    This checks whether the cluster is empty.
679

680
    Any errors are signalled by raising errors.OpPrereqError.
681

682
    """
683
    master = self.sstore.GetMasterNode()
684

    
685
    nodelist = self.cfg.GetNodeList()
686
    if len(nodelist) != 1 or nodelist[0] != master:
687
      raise errors.OpPrereqError("There are still %d node(s) in"
688
                                 " this cluster." % (len(nodelist) - 1))
689
    instancelist = self.cfg.GetInstanceList()
690
    if instancelist:
691
      raise errors.OpPrereqError("There are still %d instance(s) in"
692
                                 " this cluster." % len(instancelist))
693

    
694
  def Exec(self, feedback_fn):
695
    """Destroys the cluster.
696

697
    """
698
    utils.CreateBackup('/root/.ssh/id_dsa')
699
    utils.CreateBackup('/root/.ssh/id_dsa.pub')
700
    rpc.call_node_leave_cluster(self.sstore.GetMasterNode())
701

    
702

    
703
class LUVerifyCluster(NoHooksLU):
704
  """Verifies the cluster status.
705

706
  """
707
  _OP_REQP = []
708

    
709
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
710
                  remote_version, feedback_fn):
711
    """Run multiple tests against a node.
712

713
    Test list:
714
      - compares ganeti version
715
      - checks vg existence and size > 20G
716
      - checks config file checksum
717
      - checks ssh to other nodes
718

719
    Args:
720
      node: name of the node to check
721
      file_list: required list of files
722
      local_cksum: dictionary of local files and their checksums
723

724
    """
725
    # compares ganeti version
726
    local_version = constants.PROTOCOL_VERSION
727
    if not remote_version:
728
      feedback_fn("  - ERROR: connection to %s failed" % (node,))
729
      return True
730

    
731
    if local_version != remote_version:
732
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
733
                      (local_version, node, remote_version))
734
      return True
735

    
736
    # checks vg existence and size > 20G
737

    
738
    bad = False
739
    if not vglist:
740
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
741
                      (node,))
742
      bad = True
743
    else:
744
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
745
      if vgstatus:
746
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
747
        bad = True
748

    
749
    # checks config file checksum
750
    # checks ssh to any
751

    
752
    if 'filelist' not in node_result:
753
      bad = True
754
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
755
    else:
756
      remote_cksum = node_result['filelist']
757
      for file_name in file_list:
758
        if file_name not in remote_cksum:
759
          bad = True
760
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
761
        elif remote_cksum[file_name] != local_cksum[file_name]:
762
          bad = True
763
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
764

    
765
    if 'nodelist' not in node_result:
766
      bad = True
767
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
768
    else:
769
      if node_result['nodelist']:
770
        bad = True
771
        for node in node_result['nodelist']:
772
          feedback_fn("  - ERROR: communication with node '%s': %s" %
773
                          (node, node_result['nodelist'][node]))
774
    hyp_result = node_result.get('hypervisor', None)
775
    if hyp_result is not None:
776
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
777
    return bad
778

    
779
  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
780
    """Verify an instance.
781

782
    This function checks to see if the required block devices are
783
    available on the instance's node.
784

785
    """
786
    bad = False
787

    
788
    instancelist = self.cfg.GetInstanceList()
789
    if instance not in instancelist:
790
      feedback_fn("  - ERROR: instance %s not in instance list %s" %
791
                      (instance, instancelist))
792
      bad = True
793

    
794
    instanceconfig = self.cfg.GetInstanceInfo(instance)
795
    node_current = instanceconfig.primary_node
796

    
797
    node_vol_should = {}
798
    instanceconfig.MapLVsByNode(node_vol_should)
799

    
800
    for node in node_vol_should:
801
      for volume in node_vol_should[node]:
802
        if node not in node_vol_is or volume not in node_vol_is[node]:
803
          feedback_fn("  - ERROR: volume %s missing on node %s" %
804
                          (volume, node))
805
          bad = True
806

    
807
    if instanceconfig.status != 'down':
808
      if instance not in node_instance[node_current]:
809
        feedback_fn("  - ERROR: instance %s not running on node %s" %
810
                        (instance, node_current))
811
        bad = True
812

    
813
    for node in node_instance:
814
      if node != node_current:
815
        if instance in node_instance[node]:
816
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
817
                          (instance, node))
818
          bad = True
819

    
820
    return bad
821

    
822
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
823
    """Verify if there are any unknown volumes in the cluster.
824

825
    The .os, .swap and backup volumes are ignored. All other volumes are
826
    reported as unknown.
827

828
    """
829
    bad = False
830

    
831
    for node in node_vol_is:
832
      for volume in node_vol_is[node]:
833
        if node not in node_vol_should or volume not in node_vol_should[node]:
834
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
835
                      (volume, node))
836
          bad = True
837
    return bad
838

    
839
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
840
    """Verify the list of running instances.
841

842
    This checks what instances are running but unknown to the cluster.
843

844
    """
845
    bad = False
846
    for node in node_instance:
847
      for runninginstance in node_instance[node]:
848
        if runninginstance not in instancelist:
849
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
850
                          (runninginstance, node))
851
          bad = True
852
    return bad
853

    
854
  def CheckPrereq(self):
855
    """Check prerequisites.
856

857
    This has no prerequisites.
858

859
    """
860
    pass
861

    
862
  def Exec(self, feedback_fn):
863
    """Verify integrity of cluster, performing various test on nodes.
864

865
    """
866
    bad = False
867
    feedback_fn("* Verifying global settings")
868
    self.cfg.VerifyConfig()
869

    
870
    master = self.sstore.GetMasterNode()
871
    vg_name = self.cfg.GetVGName()
872
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
873
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
874
    node_volume = {}
875
    node_instance = {}
876

    
877
    # FIXME: verify OS list
878
    # do local checksums
879
    file_names = list(self.sstore.GetFileList())
880
    file_names.append(constants.SSL_CERT_FILE)
881
    file_names.append(constants.CLUSTER_CONF_FILE)
882
    local_checksums = utils.FingerprintFiles(file_names)
883

    
884
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
885
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
886
    all_instanceinfo = rpc.call_instance_list(nodelist)
887
    all_vglist = rpc.call_vg_list(nodelist)
888
    node_verify_param = {
889
      'filelist': file_names,
890
      'nodelist': nodelist,
891
      'hypervisor': None,
892
      }
893
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
894
    all_rversion = rpc.call_version(nodelist)
895

    
896
    for node in nodelist:
897
      feedback_fn("* Verifying node %s" % node)
898
      result = self._VerifyNode(node, file_names, local_checksums,
899
                                all_vglist[node], all_nvinfo[node],
900
                                all_rversion[node], feedback_fn)
901
      bad = bad or result
902

    
903
      # node_volume
904
      volumeinfo = all_volumeinfo[node]
905

    
906
      if type(volumeinfo) != dict:
907
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
908
        bad = True
909
        continue
910

    
911
      node_volume[node] = volumeinfo
912

    
913
      # node_instance
914
      nodeinstance = all_instanceinfo[node]
915
      if type(nodeinstance) != list:
916
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
917
        bad = True
918
        continue
919

    
920
      node_instance[node] = nodeinstance
921

    
922
    node_vol_should = {}
923

    
924
    for instance in instancelist:
925
      feedback_fn("* Verifying instance %s" % instance)
926
      result =  self._VerifyInstance(instance, node_volume, node_instance,
927
                                     feedback_fn)
928
      bad = bad or result
929

    
930
      inst_config = self.cfg.GetInstanceInfo(instance)
931

    
932
      inst_config.MapLVsByNode(node_vol_should)
933

    
934
    feedback_fn("* Verifying orphan volumes")
935
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
936
                                       feedback_fn)
937
    bad = bad or result
938

    
939
    feedback_fn("* Verifying remaining instances")
940
    result = self._VerifyOrphanInstances(instancelist, node_instance,
941
                                         feedback_fn)
942
    bad = bad or result
943

    
944
    return int(bad)
945

    
946

    
947
class LURenameCluster(LogicalUnit):
948
  """Rename the cluster.
949

950
  """
951
  HPATH = "cluster-rename"
952
  HTYPE = constants.HTYPE_CLUSTER
953
  _OP_REQP = ["name"]
954

    
955
  def BuildHooksEnv(self):
956
    """Build hooks env.
957

958
    """
959
    env = {
960
      "NEW_NAME": self.op.name,
961
      }
962
    mn = self.sstore.GetMasterNode()
963
    return env, [mn], [mn]
964

    
965
  def CheckPrereq(self):
966
    """Verify that the passed name is a valid one.
967

968
    """
969
    hostname = utils.LookupHostname(self.op.name)
970
    if not hostname:
971
      raise errors.OpPrereqError("Cannot resolve the new cluster name ('%s')" %
972
                                 self.op.name)
973

    
974
    new_name = hostname["hostname"]
975
    self.ip = new_ip = hostname["ip"]
976
    old_name = self.sstore.GetClusterName()
977
    old_ip = self.sstore.GetMasterIP()
978
    if new_name == old_name and new_ip == old_ip:
979
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
980
                                 " cluster has changed")
981
    if new_ip != old_ip:
982
      result = utils.RunCmd(["fping", "-q", new_ip])
983
      if not result.failed:
984
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
985
                                   " reachable on the network. Aborting." %
986
                                   new_ip)
987

    
988
    self.op.name = new_name
989

    
990
  def Exec(self, feedback_fn):
991
    """Rename the cluster.
992

993
    """
994
    clustername = self.op.name
995
    ip = self.ip
996
    ss = self.sstore
997

    
998
    # shutdown the master IP
999
    master = ss.GetMasterNode()
1000
    if not rpc.call_node_stop_master(master):
1001
      raise errors.OpExecError("Could not disable the master role")
1002

    
1003
    try:
1004
      # modify the sstore
1005
      ss.SetKey(ss.SS_MASTER_IP, ip)
1006
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1007

    
1008
      # Distribute updated ss config to all nodes
1009
      myself = self.cfg.GetNodeInfo(master)
1010
      dist_nodes = self.cfg.GetNodeList()
1011
      if myself.name in dist_nodes:
1012
        dist_nodes.remove(myself.name)
1013

    
1014
      logger.Debug("Copying updated ssconf data to all nodes")
1015
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1016
        fname = ss.KeyToFilename(keyname)
1017
        result = rpc.call_upload_file(dist_nodes, fname)
1018
        for to_node in dist_nodes:
1019
          if not result[to_node]:
1020
            logger.Error("copy of file %s to node %s failed" %
1021
                         (fname, to_node))
1022
    finally:
1023
      if not rpc.call_node_start_master(master):
1024
        logger.Error("Could not re-enable the master role on the master,\n"
1025
                     "please restart manually.")
1026

    
1027

    
1028
def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
1029
  """Sleep and poll for an instance's disk to sync.
1030

1031
  """
1032
  if not instance.disks:
1033
    return True
1034

    
1035
  if not oneshot:
1036
    logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)
1037

    
1038
  node = instance.primary_node
1039

    
1040
  for dev in instance.disks:
1041
    cfgw.SetDiskID(dev, node)
1042

    
1043
  retries = 0
1044
  while True:
1045
    max_time = 0
1046
    done = True
1047
    cumul_degraded = False
1048
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1049
    if not rstats:
1050
      logger.ToStderr("Can't get any data from node %s" % node)
1051
      retries += 1
1052
      if retries >= 10:
1053
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1054
                                 " aborting." % node)
1055
      time.sleep(6)
1056
      continue
1057
    retries = 0
1058
    for i in range(len(rstats)):
1059
      mstat = rstats[i]
1060
      if mstat is None:
1061
        logger.ToStderr("Can't compute data for node %s/%s" %
1062
                        (node, instance.disks[i].iv_name))
1063
        continue
1064
      perc_done, est_time, is_degraded = mstat
1065
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1066
      if perc_done is not None:
1067
        done = False
1068
        if est_time is not None:
1069
          rem_time = "%d estimated seconds remaining" % est_time
1070
          max_time = est_time
1071
        else:
1072
          rem_time = "no time estimate"
1073
        logger.ToStdout("- device %s: %5.2f%% done, %s" %
1074
                        (instance.disks[i].iv_name, perc_done, rem_time))
1075
    if done or oneshot:
1076
      break
1077

    
1078
    if unlock:
1079
      utils.Unlock('cmd')
1080
    try:
1081
      time.sleep(min(60, max_time))
1082
    finally:
1083
      if unlock:
1084
        utils.Lock('cmd')
1085

    
1086
  if done:
1087
    logger.ToStdout("Instance %s's disks are in sync." % instance.name)
1088
  return not cumul_degraded
1089

    
1090
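# Note on the data handled above (values hypothetical): each entry returned
# by rpc.call_blockdev_getmirrorstatus is a (perc_done, est_time, is_degraded)
# tuple, e.g. (87.5, 130, True) for a disk still syncing with ~130 seconds
# remaining; a perc_done of None means that disk is already in sync.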

    
1091
def _CheckDiskConsistency(cfgw, dev, node, on_primary):
1092
  """Check that mirrors are not degraded.
1093

1094
  """
1095
  cfgw.SetDiskID(dev, node)
1096

    
1097
  result = True
1098
  if on_primary or dev.AssembleOnSecondary():
1099
    rstats = rpc.call_blockdev_find(node, dev)
1100
    if not rstats:
1101
      logger.ToStderr("Can't get any data from node %s" % node)
1102
      result = False
1103
    else:
1104
      result = result and (not rstats[5])
1105
  if dev.children:
1106
    for child in dev.children:
1107
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1108

    
1109
  return result
1110

    
1111

    
1112
class LUDiagnoseOS(NoHooksLU):
1113
  """Logical unit for OS diagnose/query.
1114

1115
  """
1116
  _OP_REQP = []
1117

    
1118
  def CheckPrereq(self):
1119
    """Check prerequisites.
1120

1121
    This always succeeds, since this is a pure query LU.
1122

1123
    """
1124
    return
1125

    
1126
  def Exec(self, feedback_fn):
1127
    """Compute the list of OSes.
1128

1129
    """
1130
    node_list = self.cfg.GetNodeList()
1131
    node_data = rpc.call_os_diagnose(node_list)
1132
    if node_data == False:
1133
      raise errors.OpExecError("Can't gather the list of OSes")
1134
    return node_data
1135

    
1136

    
1137
class LURemoveNode(LogicalUnit):
1138
  """Logical unit for removing a node.
1139

1140
  """
1141
  HPATH = "node-remove"
1142
  HTYPE = constants.HTYPE_NODE
1143
  _OP_REQP = ["node_name"]
1144

    
1145
  def BuildHooksEnv(self):
1146
    """Build hooks env.
1147

1148
    This doesn't run on the target node in the pre phase as a failed
1149
    node would not allow itself to run.
1150

1151
    """
1152
    env = {
1153
      "NODE_NAME": self.op.node_name,
1154
      }
1155
    all_nodes = self.cfg.GetNodeList()
1156
    all_nodes.remove(self.op.node_name)
1157
    return env, all_nodes, all_nodes
1158

    
1159
  def CheckPrereq(self):
1160
    """Check prerequisites.
1161

1162
    This checks:
1163
     - the node exists in the configuration
1164
     - it does not have primary or secondary instances
1165
     - it's not the master
1166

1167
    Any errors are signalled by raising errors.OpPrereqError.
1168

1169
    """
1170
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1171
    if node is None:
1172
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1173

    
1174
    instance_list = self.cfg.GetInstanceList()
1175

    
1176
    masternode = self.sstore.GetMasterNode()
1177
    if node.name == masternode:
1178
      raise errors.OpPrereqError("Node is the master node,"
1179
                                 " you need to failover first.")
1180

    
1181
    for instance_name in instance_list:
1182
      instance = self.cfg.GetInstanceInfo(instance_name)
1183
      if node.name == instance.primary_node:
1184
        raise errors.OpPrereqError("Instance %s still running on the node,"
1185
                                   " please remove first." % instance_name)
1186
      if node.name in instance.secondary_nodes:
1187
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1188
                                   " please remove first." % instance_name)
1189
    self.op.node_name = node.name
1190
    self.node = node
1191

    
1192
  def Exec(self, feedback_fn):
1193
    """Removes the node from the cluster.
1194

1195
    """
1196
    node = self.node
1197
    logger.Info("stopping the node daemon and removing configs from node %s" %
1198
                node.name)
1199

    
1200
    rpc.call_node_leave_cluster(node.name)
1201

    
1202
    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1203

    
1204
    logger.Info("Removing node %s from config" % node.name)
1205

    
1206
    self.cfg.RemoveNode(node.name)
1207

    
1208

    
1209
class LUQueryNodes(NoHooksLU):
1210
  """Logical unit for querying nodes.
1211

1212
  """
1213
  _OP_REQP = ["output_fields", "names"]
1214

    
1215
  def CheckPrereq(self):
1216
    """Check prerequisites.
1217

1218
    This checks that the fields required are valid output fields.
1219

1220
    """
1221
    self.dynamic_fields = frozenset(["dtotal", "dfree",
1222
                                     "mtotal", "mnode", "mfree"])
1223

    
1224
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1225
                               "pinst_list", "sinst_list",
1226
                               "pip", "sip"],
1227
                       dynamic=self.dynamic_fields,
1228
                       selected=self.op.output_fields)
1229

    
1230
    self.wanted = _GetWantedNodes(self, self.op.names)
1231

    
1232
  def Exec(self, feedback_fn):
1233
    """Computes the list of nodes and their attributes.
1234

1235
    """
1236
    nodenames = self.wanted
1237
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1238

    
1239
    # begin data gathering
1240

    
1241
    if self.dynamic_fields.intersection(self.op.output_fields):
1242
      live_data = {}
1243
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1244
      for name in nodenames:
1245
        nodeinfo = node_data.get(name, None)
1246
        if nodeinfo:
1247
          live_data[name] = {
1248
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1249
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1250
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1251
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1252
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1253
            }
1254
        else:
1255
          live_data[name] = {}
1256
    else:
1257
      live_data = dict.fromkeys(nodenames, {})
1258

    
1259
    node_to_primary = dict([(name, set()) for name in nodenames])
1260
    node_to_secondary = dict([(name, set()) for name in nodenames])
1261

    
1262
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1263
                             "sinst_cnt", "sinst_list"))
1264
    if inst_fields & frozenset(self.op.output_fields):
1265
      instancelist = self.cfg.GetInstanceList()
1266

    
1267
      for instance_name in instancelist:
1268
        inst = self.cfg.GetInstanceInfo(instance_name)
1269
        if inst.primary_node in node_to_primary:
1270
          node_to_primary[inst.primary_node].add(inst.name)
1271
        for secnode in inst.secondary_nodes:
1272
          if secnode in node_to_secondary:
1273
            node_to_secondary[secnode].add(inst.name)
1274

    
1275
    # end data gathering
1276

    
1277
    output = []
1278
    for node in nodelist:
1279
      node_output = []
1280
      for field in self.op.output_fields:
1281
        if field == "name":
1282
          val = node.name
1283
        elif field == "pinst_list":
1284
          val = list(node_to_primary[node.name])
1285
        elif field == "sinst_list":
1286
          val = list(node_to_secondary[node.name])
1287
        elif field == "pinst_cnt":
1288
          val = len(node_to_primary[node.name])
1289
        elif field == "sinst_cnt":
1290
          val = len(node_to_secondary[node.name])
1291
        elif field == "pip":
1292
          val = node.primary_ip
1293
        elif field == "sip":
1294
          val = node.secondary_ip
1295
        elif field in self.dynamic_fields:
1296
          val = live_data[node.name].get(field, None)
1297
        else:
1298
          raise errors.ParameterError(field)
1299
        node_output.append(val)
1300
      output.append(node_output)
1301

    
1302
    return output
1303

    
1304

    
1305
class LUQueryNodeVolumes(NoHooksLU):
1306
  """Logical unit for getting volumes on node(s).
1307

1308
  """
1309
  _OP_REQP = ["nodes", "output_fields"]
1310

    
1311
  def CheckPrereq(self):
1312
    """Check prerequisites.
1313

1314
    This checks that the fields required are valid output fields.
1315

1316
    """
1317
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1318

    
1319
    _CheckOutputFields(static=["node"],
1320
                       dynamic=["phys", "vg", "name", "size", "instance"],
1321
                       selected=self.op.output_fields)
1322

    
1323

    
1324
  def Exec(self, feedback_fn):
1325
    """Computes the list of nodes and their attributes.
1326

1327
    """
1328
    nodenames = self.nodes
1329
    volumes = rpc.call_node_volumes(nodenames)
1330

    
1331
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1332
             in self.cfg.GetInstanceList()]
1333

    
1334
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1335

    
1336
    output = []
1337
    for node in nodenames:
1338
      if node not in volumes or not volumes[node]:
1339
        continue
1340

    
1341
      node_vols = volumes[node][:]
1342
      node_vols.sort(key=lambda vol: vol['dev'])
1343

    
1344
      for vol in node_vols:
1345
        node_output = []
1346
        for field in self.op.output_fields:
1347
          if field == "node":
1348
            val = node
1349
          elif field == "phys":
1350
            val = vol['dev']
1351
          elif field == "vg":
1352
            val = vol['vg']
1353
          elif field == "name":
1354
            val = vol['name']
1355
          elif field == "size":
1356
            val = int(float(vol['size']))
1357
          elif field == "instance":
1358
            for inst in ilist:
1359
              if node not in lv_by_node[inst]:
1360
                continue
1361
              if vol['name'] in lv_by_node[inst][node]:
1362
                val = inst.name
1363
                break
1364
            else:
1365
              val = '-'
1366
          else:
1367
            raise errors.ParameterError(field)
1368
          node_output.append(str(val))
1369

    
1370
        output.append(node_output)
1371

    
1372
    return output
1373

    
1374

    
1375
class LUAddNode(LogicalUnit):
1376
  """Logical unit for adding node to the cluster.
1377

1378
  """
1379
  HPATH = "node-add"
1380
  HTYPE = constants.HTYPE_NODE
1381
  _OP_REQP = ["node_name"]
1382

    
1383
  def BuildHooksEnv(self):
1384
    """Build hooks env.
1385

1386
    This will run on all nodes before, and on all nodes + the new node after.
1387

1388
    """
1389
    env = {
1390
      "NODE_NAME": self.op.node_name,
1391
      "NODE_PIP": self.op.primary_ip,
1392
      "NODE_SIP": self.op.secondary_ip,
1393
      }
1394
    nodes_0 = self.cfg.GetNodeList()
1395
    nodes_1 = nodes_0 + [self.op.node_name, ]
1396
    return env, nodes_0, nodes_1
1397

    
1398
  def CheckPrereq(self):
1399
    """Check prerequisites.
1400

1401
    This checks:
1402
     - the new node is not already in the config
1403
     - it is resolvable
1404
     - its parameters (single/dual homed) matches the cluster
1405

1406
    Any errors are signalled by raising errors.OpPrereqError.
1407

1408
    """
1409
    node_name = self.op.node_name
1410
    cfg = self.cfg
1411

    
1412
    dns_data = utils.LookupHostname(node_name)
1413
    if not dns_data:
1414
      raise errors.OpPrereqError("Node %s is not resolvable" % node_name)
1415

    
1416
    node = dns_data['hostname']
1417
    primary_ip = self.op.primary_ip = dns_data['ip']
1418
    secondary_ip = getattr(self.op, "secondary_ip", None)
1419
    if secondary_ip is None:
1420
      secondary_ip = primary_ip
1421
    if not utils.IsValidIP(secondary_ip):
1422
      raise errors.OpPrereqError("Invalid secondary IP given")
1423
    self.op.secondary_ip = secondary_ip
1424
    node_list = cfg.GetNodeList()
1425
    if node in node_list:
1426
      raise errors.OpPrereqError("Node %s is already in the configuration"
1427
                                 % node)
1428

    
1429
    for existing_node_name in node_list:
1430
      existing_node = cfg.GetNodeInfo(existing_node_name)
1431
      if (existing_node.primary_ip == primary_ip or
1432
          existing_node.secondary_ip == primary_ip or
1433
          existing_node.primary_ip == secondary_ip or
1434
          existing_node.secondary_ip == secondary_ip):
1435
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1436
                                   " existing node %s" % existing_node.name)
1437

    
1438
    # check that the type of the node (single versus dual homed) is the
1439
    # same as for the master
1440
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1441
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1442
    newbie_singlehomed = secondary_ip == primary_ip
1443
    if master_singlehomed != newbie_singlehomed:
1444
      if master_singlehomed:
1445
        raise errors.OpPrereqError("The master has no private ip but the"
1446
                                   " new node has one")
1447
      else:
1448
        raise errors.OpPrereqError("The master has a private ip but the"
1449
                                   " new node doesn't have one")
1450

    
1451
    # checks reachability
1452
    command = ["fping", "-q", primary_ip]
1453
    result = utils.RunCmd(command)
1454
    if result.failed:
1455
      raise errors.OpPrereqError("Node not reachable by ping")
1456

    
1457
    if not newbie_singlehomed:
1458
      # check reachability from my secondary ip to newbie's secondary ip
1459
      command = ["fping", "-S%s" % myself.secondary_ip, "-q", secondary_ip]
1460
      result = utils.RunCmd(command)
1461
      if result.failed:
1462
        raise errors.OpPrereqError("Node secondary ip not reachable by ping")
1463

    
1464
    self.new_node = objects.Node(name=node,
1465
                                 primary_ip=primary_ip,
1466
                                 secondary_ip=secondary_ip)
1467

    
1468
  def Exec(self, feedback_fn):
1469
    """Adds the new node to the cluster.
1470

1471
    """
1472
    new_node = self.new_node
1473
    node = new_node.name
1474

    
1475
    # set up inter-node password and certificate and restarts the node daemon
1476
    gntpass = self.sstore.GetNodeDaemonPassword()
1477
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1478
      raise errors.OpExecError("ganeti password corruption detected")
1479
    f = open(constants.SSL_CERT_FILE)
1480
    try:
1481
      gntpem = f.read(8192)
1482
    finally:
1483
      f.close()
1484
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1485
    # so we use this to detect an invalid certificate; as long as the
1486
    # cert doesn't contain this, the here-document will be correctly
1487
    # parsed by the shell sequence below
1488
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
1489
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1490
    if not gntpem.endswith("\n"):
1491
      raise errors.OpExecError("PEM must end with newline")
1492
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1493

    
1494
    # and then connect with ssh to set password and start ganeti-noded
1495
    # note that all the below variables are sanitized at this point,
1496
    # either by being constants or by the checks above
1497
    ss = self.sstore
1498
    mycommand = ("umask 077 && "
1499
                 "echo '%s' > '%s' && "
1500
                 "cat > '%s' << '!EOF.' && \n"
1501
                 "%s!EOF.\n%s restart" %
1502
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1503
                  constants.SSL_CERT_FILE, gntpem,
1504
                  constants.NODE_INITD_SCRIPT))
1505

    
1506
    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1507
    if result.failed:
1508
      raise errors.OpExecError("Remote command on node %s, error: %s,"
1509
                               " output: %s" %
1510
                               (node, result.fail_reason, result.output))
1511

    
1512
    # check connectivity
1513
    time.sleep(4)
1514

    
1515
    result = rpc.call_version([node])[node]
1516
    if result:
1517
      if constants.PROTOCOL_VERSION == result:
1518
        logger.Info("communication to node %s fine, sw version %s match" %
1519
                    (node, result))
1520
      else:
1521
        raise errors.OpExecError("Version mismatch master version %s,"
1522
                                 " node version %s" %
1523
                                 (constants.PROTOCOL_VERSION, result))
1524
    else:
1525
      raise errors.OpExecError("Cannot get version from the new node")
1526

    
1527
    # setup ssh on node
1528
    logger.Info("copy ssh key to node %s" % node)
1529
    keyarray = []
1530
    keyfiles = ["/etc/ssh/ssh_host_dsa_key", "/etc/ssh/ssh_host_dsa_key.pub",
1531
                "/etc/ssh/ssh_host_rsa_key", "/etc/ssh/ssh_host_rsa_key.pub",
1532
                "/root/.ssh/id_dsa", "/root/.ssh/id_dsa.pub"]
1533

    
1534
    for i in keyfiles:
1535
      f = open(i, 'r')
1536
      try:
1537
        keyarray.append(f.read())
1538
      finally:
1539
        f.close()
1540

    
1541
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1542
                               keyarray[3], keyarray[4], keyarray[5])
1543

    
1544
    if not result:
1545
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1546

    
1547
    # Add node to our /etc/hosts, and add key to known_hosts
1548
    _UpdateEtcHosts(new_node.name, new_node.primary_ip)
1549
    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1550
                      self.cfg.GetHostKey())
1551

    
1552
    if new_node.secondary_ip != new_node.primary_ip:
1553
      result = ssh.SSHCall(node, "root",
1554
                           "fping -S 127.0.0.1 -q %s" % new_node.secondary_ip)
1555
      if result.failed:
1556
        raise errors.OpExecError("Node claims it doesn't have the"
1557
                                 " secondary ip you gave (%s).\n"
1558
                                 "Please fix and re-run this command." %
1559
                                 new_node.secondary_ip)
1560

    
1561
    success, msg = ssh.VerifyNodeHostname(node)
1562
    if not success:
1563
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
1564
                               " than the one the resolver gives: %s.\n"
1565
                               "Please fix and re-run this command." %
1566
                               (node, msg))
1567

    
1568
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1569
    # including the node just added
1570
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1571
    dist_nodes = self.cfg.GetNodeList() + [node]
1572
    if myself.name in dist_nodes:
1573
      dist_nodes.remove(myself.name)
1574

    
1575
    logger.Debug("Copying hosts and known_hosts to all nodes")
1576
    for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
1577
      result = rpc.call_upload_file(dist_nodes, fname)
1578
      for to_node in dist_nodes:
1579
        if not result[to_node]:
1580
          logger.Error("copy of file %s to node %s failed" %
1581
                       (fname, to_node))
1582

    
1583
    to_copy = ss.GetFileList()
1584
    for fname in to_copy:
1585
      if not ssh.CopyFileToNode(node, fname):
1586
        logger.Error("could not copy file %s to node %s" % (fname, node))
1587

    
1588
    logger.Info("adding node %s to cluster.conf" % node)
1589
    self.cfg.AddNode(new_node)


class LUMasterFailover(LogicalUnit):
1593
  """Failover the master node to the current node.
1594

1595
  This is a special LU in that it must run on a non-master node.
1596

1597
  """
1598
  HPATH = "master-failover"
1599
  HTYPE = constants.HTYPE_CLUSTER
1600
  REQ_MASTER = False
1601
  _OP_REQP = []
1602

    
1603
  def BuildHooksEnv(self):
1604
    """Build hooks env.
1605

1606
    This will run on the new master only in the pre phase, and on all
1607
    the nodes in the post phase.
1608

1609
    """
1610
    env = {
1611
      "NEW_MASTER": self.new_master,
1612
      "OLD_MASTER": self.old_master,
1613
      }
1614
    return env, [self.new_master], self.cfg.GetNodeList()
1615

    
1616
  def CheckPrereq(self):
1617
    """Check prerequisites.
1618

1619
    This checks that we are not already the master.
1620

1621
    """
1622
    self.new_master = socket.gethostname()
1623

    
1624
    self.old_master = self.sstore.GetMasterNode()
1625

    
1626
    if self.old_master == self.new_master:
1627
      raise errors.OpPrereqError("This commands must be run on the node"
1628
                                 " where you want the new master to be.\n"
1629
                                 "%s is already the master" %
1630
                                 self.old_master)
1631

    
1632
  def Exec(self, feedback_fn):
1633
    """Failover the master node.
1634

1635
    This command, when run on a non-master node, will cause the current
1636
    master to cease being master, and the non-master to become new
1637
    master.
1638

1639
    """
1640
    #TODO: do not rely on gethostname returning the FQDN
1641
    logger.Info("setting master to %s, old master: %s" %
1642
                (self.new_master, self.old_master))
1643

    
1644
    if not rpc.call_node_stop_master(self.old_master):
1645
      logger.Error("could disable the master role on the old master"
1646
                   " %s, please disable manually" % self.old_master)
1647

    
1648
    ss = self.sstore
1649
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1650
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
1651
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
1652
      logger.Error("could not distribute the new simple store master file"
1653
                   " to the other nodes, please check.")
1654

    
1655
    if not rpc.call_node_start_master(self.new_master):
1656
      logger.Error("could not start the master role on the new master"
1657
                   " %s, please check" % self.new_master)
1658
      feedback_fn("Error in activating the master IP on the new master,\n"
1659
                  "please fix manually.")



class LUQueryClusterInfo(NoHooksLU):
1664
  """Query cluster configuration.
1665

1666
  """
1667
  _OP_REQP = []
1668
  REQ_MASTER = False
1669

    
1670
  def CheckPrereq(self):
1671
    """No prerequsites needed for this LU.
1672

1673
    """
1674
    pass
1675

    
1676
  def Exec(self, feedback_fn):
1677
    """Return cluster config.
1678

1679
    """
1680
    result = {
1681
      "name": self.sstore.GetClusterName(),
1682
      "software_version": constants.RELEASE_VERSION,
1683
      "protocol_version": constants.PROTOCOL_VERSION,
1684
      "config_version": constants.CONFIG_VERSION,
1685
      "os_api_version": constants.OS_API_VERSION,
1686
      "export_version": constants.EXPORT_VERSION,
1687
      "master": self.sstore.GetMasterNode(),
1688
      "architecture": (platform.architecture()[0], platform.machine()),
1689
      }
1690

    
1691
    return result
1692

    
1693

    
1694
class LUClusterCopyFile(NoHooksLU):
1695
  """Copy file to cluster.
1696

1697
  """
1698
  _OP_REQP = ["nodes", "filename"]
1699

    
1700
  def CheckPrereq(self):
1701
    """Check prerequisites.
1702

1703
    It should check that the named file exists and that the given list
1704
    of nodes is valid.
1705

1706
    """
1707
    if not os.path.exists(self.op.filename):
1708
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1709

    
1710
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1711

    
1712
  def Exec(self, feedback_fn):
1713
    """Copy a file from master to some nodes.
1714

1715
    Args:
      feedback_fn - feedback function (unused)

    The file named in self.op.filename is copied via ssh to the target
    nodes computed in CheckPrereq (self.nodes); the master node is skipped.
1720

1721
    """
1722
    filename = self.op.filename
1723

    
1724
    myname = socket.gethostname()
1725

    
1726
    for node in self.nodes:
1727
      if node == myname:
1728
        continue
1729
      if not ssh.CopyFileToNode(node, filename):
1730
        logger.Error("Copy of file %s to node %s failed" % (filename, node))


class LUDumpClusterConfig(NoHooksLU):
1734
  """Return a text-representation of the cluster-config.
1735

1736
  """
1737
  _OP_REQP = []
1738

    
1739
  def CheckPrereq(self):
1740
    """No prerequisites.
1741

1742
    """
1743
    pass
1744

    
1745
  def Exec(self, feedback_fn):
1746
    """Dump a representation of the cluster config to the standard output.
1747

1748
    """
1749
    return self.cfg.DumpConfig()
1750

    
1751

    
1752
class LURunClusterCommand(NoHooksLU):
1753
  """Run a command on some nodes.
1754

1755
  """
1756
  _OP_REQP = ["command", "nodes"]
1757

    
1758
  def CheckPrereq(self):
1759
    """Check prerequisites.
1760

1761
    It checks that the given list of nodes is valid.
1762

1763
    """
1764
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1765

    
1766
  def Exec(self, feedback_fn):
1767
    """Run a command on some nodes.
1768

1769
    """
1770
    data = []
1771
    for node in self.nodes:
1772
      result = ssh.SSHCall(node, "root", self.op.command)
1773
      data.append((node, result.output, result.exit_code))
1774

    
1775
    return data
1776

    
1777

    
1778
class LUActivateInstanceDisks(NoHooksLU):
1779
  """Bring up an instance's disks.
1780

1781
  """
1782
  _OP_REQP = ["instance_name"]
1783

    
1784
  def CheckPrereq(self):
1785
    """Check prerequisites.
1786

1787
    This checks that the instance is in the cluster.
1788

1789
    """
1790
    instance = self.cfg.GetInstanceInfo(
1791
      self.cfg.ExpandInstanceName(self.op.instance_name))
1792
    if instance is None:
1793
      raise errors.OpPrereqError("Instance '%s' not known" %
1794
                                 self.op.instance_name)
1795
    self.instance = instance
1796

    
1797

    
1798
  def Exec(self, feedback_fn):
1799
    """Activate the disks.
1800

1801
    """
1802
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1803
    if not disks_ok:
1804
      raise errors.OpExecError("Cannot activate block devices")
1805

    
1806
    return disks_info
1807

    
1808

    
1809
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1810
  """Prepare the block devices for an instance.
1811

1812
  This sets up the block devices on all nodes.
1813

1814
  Args:
1815
    instance: a ganeti.objects.Instance object
1816
    ignore_secondaries: if true, errors on secondary nodes won't result
1817
                        in an error return from the function
1818

1819
  Returns:
1820
    false if the operation failed
1821
    list of (host, instance_visible_name, node_visible_name) if the operation
1822
         succeeded with the mapping from node devices to instance devices
1823
  """
1824
  device_info = []
1825
  disks_ok = True
1826
  for inst_disk in instance.disks:
1827
    master_result = None
1828
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1829
      cfg.SetDiskID(node_disk, node)
1830
      is_primary = node == instance.primary_node
1831
      result = rpc.call_blockdev_assemble(node, node_disk, is_primary)
1832
      if not result:
1833
        logger.Error("could not prepare block device %s on node %s (is_pri"
1834
                     "mary=%s)" % (inst_disk.iv_name, node, is_primary))
1835
        if is_primary or not ignore_secondaries:
1836
          disks_ok = False
1837
      if is_primary:
1838
        master_result = result
1839
    device_info.append((instance.primary_node, inst_disk.iv_name,
1840
                        master_result))
1841

    
1842
  return disks_ok, device_info
1843

    
1844

    
1845
def _StartInstanceDisks(cfg, instance, force):
1846
  """Start the disks of an instance.
1847

1848
  """
1849
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1850
                                           ignore_secondaries=force)
1851
  if not disks_ok:
1852
    _ShutdownInstanceDisks(instance, cfg)
1853
    if force is not None and not force:
1854
      logger.Error("If the message above refers to a secondary node,"
1855
                   " you can retry the operation using '--force'.")
1856
    raise errors.OpExecError("Disk consistency error")
1857

    
1858

    
1859
class LUDeactivateInstanceDisks(NoHooksLU):
1860
  """Shutdown an instance's disks.
1861

1862
  """
1863
  _OP_REQP = ["instance_name"]
1864

    
1865
  def CheckPrereq(self):
1866
    """Check prerequisites.
1867

1868
    This checks that the instance is in the cluster.
1869

1870
    """
1871
    instance = self.cfg.GetInstanceInfo(
1872
      self.cfg.ExpandInstanceName(self.op.instance_name))
1873
    if instance is None:
1874
      raise errors.OpPrereqError("Instance '%s' not known" %
1875
                                 self.op.instance_name)
1876
    self.instance = instance
1877

    
1878
  def Exec(self, feedback_fn):
1879
    """Deactivate the disks
1880

1881
    """
1882
    instance = self.instance
1883
    ins_l = rpc.call_instance_list([instance.primary_node])
1884
    ins_l = ins_l[instance.primary_node]
1885
    if not isinstance(ins_l, list):
1886
      raise errors.OpExecError("Can't contact node '%s'" %
1887
                               instance.primary_node)
1888

    
1889
    if self.instance.name in ins_l:
1890
      raise errors.OpExecError("Instance is running, can't shutdown"
1891
                               " block devices.")
1892

    
1893
    _ShutdownInstanceDisks(instance, self.cfg)
1894

    
1895

    
1896
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1897
  """Shutdown block devices of an instance.
1898

1899
  This does the shutdown on all nodes of the instance.
1900

1901
  If ignore_primary is true, errors on the primary node are
1902
  ignored.
1903

1904
  """
1905
  result = True
1906
  for disk in instance.disks:
1907
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1908
      cfg.SetDiskID(top_disk, node)
1909
      if not rpc.call_blockdev_shutdown(node, top_disk):
1910
        logger.Error("could not shutdown block device %s on node %s" %
1911
                     (disk.iv_name, node))
1912
        if not ignore_primary or node != instance.primary_node:
1913
          result = False
1914
  return result
1915

    
1916

    
1917
class LUStartupInstance(LogicalUnit):
1918
  """Starts an instance.
1919

1920
  """
1921
  HPATH = "instance-start"
1922
  HTYPE = constants.HTYPE_INSTANCE
1923
  _OP_REQP = ["instance_name", "force"]
1924

    
1925
  def BuildHooksEnv(self):
1926
    """Build hooks env.
1927

1928
    This runs on master, primary and secondary nodes of the instance.
1929

1930
    """
1931
    env = {
1932
      "FORCE": self.op.force,
1933
      }
1934
    env.update(_BuildInstanceHookEnvByObject(self.instance))
1935
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1936
          list(self.instance.secondary_nodes))
1937
    return env, nl, nl
1938

    
1939
  def CheckPrereq(self):
1940
    """Check prerequisites.
1941

1942
    This checks that the instance is in the cluster.
1943

1944
    """
1945
    instance = self.cfg.GetInstanceInfo(
1946
      self.cfg.ExpandInstanceName(self.op.instance_name))
1947
    if instance is None:
1948
      raise errors.OpPrereqError("Instance '%s' not known" %
1949
                                 self.op.instance_name)
1950

    
1951
    # check bridges existance
1952
    brlist = [nic.bridge for nic in instance.nics]
1953
    if not rpc.call_bridges_exist(instance.primary_node, brlist):
1954
      raise errors.OpPrereqError("one or more target bridges %s does not"
1955
                                 " exist on destination node '%s'" %
1956
                                 (brlist, instance.primary_node))
1957

    
1958
    self.instance = instance
1959
    self.op.instance_name = instance.name
1960

    
1961
  def Exec(self, feedback_fn):
1962
    """Start the instance.
1963

1964
    """
1965
    instance = self.instance
1966
    force = self.op.force
1967
    extra_args = getattr(self.op, "extra_args", "")
1968

    
1969
    node_current = instance.primary_node
1970

    
1971
    nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1972
    if not nodeinfo:
1973
      raise errors.OpExecError("Could not contact node %s for infos" %
1974
                               (node_current))
1975

    
1976
    freememory = nodeinfo[node_current]['memory_free']
1977
    memory = instance.memory
1978
    if memory > freememory:
1979
      raise errors.OpExecError("Not enough memory to start instance"
1980
                               " %s on node %s"
1981
                               " needed %s MiB, available %s MiB" %
1982
                               (instance.name, node_current, memory,
1983
                                freememory))
1984

    
1985
    _StartInstanceDisks(self.cfg, instance, force)
1986

    
1987
    if not rpc.call_instance_start(node_current, instance, extra_args):
1988
      _ShutdownInstanceDisks(instance, self.cfg)
1989
      raise errors.OpExecError("Could not start instance")
1990

    
1991
    self.cfg.MarkInstanceUp(instance.name)
1992

    
1993

    
1994
class LUShutdownInstance(LogicalUnit):
1995
  """Shutdown an instance.
1996

1997
  """
1998
  HPATH = "instance-stop"
1999
  HTYPE = constants.HTYPE_INSTANCE
2000
  _OP_REQP = ["instance_name"]
2001

    
2002
  def BuildHooksEnv(self):
2003
    """Build hooks env.
2004

2005
    This runs on master, primary and secondary nodes of the instance.
2006

2007
    """
2008
    env = _BuildInstanceHookEnvByObject(self.instance)
2009
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2010
          list(self.instance.secondary_nodes))
2011
    return env, nl, nl
2012

    
2013
  def CheckPrereq(self):
2014
    """Check prerequisites.
2015

2016
    This checks that the instance is in the cluster.
2017

2018
    """
2019
    instance = self.cfg.GetInstanceInfo(
2020
      self.cfg.ExpandInstanceName(self.op.instance_name))
2021
    if instance is None:
2022
      raise errors.OpPrereqError("Instance '%s' not known" %
2023
                                 self.op.instance_name)
2024
    self.instance = instance
2025

    
2026
  def Exec(self, feedback_fn):
2027
    """Shutdown the instance.
2028

2029
    """
2030
    instance = self.instance
2031
    node_current = instance.primary_node
2032
    if not rpc.call_instance_shutdown(node_current, instance):
2033
      logger.Error("could not shutdown instance")
2034

    
2035
    self.cfg.MarkInstanceDown(instance.name)
2036
    _ShutdownInstanceDisks(instance, self.cfg)
2037

    
2038

    
2039
class LUReinstallInstance(LogicalUnit):
2040
  """Reinstall an instance.
2041

2042
  """
2043
  HPATH = "instance-reinstall"
2044
  HTYPE = constants.HTYPE_INSTANCE
2045
  _OP_REQP = ["instance_name"]
2046

    
2047
  def BuildHooksEnv(self):
2048
    """Build hooks env.
2049

2050
    This runs on master, primary and secondary nodes of the instance.
2051

2052
    """
2053
    env = _BuildInstanceHookEnvByObject(self.instance)
2054
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2055
          list(self.instance.secondary_nodes))
2056
    return env, nl, nl
2057

    
2058
  def CheckPrereq(self):
2059
    """Check prerequisites.
2060

2061
    This checks that the instance is in the cluster and is not running.
2062

2063
    """
2064
    instance = self.cfg.GetInstanceInfo(
2065
      self.cfg.ExpandInstanceName(self.op.instance_name))
2066
    if instance is None:
2067
      raise errors.OpPrereqError("Instance '%s' not known" %
2068
                                 self.op.instance_name)
2069
    if instance.disk_template == constants.DT_DISKLESS:
2070
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2071
                                 self.op.instance_name)
2072
    if instance.status != "down":
2073
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2074
                                 self.op.instance_name)
2075
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2076
    if remote_info:
2077
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2078
                                 (self.op.instance_name,
2079
                                  instance.primary_node))
2080

    
2081
    self.op.os_type = getattr(self.op, "os_type", None)
2082
    if self.op.os_type is not None:
2083
      # OS verification
2084
      pnode = self.cfg.GetNodeInfo(
2085
        self.cfg.ExpandNodeName(instance.primary_node))
2086
      if pnode is None:
2087
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2088
                                   self.op.pnode)
2089
      os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2090
      if not isinstance(os_obj, objects.OS):
2091
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2092
                                   " primary node"  % self.op.os_type)
2093

    
2094
    self.instance = instance
2095

    
2096
  def Exec(self, feedback_fn):
2097
    """Reinstall the instance.
2098

2099
    """
2100
    inst = self.instance
2101

    
2102
    if self.op.os_type is not None:
2103
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2104
      inst.os = self.op.os_type
2105
      self.cfg.AddInstance(inst)
2106

    
2107
    _StartInstanceDisks(self.cfg, inst, None)
2108
    try:
2109
      feedback_fn("Running the instance OS create scripts...")
2110
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2111
        raise errors.OpExecError("Could not install OS for instance %s "
2112
                                 "on node %s" %
2113
                                 (inst.name, inst.primary_node))
2114
    finally:
2115
      _ShutdownInstanceDisks(inst, self.cfg)
2116

    
2117

    
2118
class LURenameInstance(LogicalUnit):
2119
  """Rename an instance.
2120

2121
  """
2122
  HPATH = "instance-rename"
2123
  HTYPE = constants.HTYPE_INSTANCE
2124
  _OP_REQP = ["instance_name", "new_name"]
2125

    
2126
  def BuildHooksEnv(self):
2127
    """Build hooks env.
2128

2129
    This runs on master, primary and secondary nodes of the instance.
2130

2131
    """
2132
    env = _BuildInstanceHookEnvByObject(self.instance)
2133
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2134
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2135
          list(self.instance.secondary_nodes))
2136
    return env, nl, nl
2137

    
2138
  def CheckPrereq(self):
2139
    """Check prerequisites.
2140

2141
    This checks that the instance is in the cluster and is not running.
2142

2143
    """
2144
    instance = self.cfg.GetInstanceInfo(
2145
      self.cfg.ExpandInstanceName(self.op.instance_name))
2146
    if instance is None:
2147
      raise errors.OpPrereqError("Instance '%s' not known" %
2148
                                 self.op.instance_name)
2149
    if instance.status != "down":
2150
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2151
                                 self.op.instance_name)
2152
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2153
    if remote_info:
2154
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2155
                                 (self.op.instance_name,
2156
                                  instance.primary_node))
2157
    self.instance = instance
2158

    
2159
    # new name verification
2160
    hostname1 = utils.LookupHostname(self.op.new_name)
2161
    if not hostname1:
2162
      raise errors.OpPrereqError("New instance name '%s' not found in dns" %
2163
                                 self.op.new_name)
2164

    
2165
    self.op.new_name = new_name = hostname1['hostname']
2166
    if not getattr(self.op, "ignore_ip", False):
2167
      command = ["fping", "-q", hostname1['ip']]
2168
      result = utils.RunCmd(command)
2169
      if not result.failed:
2170
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2171
                                   (hostname1['ip'], new_name))
2172

    
2173

    
2174
  def Exec(self, feedback_fn):
2175
    """Reinstall the instance.
2176

2177
    """
2178
    inst = self.instance
2179
    old_name = inst.name
2180

    
2181
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2182

    
2183
    # re-read the instance from the configuration after rename
2184
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2185

    
2186
    _StartInstanceDisks(self.cfg, inst, None)
2187
    try:
2188
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2189
                                          "sda", "sdb"):
2190
        msg = ("Could run OS rename script for instance %s\n"
2191
               "on node %s\n"
2192
               "(but the instance has been renamed in Ganeti)" %
2193
               (inst.name, inst.primary_node))
2194
        logger.Error(msg)
2195
    finally:
2196
      _ShutdownInstanceDisks(inst, self.cfg)
2197

    
2198

    
2199
class LURemoveInstance(LogicalUnit):
2200
  """Remove an instance.
2201

2202
  """
2203
  HPATH = "instance-remove"
2204
  HTYPE = constants.HTYPE_INSTANCE
2205
  _OP_REQP = ["instance_name"]
2206

    
2207
  def BuildHooksEnv(self):
2208
    """Build hooks env.
2209

2210
    This runs on master, primary and secondary nodes of the instance.
2211

2212
    """
2213
    env = _BuildInstanceHookEnvByObject(self.instance)
2214
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2215
          list(self.instance.secondary_nodes))
2216
    return env, nl, nl
2217

    
2218
  def CheckPrereq(self):
2219
    """Check prerequisites.
2220

2221
    This checks that the instance is in the cluster.
2222

2223
    """
2224
    instance = self.cfg.GetInstanceInfo(
2225
      self.cfg.ExpandInstanceName(self.op.instance_name))
2226
    if instance is None:
2227
      raise errors.OpPrereqError("Instance '%s' not known" %
2228
                                 self.op.instance_name)
2229
    self.instance = instance
2230

    
2231
  def Exec(self, feedback_fn):
2232
    """Remove the instance.
2233

2234
    """
2235
    instance = self.instance
2236
    logger.Info("shutting down instance %s on node %s" %
2237
                (instance.name, instance.primary_node))
2238

    
2239
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2240
      raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2241
                               (instance.name, instance.primary_node))
2242

    
2243
    logger.Info("removing block devices for instance %s" % instance.name)
2244

    
2245
    _RemoveDisks(instance, self.cfg)
2246

    
2247
    logger.Info("removing instance %s out of cluster config" % instance.name)
2248

    
2249
    self.cfg.RemoveInstance(instance.name)
2250

    
2251

    
2252
class LUQueryInstances(NoHooksLU):
2253
  """Logical unit for querying instances.
2254

2255
  """
2256
  _OP_REQP = ["output_fields", "names"]
2257

    
2258
  def CheckPrereq(self):
2259
    """Check prerequisites.
2260

2261
    This checks that the fields required are valid output fields.
2262

2263
    """
2264
    self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2265
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2266
                               "admin_state", "admin_ram",
2267
                               "disk_template", "ip", "mac", "bridge",
2268
                               "sda_size", "sdb_size"],
2269
                       dynamic=self.dynamic_fields,
2270
                       selected=self.op.output_fields)
2271

    
2272
    self.wanted = _GetWantedInstances(self, self.op.names)
2273

    
2274
  def Exec(self, feedback_fn):
2275
    """Computes the list of nodes and their attributes.
2276

2277
    """
2278
    instance_names = self.wanted
2279
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2280
                     in instance_names]
2281

    
2282
    # begin data gathering
2283

    
2284
    nodes = frozenset([inst.primary_node for inst in instance_list])
2285

    
2286
    bad_nodes = []
2287
    if self.dynamic_fields.intersection(self.op.output_fields):
2288
      live_data = {}
2289
      node_data = rpc.call_all_instances_info(nodes)
2290
      for name in nodes:
2291
        result = node_data[name]
2292
        if result:
2293
          live_data.update(result)
2294
        elif result == False:
2295
          bad_nodes.append(name)
2296
        # else no instance is alive
2297
    else:
2298
      live_data = dict([(name, {}) for name in instance_names])
2299

    
2300
    # end data gathering
2301

    
2302
    output = []
2303
    for instance in instance_list:
2304
      iout = []
2305
      for field in self.op.output_fields:
2306
        if field == "name":
2307
          val = instance.name
2308
        elif field == "os":
2309
          val = instance.os
2310
        elif field == "pnode":
2311
          val = instance.primary_node
2312
        elif field == "snodes":
2313
          val = list(instance.secondary_nodes)
2314
        elif field == "admin_state":
2315
          val = (instance.status != "down")
2316
        elif field == "oper_state":
2317
          if instance.primary_node in bad_nodes:
2318
            val = None
2319
          else:
2320
            val = bool(live_data.get(instance.name))
2321
        elif field == "admin_ram":
2322
          val = instance.memory
2323
        elif field == "oper_ram":
2324
          if instance.primary_node in bad_nodes:
2325
            val = None
2326
          elif instance.name in live_data:
2327
            val = live_data[instance.name].get("memory", "?")
2328
          else:
2329
            val = "-"
2330
        elif field == "disk_template":
2331
          val = instance.disk_template
2332
        elif field == "ip":
2333
          val = instance.nics[0].ip
2334
        elif field == "bridge":
2335
          val = instance.nics[0].bridge
2336
        elif field == "mac":
2337
          val = instance.nics[0].mac
2338
        elif field == "sda_size" or field == "sdb_size":
2339
          disk = instance.FindDisk(field[:3])
2340
          if disk is None:
2341
            val = None
2342
          else:
2343
            val = disk.size
2344
        else:
2345
          raise errors.ParameterError(field)
2346
        iout.append(val)
2347
      output.append(iout)
2348

    
2349
    return output
2350

    
2351

    
2352
class LUFailoverInstance(LogicalUnit):
2353
  """Failover an instance.
2354

2355
  """
2356
  HPATH = "instance-failover"
2357
  HTYPE = constants.HTYPE_INSTANCE
2358
  _OP_REQP = ["instance_name", "ignore_consistency"]
2359

    
2360
  def BuildHooksEnv(self):
2361
    """Build hooks env.
2362

2363
    This runs on master, primary and secondary nodes of the instance.
2364

2365
    """
2366
    env = {
2367
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2368
      }
2369
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2370
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2371
    return env, nl, nl
2372

    
2373
  def CheckPrereq(self):
2374
    """Check prerequisites.
2375

2376
    This checks that the instance is in the cluster.
2377

2378
    """
2379
    instance = self.cfg.GetInstanceInfo(
2380
      self.cfg.ExpandInstanceName(self.op.instance_name))
2381
    if instance is None:
2382
      raise errors.OpPrereqError("Instance '%s' not known" %
2383
                                 self.op.instance_name)
2384

    
2385
    if instance.disk_template != constants.DT_REMOTE_RAID1:
2386
      raise errors.OpPrereqError("Instance's disk layout is not"
2387
                                 " remote_raid1.")
2388

    
2389
    secondary_nodes = instance.secondary_nodes
2390
    if not secondary_nodes:
2391
      raise errors.ProgrammerError("no secondary node but using "
2392
                                   "DT_REMOTE_RAID1 template")
2393

    
2394
    # check memory requirements on the secondary node
2395
    target_node = secondary_nodes[0]
2396
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2397
    info = nodeinfo.get(target_node, None)
2398
    if not info:
2399
      raise errors.OpPrereqError("Cannot get current information"
2400
                                 " from node '%s'" % nodeinfo)
2401
    if instance.memory > info['memory_free']:
2402
      raise errors.OpPrereqError("Not enough memory on target node %s."
2403
                                 " %d MB available, %d MB required" %
2404
                                 (target_node, info['memory_free'],
2405
                                  instance.memory))
2406

    
2407
    # check bridge existance
2408
    brlist = [nic.bridge for nic in instance.nics]
2409
    if not rpc.call_bridges_exist(instance.primary_node, brlist):
2410
      raise errors.OpPrereqError("One or more target bridges %s does not"
2411
                                 " exist on destination node '%s'" %
2412
                                 (brlist, instance.primary_node))
2413

    
2414
    self.instance = instance
2415

    
2416
  def Exec(self, feedback_fn):
2417
    """Failover an instance.
2418

2419
    The failover is done by shutting it down on its present node and
2420
    starting it on the secondary.
2421

2422
    """
2423
    instance = self.instance
2424

    
2425
    source_node = instance.primary_node
2426
    target_node = instance.secondary_nodes[0]
2427

    
2428
    feedback_fn("* checking disk consistency between source and target")
2429
    for dev in instance.disks:
2430
      # for remote_raid1, these are md over drbd
2431
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2432
        if not self.op.ignore_consistency:
2433
          raise errors.OpExecError("Disk %s is degraded on target node,"
2434
                                   " aborting failover." % dev.iv_name)
2435

    
2436
    feedback_fn("* checking target node resource availability")
2437
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2438

    
2439
    if not nodeinfo:
2440
      raise errors.OpExecError("Could not contact target node %s." %
2441
                               target_node)
2442

    
2443
    free_memory = int(nodeinfo[target_node]['memory_free'])
2444
    memory = instance.memory
2445
    if memory > free_memory:
2446
      raise errors.OpExecError("Not enough memory to create instance %s on"
2447
                               " node %s. needed %s MiB, available %s MiB" %
2448
                               (instance.name, target_node, memory,
2449
                                free_memory))
2450

    
2451
    feedback_fn("* shutting down instance on source node")
2452
    logger.Info("Shutting down instance %s on node %s" %
2453
                (instance.name, source_node))
2454

    
2455
    if not rpc.call_instance_shutdown(source_node, instance):
2456
      logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2457
                   " anyway. Please make sure node %s is down"  %
2458
                   (instance.name, source_node, source_node))
2459

    
2460
    feedback_fn("* deactivating the instance's disks on source node")
2461
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2462
      raise errors.OpExecError("Can't shut down the instance's disks.")
2463

    
2464
    instance.primary_node = target_node
2465
    # distribute new instance config to the other nodes
2466
    self.cfg.AddInstance(instance)
2467

    
2468
    feedback_fn("* activating the instance's disks on target node")
2469
    logger.Info("Starting instance %s on node %s" %
2470
                (instance.name, target_node))
2471

    
2472
    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2473
                                             ignore_secondaries=True)
2474
    if not disks_ok:
2475
      _ShutdownInstanceDisks(instance, self.cfg)
2476
      raise errors.OpExecError("Can't activate the instance's disks")
2477

    
2478
    feedback_fn("* starting the instance on the target node")
2479
    if not rpc.call_instance_start(target_node, instance, None):
2480
      _ShutdownInstanceDisks(instance, self.cfg)
2481
      raise errors.OpExecError("Could not start instance %s on node %s." %
2482
                               (instance.name, target_node))
2483

    
2484

    
2485
def _CreateBlockDevOnPrimary(cfg, node, device, info):
2486
  """Create a tree of block devices on the primary node.
2487

2488
  This always creates all devices.
2489

2490
  """
2491
  if device.children:
2492
    for child in device.children:
2493
      if not _CreateBlockDevOnPrimary(cfg, node, child, info):
2494
        return False
2495

    
2496
  cfg.SetDiskID(device, node)
2497
  new_id = rpc.call_blockdev_create(node, device, device.size, True, info)
2498
  if not new_id:
2499
    return False
2500
  if device.physical_id is None:
2501
    device.physical_id = new_id
2502
  return True
2503

    
2504

    
2505
def _CreateBlockDevOnSecondary(cfg, node, device, force, info):
2506
  """Create a tree of block devices on a secondary node.
2507

2508
  If this device type has to be created on secondaries, create it and
2509
  all its children.
2510

2511
  If not, just recurse to children keeping the same 'force' value.
2512

2513
  """
2514
  if device.CreateOnSecondary():
2515
    force = True
2516
  if device.children:
2517
    for child in device.children:
2518
      if not _CreateBlockDevOnSecondary(cfg, node, child, force, info):
2519
        return False
2520

    
2521
  if not force:
2522
    return True
2523
  cfg.SetDiskID(device, node)
2524
  new_id = rpc.call_blockdev_create(node, device, device.size, False, info)
2525
  if not new_id:
2526
    return False
2527
  if device.physical_id is None:
2528
    device.physical_id = new_id
2529
  return True
2530

    
2531

    
2532
def _GenerateUniqueNames(cfg, exts):
2533
  """Generate a suitable LV name.
2534

2535
  This will generate a logical volume name for the given instance.
2536

2537
  """
2538
  results = []
2539
  for val in exts:
2540
    new_id = cfg.GenerateUniqueID()
2541
    results.append("%s%s" % (new_id, val))
2542
  return results
2543

    
2544

    
2545
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2546
  """Generate a drbd device complete with its children.
2547

2548
  """
2549
  port = cfg.AllocatePort()
2550
  vgname = cfg.GetVGName()
2551
  dev_data = objects.Disk(dev_type="lvm", size=size,
2552
                          logical_id=(vgname, names[0]))
2553
  dev_meta = objects.Disk(dev_type="lvm", size=128,
2554
                          logical_id=(vgname, names[1]))
2555
  drbd_dev = objects.Disk(dev_type="drbd", size=size,
2556
                          logical_id = (primary, secondary, port),
2557
                          children = [dev_data, dev_meta])
2558
  return drbd_dev
2559

    
2560

    
2561
def _GenerateDiskTemplate(cfg, template_name,
2562
                          instance_name, primary_node,
2563
                          secondary_nodes, disk_sz, swap_sz):
2564
  """Generate the entire disk layout for a given template type.
2565

2566
  """
2567
  #TODO: compute space requirements
2568

    
2569
  vgname = cfg.GetVGName()
2570
  if template_name == "diskless":
2571
    disks = []
2572
  elif template_name == "plain":
2573
    if len(secondary_nodes) != 0:
2574
      raise errors.ProgrammerError("Wrong template configuration")
2575

    
2576
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2577
    sda_dev = objects.Disk(dev_type="lvm", size=disk_sz,
2578
                           logical_id=(vgname, names[0]),
2579
                           iv_name = "sda")
2580
    sdb_dev = objects.Disk(dev_type="lvm", size=swap_sz,
2581
                           logical_id=(vgname, names[1]),
2582
                           iv_name = "sdb")
2583
    disks = [sda_dev, sdb_dev]
2584
  elif template_name == "local_raid1":
2585
    if len(secondary_nodes) != 0:
2586
      raise errors.ProgrammerError("Wrong template configuration")
2587

    
2588

    
2589
    names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2590
                                       ".sdb_m1", ".sdb_m2"])
2591
    sda_dev_m1 = objects.Disk(dev_type="lvm", size=disk_sz,
2592
                              logical_id=(vgname, names[0]))
2593
    sda_dev_m2 = objects.Disk(dev_type="lvm", size=disk_sz,
2594
                              logical_id=(vgname, names[1]))
2595
    md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name = "sda",
2596
                              size=disk_sz,
2597
                              children = [sda_dev_m1, sda_dev_m2])
2598
    sdb_dev_m1 = objects.Disk(dev_type="lvm", size=swap_sz,
2599
                              logical_id=(vgname, names[2]))
2600
    sdb_dev_m2 = objects.Disk(dev_type="lvm", size=swap_sz,
2601
                              logical_id=(vgname, names[3]))
2602
    md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name = "sdb",
2603
                              size=swap_sz,
2604
                              children = [sdb_dev_m1, sdb_dev_m2])
2605
    disks = [md_sda_dev, md_sdb_dev]
2606
  elif template_name == constants.DT_REMOTE_RAID1:
2607
    if len(secondary_nodes) != 1:
2608
      raise errors.ProgrammerError("Wrong template configuration")
2609
    remote_node = secondary_nodes[0]
2610
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2611
                                       ".sdb_data", ".sdb_meta"])
2612
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2613
                                         disk_sz, names[0:2])
2614
    md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name="sda",
2615
                              children = [drbd_sda_dev], size=disk_sz)
2616
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2617
                                         swap_sz, names[2:4])
2618
    md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name="sdb",
2619
                              children = [drbd_sdb_dev], size=swap_sz)
2620
    disks = [md_sda_dev, md_sdb_dev]
2621
  else:
2622
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2623
  return disks
2624

    
2625

    
2626
def _GetInstanceInfoText(instance):
2627
  """Compute that text that should be added to the disk's metadata.
2628

2629
  """
2630
  return "originstname+%s" % instance.name
2631

    
2632

    
2633
def _CreateDisks(cfg, instance):
2634
  """Create all disks for an instance.
2635

2636
  This abstracts away some work from AddInstance.
2637

2638
  Args:
2639
    instance: the instance object
2640

2641
  Returns:
2642
    True or False showing the success of the creation process
2643

2644
  """
2645
  info = _GetInstanceInfoText(instance)
2646

    
2647
  for device in instance.disks:
2648
    logger.Info("creating volume %s for instance %s" %
2649
              (device.iv_name, instance.name))
2650
    #HARDCODE
2651
    for secondary_node in instance.secondary_nodes:
2652
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, device, False,
2653
                                        info):
2654
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2655
                     (device.iv_name, device, secondary_node))
2656
        return False
2657
    #HARDCODE
2658
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, device, info):
2659
      logger.Error("failed to create volume %s on primary!" %
2660
                   device.iv_name)
2661
      return False
2662
  return True
2663

    
2664

    
2665
def _RemoveDisks(instance, cfg):
2666
  """Remove all disks for an instance.
2667

2668
  This abstracts away some work from `AddInstance()` and
2669
  `RemoveInstance()`. Note that in case some of the devices couldn't
2670
  be remove, the removal will continue with the other ones (compare
2671
  with `_CreateDisks()`).
2672

2673
  Args:
2674
    instance: the instance object
2675

2676
  Returns:
2677
    True or False showing the success of the removal process
2678

2679
  """
2680
  logger.Info("removing block devices for instance %s" % instance.name)
2681

    
2682
  result = True
2683
  for device in instance.disks:
2684
    for node, disk in device.ComputeNodeTree(instance.primary_node):
2685
      cfg.SetDiskID(disk, node)
2686
      if not rpc.call_blockdev_remove(node, disk):
2687
        logger.Error("could not remove block device %s on node %s,"
2688
                     " continuing anyway" %
2689
                     (device.iv_name, node))
2690
        result = False
2691
  return result
2692

    
2693

    
2694
class LUCreateInstance(LogicalUnit):
2695
  """Create an instance.
2696

2697
  """
2698
  HPATH = "instance-add"
2699
  HTYPE = constants.HTYPE_INSTANCE
2700
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2701
              "disk_template", "swap_size", "mode", "start", "vcpus",
2702
              "wait_for_sync"]
2703

    
2704
  def BuildHooksEnv(self):
2705
    """Build hooks env.
2706

2707
    This runs on master, primary and secondary nodes of the instance.
2708

2709
    """
2710
    env = {
2711
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2712
      "INSTANCE_DISK_SIZE": self.op.disk_size,
2713
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
2714
      "INSTANCE_ADD_MODE": self.op.mode,
2715
      }
2716
    if self.op.mode == constants.INSTANCE_IMPORT:
2717
      env["INSTANCE_SRC_NODE"] = self.op.src_node
2718
      env["INSTANCE_SRC_PATH"] = self.op.src_path
2719
      env["INSTANCE_SRC_IMAGE"] = self.src_image
2720

    
2721
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2722
      primary_node=self.op.pnode,
2723
      secondary_nodes=self.secondaries,
2724
      status=self.instance_status,
2725
      os_type=self.op.os_type,
2726
      memory=self.op.mem_size,
2727
      vcpus=self.op.vcpus,
2728
      nics=[(self.inst_ip, self.op.bridge)],
2729
    ))
2730

    
2731
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2732
          self.secondaries)
2733
    return env, nl, nl
2734

    
2735

    
2736
  def CheckPrereq(self):
2737
    """Check prerequisites.
2738

2739
    """
2740
    if self.op.mode not in (constants.INSTANCE_CREATE,
2741
                            constants.INSTANCE_IMPORT):
2742
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2743
                                 self.op.mode)
2744

    
2745
    if self.op.mode == constants.INSTANCE_IMPORT:
2746
      src_node = getattr(self.op, "src_node", None)
2747
      src_path = getattr(self.op, "src_path", None)
2748
      if src_node is None or src_path is None:
2749
        raise errors.OpPrereqError("Importing an instance requires source"
2750
                                   " node and path options")
2751
      src_node_full = self.cfg.ExpandNodeName(src_node)
2752
      if src_node_full is None:
2753
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2754
      self.op.src_node = src_node = src_node_full
2755

    
2756
      if not os.path.isabs(src_path):
2757
        raise errors.OpPrereqError("The source path must be absolute")
2758

    
2759
      export_info = rpc.call_export_info(src_node, src_path)
2760

    
2761
      if not export_info:
2762
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
2763

    
2764
      if not export_info.has_section(constants.INISECT_EXP):
2765
        raise errors.ProgrammerError("Corrupted export config")
2766

    
2767
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
2768
      if (int(ei_version) != constants.EXPORT_VERSION):
2769
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2770
                                   (ei_version, constants.EXPORT_VERSION))
2771

    
2772
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2773
        raise errors.OpPrereqError("Can't import instance with more than"
2774
                                   " one data disk")
2775

    
2776
      # FIXME: are the old os-es, disk sizes, etc. useful?
2777
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2778
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2779
                                                         'disk0_dump'))
2780
      self.src_image = diskimage
2781
    else: # INSTANCE_CREATE
2782
      if getattr(self.op, "os_type", None) is None:
2783
        raise errors.OpPrereqError("No guest OS specified")
2784

    
2785
    # check primary node
2786
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2787
    if pnode is None:
2788
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
2789
                                 self.op.pnode)
2790
    self.op.pnode = pnode.name
2791
    self.pnode = pnode
2792
    self.secondaries = []
2793
    # disk template and mirror node verification
2794
    if self.op.disk_template not in constants.DISK_TEMPLATES:
2795
      raise errors.OpPrereqError("Invalid disk template name")
2796

    
2797
    if self.op.disk_template == constants.DT_REMOTE_RAID1:
2798
      if getattr(self.op, "snode", None) is None:
2799
        raise errors.OpPrereqError("The 'remote_raid1' disk template needs"
2800
                                   " a mirror node")
2801

    
2802
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
2803
      if snode_name is None:
2804
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
2805
                                   self.op.snode)
2806
      elif snode_name == pnode.name:
2807
        raise errors.OpPrereqError("The secondary node cannot be"
2808
                                   " the primary node.")
2809
      self.secondaries.append(snode_name)
2810

    
2811
    # Check lv size requirements
2812
    nodenames = [pnode.name] + self.secondaries
2813
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2814

    
2815
    # Required free disk space as a function of disk and swap space
2816
    req_size_dict = {
2817
      constants.DT_DISKLESS: 0,
2818
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2819
      constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2820
      # 256 MB are added for drbd metadata, 128MB for each drbd device
2821
      constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2822
    }
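    # Worked example (illustrative numbers): disk_size=10240 and
    # swap_size=4096 give plain -> 14336 MB, local_raid1 -> 28672 MB and
    # remote_raid1 -> 14592 MB (the extra 256 MB being drbd metadata).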
2823

    
2824
    if self.op.disk_template not in req_size_dict:
2825
      raise errors.ProgrammerError("Disk template '%s' size requirement"
2826
                                   " is unknown" %  self.op.disk_template)
2827

    
2828
    req_size = req_size_dict[self.op.disk_template]
2829

    
2830
    for node in nodenames:
2831
      info = nodeinfo.get(node, None)
2832
      if not info:
2833
        raise errors.OpPrereqError("Cannot get current information"
2834
                                   " from node '%s'" % nodeinfo)
2835
      if req_size > info['vg_free']:
2836
        raise errors.OpPrereqError("Not enough disk space on target node %s."
2837
                                   " %d MB available, %d MB required" %
2838
                                   (node, info['vg_free'], req_size))
2839

    
2840
    # os verification
2841
    os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2842
    if not isinstance(os_obj, objects.OS):
2843
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
2844
                                 " primary node"  % self.op.os_type)
2845

    
2846
    # instance verification
2847
    hostname1 = utils.LookupHostname(self.op.instance_name)
2848
    if not hostname1:
2849
      raise errors.OpPrereqError("Instance name '%s' not found in dns" %
2850
                                 self.op.instance_name)
2851

    
2852
    self.op.instance_name = instance_name = hostname1['hostname']
2853
    instance_list = self.cfg.GetInstanceList()
2854
    if instance_name in instance_list:
2855
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2856
                                 instance_name)
2857

    
2858
    ip = getattr(self.op, "ip", None)
2859
    if ip is None or ip.lower() == "none":
2860
      inst_ip = None
2861
    elif ip.lower() == "auto":
2862
      inst_ip = hostname1['ip']
2863
    else:
2864
      if not utils.IsValidIP(ip):
2865
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
2866
                                   " like a valid IP" % ip)
2867
      inst_ip = ip
2868
    self.inst_ip = inst_ip
2869

    
2870
    command = ["fping", "-q", hostname1['ip']]
2871
    result = utils.RunCmd(command)
2872
    if not result.failed:
2873
      raise errors.OpPrereqError("IP %s of instance %s already in use" %
2874
                                 (hostname1['ip'], instance_name))
2875

    
2876
    # bridge verification
2877
    bridge = getattr(self.op, "bridge", None)
2878
    if bridge is None:
2879
      self.op.bridge = self.cfg.GetDefBridge()
2880
    else:
2881
      self.op.bridge = bridge
2882

    
2883
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
2884
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
2885
                                 " destination node '%s'" %
2886
                                 (self.op.bridge, pnode.name))
2887

    
2888
    if self.op.start:
2889
      self.instance_status = 'up'
2890
    else:
2891
      self.instance_status = 'down'
2892

    
2893
  def Exec(self, feedback_fn):
2894
    """Create and add the instance to the cluster.
2895

2896
    """
2897
    instance = self.op.instance_name
2898
    pnode_name = self.pnode.name
2899

    
2900
    nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
2901
    if self.inst_ip is not None:
2902
      nic.ip = self.inst_ip
2903

    
2904
    disks = _GenerateDiskTemplate(self.cfg,
2905
                                  self.op.disk_template,
2906
                                  instance, pnode_name,
2907
                                  self.secondaries, self.op.disk_size,
2908
                                  self.op.swap_size)
2909

    
2910
    iobj = objects.Instance(name=instance, os=self.op.os_type,
2911
                            primary_node=pnode_name,
2912
                            memory=self.op.mem_size,
2913
                            vcpus=self.op.vcpus,
2914
                            nics=[nic], disks=disks,
2915
                            disk_template=self.op.disk_template,
2916
                            status=self.instance_status,
2917
                            )
2918

    
2919
    feedback_fn("* creating instance disks...")
2920
    if not _CreateDisks(self.cfg, iobj):
2921
      _RemoveDisks(iobj, self.cfg)
2922
      raise errors.OpExecError("Device creation failed, reverting...")
2923

    
2924
    feedback_fn("adding instance %s to cluster config" % instance)
2925

    
2926
    self.cfg.AddInstance(iobj)
2927

    
2928
    if self.op.wait_for_sync:
2929
      disk_abort = not _WaitForSync(self.cfg, iobj)
2930
    elif iobj.disk_template == constants.DT_REMOTE_RAID1:
2931
      # make sure the disks are not degraded (still sync-ing is ok)
2932
      time.sleep(15)
2933
      feedback_fn("* checking mirrors status")
2934
      disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
2935
    else:
2936
      disk_abort = False
2937

    
2938
    if disk_abort:
2939
      _RemoveDisks(iobj, self.cfg)
2940
      self.cfg.RemoveInstance(iobj.name)
2941
      raise errors.OpExecError("There are some degraded disks for"
2942
                               " this instance")
2943

    
2944
    feedback_fn("creating os for instance %s on node %s" %
2945
                (instance, pnode_name))
2946

    
2947
    if iobj.disk_template != constants.DT_DISKLESS:
2948
      if self.op.mode == constants.INSTANCE_CREATE:
2949
        feedback_fn("* running the instance OS create scripts...")
2950
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
2951
          raise errors.OpExecError("could not add os for instance %s"
2952
                                   " on node %s" %
2953
                                   (instance, pnode_name))
2954

    
2955
      elif self.op.mode == constants.INSTANCE_IMPORT:
2956
        feedback_fn("* running the instance OS import scripts...")
2957
        src_node = self.op.src_node
2958
        src_image = self.src_image
2959
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
2960
                                                src_node, src_image):
2961
          raise errors.OpExecError("Could not import os for instance"
2962
                                   " %s on node %s" %
2963
                                   (instance, pnode_name))
2964
      else:
2965
        # also checked in the prereq part
2966
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
2967
                                     % self.op.mode)
2968

    
2969
    if self.op.start:
2970
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
2971
      feedback_fn("* starting instance...")
2972
      if not rpc.call_instance_start(pnode_name, iobj, None):
2973
        raise errors.OpExecError("Could not start instance")
2974

    
2975

    
2976
class LUConnectConsole(NoHooksLU):
2977
  """Connect to an instance's console.
2978

2979
  This is somewhat special in that it returns the command line that
2980
  you need to run on the master node in order to connect to the
2981
  console.
2982

2983
  """
2984
  _OP_REQP = ["instance_name"]
2985

    
2986
  def CheckPrereq(self):
2987
    """Check prerequisites.
2988

2989
    This checks that the instance is in the cluster.
2990

2991
    """
2992
    instance = self.cfg.GetInstanceInfo(
2993
      self.cfg.ExpandInstanceName(self.op.instance_name))
2994
    if instance is None:
2995
      raise errors.OpPrereqError("Instance '%s' not known" %
2996
                                 self.op.instance_name)
2997
    self.instance = instance
2998

    
2999
  def Exec(self, feedback_fn):
3000
    """Connect to the console of an instance
3001

3002
    """
3003
    instance = self.instance
3004
    node = instance.primary_node
3005

    
3006
    node_insts = rpc.call_instance_list([node])[node]
3007
    if node_insts is False:
3008
      raise errors.OpExecError("Can't connect to node %s." % node)
3009

    
3010
    if instance.name not in node_insts:
3011
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3012

    
3013
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3014

    
3015
    hyper = hypervisor.GetHypervisor()
3016
    console_cmd = hyper.GetShellCommandForConsole(instance.name)
3017
    # build ssh cmdline
3018
    argv = ["ssh", "-q", "-t"]
3019
    argv.extend(ssh.KNOWN_HOSTS_OPTS)
3020
    argv.extend(ssh.BATCH_MODE_OPTS)
3021
    argv.append(node)
3022
    argv.append(console_cmd)
3023
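    # Exec does not open the console itself: as the docstring above notes, the
    # caller is expected to run the returned (binary, argv) pair on the master
    # node (for example via os.execvp).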
    return "ssh", argv
3024

    
3025

    
3026
class LUAddMDDRBDComponent(LogicalUnit):
3027
  """Adda new mirror member to an instance's disk.
3028

3029
  """
3030
  HPATH = "mirror-add"
3031
  HTYPE = constants.HTYPE_INSTANCE
3032
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3033

    
3034
  def BuildHooksEnv(self):
3035
    """Build hooks env.
3036

3037
    This runs on the master, the primary and all the secondaries.
3038

3039
    """
3040
    env = {
3041
      "NEW_SECONDARY": self.op.remote_node,
3042
      "DISK_NAME": self.op.disk_name,
3043
      }
3044
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3045
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3046
          self.op.remote_node,] + list(self.instance.secondary_nodes)
3047
    return env, nl, nl
3048

    
3049
  def CheckPrereq(self):
3050
    """Check prerequisites.
3051

3052
    This checks that the instance is in the cluster.
3053

3054
    """
3055
    instance = self.cfg.GetInstanceInfo(
3056
      self.cfg.ExpandInstanceName(self.op.instance_name))
3057
    if instance is None:
3058
      raise errors.OpPrereqError("Instance '%s' not known" %
3059
                                 self.op.instance_name)
3060
    self.instance = instance
3061

    
3062
    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3063
    if remote_node is None:
3064
      raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3065
    self.remote_node = remote_node
3066

    
3067
    if remote_node == instance.primary_node:
3068
      raise errors.OpPrereqError("The specified node is the primary node of"
3069
                                 " the instance.")
3070

    
3071
    if instance.disk_template != constants.DT_REMOTE_RAID1:
3072
      raise errors.OpPrereqError("Instance's disk layout is not"
3073
                                 " remote_raid1.")
3074
    for disk in instance.disks:
3075
      if disk.iv_name == self.op.disk_name:
3076
        break
3077
    else:
3078
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
3079
                                 " instance." % self.op.disk_name)
3080
    if len(disk.children) > 1:
3081
      raise errors.OpPrereqError("The device already has two slave"
3082
                                 " devices.\n"
3083
                                 "This would create a 3-disk raid1"
3084
                                 " which we don't allow.")
3085
    self.disk = disk
3086

    
3087
  def Exec(self, feedback_fn):
3088
    """Add the mirror component
3089

3090
    """
3091
    disk = self.disk
3092
    instance = self.instance
3093

    
3094
    remote_node = self.remote_node
3095
    lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3096
    names = _GenerateUniqueNames(self.cfg, lv_names)
3097
    new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3098
                                     remote_node, disk.size, names)
3099

    
3100
    logger.Info("adding new mirror component on secondary")
3101
    #HARDCODE
3102
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, new_drbd, False,
3103
                                      _GetInstanceInfoText(instance)):
3104
      raise errors.OpExecError("Failed to create new component on secondary"
3105
                               " node %s" % remote_node)
3106

    
3107
    logger.Info("adding new mirror component on primary")
3108
    #HARDCODE
3109
    if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node, new_drbd,
3110
                                    _GetInstanceInfoText(instance)):
3111
      # remove secondary dev
3112
      self.cfg.SetDiskID(new_drbd, remote_node)
3113
      rpc.call_blockdev_remove(remote_node, new_drbd)
3114
      raise errors.OpExecError("Failed to create volume on primary")
3115

    
3116
    # the device exists now
3117
    # call the primary node to add the mirror to md
3118
    logger.Info("adding new mirror component to md")
3119
    if not rpc.call_blockdev_addchild(instance.primary_node,
3120
                                           disk, new_drbd):
3121
      logger.Error("Can't add mirror compoment to md!")
3122
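      # roll back: the new drbd device exists on both nodes but could not be
      # attached to the md array, so remove it again before aborting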
      self.cfg.SetDiskID(new_drbd, remote_node)
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
        logger.Error("Can't rollback on secondary")
      self.cfg.SetDiskID(new_drbd, instance.primary_node)
      if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
        logger.Error("Can't rollback on primary")
      raise errors.OpExecError("Can't add mirror component to md array")

    disk.children.append(new_drbd)

    self.cfg.AddInstance(instance)

    _WaitForSync(self.cfg, instance)

    return 0


class LURemoveMDDRBDComponent(LogicalUnit):
  """Remove a component from a remote_raid1 disk.

  """
  HPATH = "mirror-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK_NAME": self.op.disk_name,
      "DISK_ID": self.op.disk_id,
      "OLD_SECONDARY": self.old_secondary,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
                                 " instance." % self.op.disk_name)
    for child in disk.children:
      if child.dev_type == "drbd" and child.logical_id[2] == self.op.disk_id:
        break
    else:
      raise errors.OpPrereqError("Can't find the device with this port.")

    if len(disk.children) < 2:
      raise errors.OpPrereqError("Cannot remove the last component from"
                                 " a mirror.")
    self.disk = disk
    self.child = child
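    # logical_id[0:2] of the drbd child are the two node names; whichever one
    # is not the primary node is the old secondary being removed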
    if self.child.logical_id[0] == instance.primary_node:
      oid = 1
    else:
      oid = 0
    self.old_secondary = self.child.logical_id[oid]

  def Exec(self, feedback_fn):
    """Remove the mirror component

    """
    instance = self.instance
    disk = self.disk
    child = self.child
    logger.Info("remove mirror component")
    self.cfg.SetDiskID(disk, instance.primary_node)
    if not rpc.call_blockdev_removechild(instance.primary_node,
                                         disk, child):
      raise errors.OpExecError("Can't remove child from mirror.")

    for node in child.logical_id[:2]:
      self.cfg.SetDiskID(child, node)
      if not rpc.call_blockdev_remove(node, child):
        logger.Error("Warning: failed to remove device from node %s,"
                     " continuing operation." % node)

    disk.children.remove(child)
    self.cfg.AddInstance(instance)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

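    # if no new secondary was given, default to the current one; the mirror
    # components are then rebuilt in place on the existing secondary node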
    remote_node = getattr(self.op, "remote_node", None)
    if remote_node is None:
      remote_node = instance.secondary_nodes[0]
    else:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    self.op.remote_node = remote_node

  def Exec(self, feedback_fn):
    """Replace the disks of an instance.

    """
    instance = self.instance
    iv_names = {}
    # start of work
    remote_node = self.op.remote_node
    cfg = self.cfg
    for dev in instance.disks:
      size = dev.size
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
                                       remote_node, size, names)
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
      logger.Info("adding new mirror component on secondary for %s" %
                  dev.iv_name)
      #HARDCODE
      if not _CreateBlockDevOnSecondary(cfg, remote_node, new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new component on"
                                 " secondary node %s\n"
                                 "Full abort, cleanup manually!" %
                                 remote_node)

      logger.Info("adding new mirror component on primary")
      #HARDCODE
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, new_drbd,
                                      _GetInstanceInfoText(instance)):
        # remove secondary dev
        cfg.SetDiskID(new_drbd, remote_node)
        rpc.call_blockdev_remove(remote_node, new_drbd)
        raise errors.OpExecError("Failed to create volume on primary!\n"
                                 "Full abort, cleanup manually!!")

      # the device exists now
      # call the primary node to add the mirror to md
      logger.Info("adding new mirror component to md")
      if not rpc.call_blockdev_addchild(instance.primary_node, dev,
                                        new_drbd):
        logger.Error("Can't add mirror component to md!")
        cfg.SetDiskID(new_drbd, remote_node)
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
          logger.Error("Can't rollback on secondary")
        cfg.SetDiskID(new_drbd, instance.primary_node)
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
          logger.Error("Can't rollback on primary")
        raise errors.OpExecError("Full abort, cleanup manually!!")

      dev.children.append(new_drbd)
      cfg.AddInstance(instance)

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    _WaitForSync(cfg, instance, unlock=True)

    # so check manually all the devices
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("MD device %s is degraded!" % name)
      cfg.SetDiskID(new_drbd, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
      if is_degr:
        raise errors.OpExecError("New drbd device %s is degraded!" % name)

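    # every new device is healthy: detach the old drbd children from the md
    # arrays and remove them from both of their nodes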
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      logger.Info("remove mirror %s component" % name)
      cfg.SetDiskID(dev, instance.primary_node)
      if not rpc.call_blockdev_removechild(instance.primary_node,
                                           dev, child):
        logger.Error("Can't remove child from mirror, aborting"
                     " *this device cleanup*.\nYou need to cleanup manually!!")
        continue

      for node in child.logical_id[:2]:
        logger.Info("remove child device on %s" % node)
        cfg.SetDiskID(child, node)
        if not rpc.call_blockdev_remove(node, child):
          logger.Error("Warning: failed to remove device from node %s,"
                       " continuing operation." % node)

      dev.children.remove(child)

      cfg.AddInstance(instance)


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]
    return


  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type == "drbd":
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                           instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

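      # _ComputeDiskStatus recurses through each disk's children; pstatus and
      # sstatus are the blockdev_find results on the primary and secondary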
      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        }

      result[instance.name] = idict

    return result


class LUSetInstanceParms(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      args['nics'] = [(ip, bridge)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.bridge = getattr(self.op, "bridge", None)
    if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
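    # the literal string "none" (any case) clears the IP address, which is
    # then stored as None; anything else must be a valid IP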
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("No such instance name '%s'" %
                                 self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance
    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.
    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))

    self.cfg.AddInstance(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
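    # an empty or omitted node list is expanded to all cluster nodes (see the
    # related note in LUExportInstance below)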
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance name is a valid one.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    # shutdown the instance, unless requested not to do so
    if self.op.shutdown:
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
      self.processor.ChainOpCode(op, feedback_fn)

    vgname = self.cfg.GetVGName()

    snap_disks = []

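    # snapshot the sda disk while the instance is (optionally) shut down; the
    # finally clause restarts it before the snapshot is copied away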
    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type="lvm", size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown:
        op = opcodes.OpStartupInstance(instance_name=instance.name,
                                       force=False)
        self.processor.ChainOpCode(op, feedback_fn)

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                      instance):
        logger.Error("could not export block device %s from node"
                     " %s to node %s" %
                     (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from"
                     " node %s" % (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

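    # finally, prune older exports of this instance from the other nodes so
    # that only the export just written to the target node remains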
    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal; if we
    # proceeded, the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.processor.ChainOpCode(op, feedback_fn)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()


class LUAddTag(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tag"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    objects.TaggableObject.ValidateTag(self.op.tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      self.target.AddTag(self.op.tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
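    # writing the target back may fail if the configuration was modified
    # concurrently; this is reported as OpRetryError so the caller can retry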
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTag(TagsLU):
  """Delete a tag from a given object.

  """
  _OP_REQP = ["kind", "name", "tag"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    objects.TaggableObject.ValidateTag(self.op.tag)
    if self.op.tag not in self.target.GetTags():
      raise errors.OpPrereqError("Tag not found")

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    self.target.RemoveTag(self.op.tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")