root / lib / cmdlib.py @ 4a72cc75

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import socket
30
import time
31
import tempfile
32
import re
33
import platform
34

    
35
from ganeti import rpc
36
from ganeti import ssh
37
from ganeti import logger
38
from ganeti import utils
39
from ganeti import errors
40
from ganeti import hypervisor
41
from ganeti import config
42
from ganeti import constants
43
from ganeti import objects
44
from ganeti import opcodes
45
from ganeti import ssconf
46

    
47
class LogicalUnit(object):
48
  """Logical Unit base class.
49

50
  Subclasses must follow these rules:
51
    - implement CheckPrereq which also fills in the opcode instance
52
      with all the fields (even if as None)
53
    - implement Exec
54
    - implement BuildHooksEnv
55
    - redefine HPATH and HTYPE
56
    - optionally redefine their run requirements (REQ_CLUSTER,
57
      REQ_MASTER); note that all commands require root permissions
58

59
  """
60
  HPATH = None
61
  HTYPE = None
62
  _OP_REQP = []
63
  REQ_CLUSTER = True
64
  REQ_MASTER = True
65

    
66
  def __init__(self, processor, op, cfg, sstore):
67
    """Constructor for LogicalUnit.
68

69
    This needs to be overridden in derived classes in order to check op
70
    validity.
71

72
    """
73
    self.processor = processor
74
    self.op = op
75
    self.cfg = cfg
76
    self.sstore = sstore
77
    for attr_name in self._OP_REQP:
78
      attr_val = getattr(op, attr_name, None)
79
      if attr_val is None:
80
        raise errors.OpPrereqError("Required parameter '%s' missing" %
81
                                   attr_name)
82
    if self.REQ_CLUSTER:
83
      if not cfg.IsCluster():
84
        raise errors.OpPrereqError("Cluster not initialized yet,"
85
                                   " use 'gnt-cluster init' first.")
86
      if self.REQ_MASTER:
87
        master = sstore.GetMasterNode()
88
        if master != socket.gethostname():
89
          raise errors.OpPrereqError("Commands must be run on the master"
90
                                     " node %s" % master)
91

    
92
  def CheckPrereq(self):
93
    """Check prerequisites for this LU.
94

95
    This method should check that the prerequisites for the execution
96
    of this LU are fulfilled. It can do internode communication, but
97
    it should be idempotent - no cluster or system changes are
98
    allowed.
99

100
    The method should raise errors.OpPrereqError in case something is
101
    not fulfilled. Its return value is ignored.
102

103
    This method should also update all the parameters of the opcode to
104
    their canonical form; e.g. a short node name must be fully
105
    expanded after this method has successfully completed (so that
106
    hooks, logging, etc. work correctly).
107

108
    """
109
    raise NotImplementedError
110

    
111
  def Exec(self, feedback_fn):
112
    """Execute the LU.
113

114
    This method should implement the actual work. It should raise
115
    errors.OpExecError for failures that are somewhat dealt with in
116
    code, or expected.
117

118
    """
119
    raise NotImplementedError
120

    
121
  def BuildHooksEnv(self):
122
    """Build hooks environment for this LU.
123

124
    This method should return a three-element tuple consisting of: a dict
125
    containing the environment that will be used for running the
126
    specific hook for this LU, a list of node names on which the hook
127
    should run before the execution, and a list of node names on which
128
    the hook should run after the execution.
129

130
    The keys of the dict must not have 'GANETI_' prefixed as this will
131
    be handled in the hooks runner. Also note additional keys will be
132
    added by the hooks runner. If the LU doesn't define any
133
    environment, an empty dict (and not None) should be returned.
134

135
    As for the node lists, the master should not be included in
136
    them, as it will be added by the hooks runner in case this LU
137
    requires a cluster to run on (otherwise we don't have a node
138
    list). If there are no nodes, an empty list should be returned (and not
139
    None).
140

141
    Note that if the HPATH for a LU class is None, this function will
142
    not be called.
143

144
    """
145
    raise NotImplementedError
146

    
147

    
148
class NoHooksLU(LogicalUnit):
149
  """Simple LU which runs no hooks.
150

151
  This LU is intended as a parent for other LogicalUnits which will
152
  run no hooks, in order to reduce duplicate code.
153

154
  """
155
  HPATH = None
156
  HTYPE = None
157

    
158
  def BuildHooksEnv(self):
159
    """Build hooks env.
160

161
    This is a no-op, since we don't run hooks.
162

163
    """
164
    return
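
# Illustrative sketch (not part of the original module): a minimal,
# hypothetical LogicalUnit subclass following the contract described in the
# LogicalUnit docstring above.  The opcode and its fields are invented for
# the example; real LUs are wired to their opcodes by the processor layer.
#
#   class LUListNodeNames(NoHooksLU):
#     """Example no-hooks LU that just returns the wanted node names."""
#     _OP_REQP = ["nodes"]
#
#     def CheckPrereq(self):
#       # canonicalize opcode parameters; no cluster changes allowed here
#       self.wanted = _GetWantedNodes(self, self.op.nodes)
#
#     def Exec(self, feedback_fn):
#       feedback_fn("* listing %d node(s)" % len(self.wanted))
#       return self.wanted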
165

    
166

    
167
def _GetWantedNodes(lu, nodes):
168
  """Returns list of checked and expanded node names.
169

170
  Args:
171
    nodes: List of node names (strings); an empty list means all nodes
172

173
  """
174
  if not isinstance(nodes, list):
175
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
176

    
177
  if nodes:
178
    wanted = []
179

    
180
    for name in nodes:
181
      node = lu.cfg.ExpandNodeName(name)
182
      if node is None:
183
        raise errors.OpPrereqError("No such node name '%s'" % name)
184
      wanted.append(node)
185

    
186
  else:
187
    wanted = lu.cfg.GetNodeList()
188
  return utils.NiceSort(wanted)
189

    
190

    
191
def _GetWantedInstances(lu, instances):
192
  """Returns list of checked and expanded instance names.
193

194
  Args:
195
    instances: List of instance names (strings); an empty list means all
196

197
  """
198
  if not isinstance(instances, list):
199
    raise errors.OpPrereqError("Invalid argument type 'instances'")
200

    
201
  if instances:
202
    wanted = []
203

    
204
    for name in instances:
205
      instance = lu.cfg.ExpandInstanceName(name)
206
      if instance is None:
207
        raise errors.OpPrereqError("No such instance name '%s'" % name)
208
      wanted.append(instance)
209

    
210
  else:
211
    wanted = lu.cfg.GetInstanceList()
212
  return utils.NiceSort(wanted)
213

    
214

    
215
def _CheckOutputFields(static, dynamic, selected):
216
  """Checks whether all selected fields are valid.
217

218
  Args:
219
    static: Static fields
220
    dynamic: Dynamic fields
    selected: Fields selected by the user for output
221

222
  """
223
  static_fields = frozenset(static)
224
  dynamic_fields = frozenset(dynamic)
225

    
226
  all_fields = static_fields | dynamic_fields
227

    
228
  if not all_fields.issuperset(selected):
229
    raise errors.OpPrereqError("Unknown output fields selected: %s"
230
                               % ",".join(frozenset(selected).
231
                                          difference(all_fields)))
232

    
233

    
234
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
235
                          memory, vcpus, nics):
236
  """Builds instance related env variables for hooks from single variables.
237

238
  Args:
239
    secondary_nodes: List of secondary nodes as strings
240
  """
241
  env = {
242
    "INSTANCE_NAME": name,
243
    "INSTANCE_PRIMARY": primary_node,
244
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
245
    "INSTANCE_OS_TYPE": os_type,
246
    "INSTANCE_STATUS": status,
247
    "INSTANCE_MEMORY": memory,
248
    "INSTANCE_VCPUS": vcpus,
249
  }
250

    
251
  if nics:
252
    nic_count = len(nics)
253
    for idx, (ip, bridge) in enumerate(nics):
254
      if ip is None:
255
        ip = ""
256
      env["INSTANCE_NIC%d_IP" % idx] = ip
257
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
258
  else:
259
    nic_count = 0
260

    
261
  env["INSTANCE_NIC_COUNT"] = nic_count
262

    
263
  return env
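
# Worked example (illustrative only; all values invented): for a running
# instance "inst1.example.com" with primary node "node1", one secondary
# node "node2", os_type "debian-etch", status "up", 128 MiB of memory,
# one VCPU and a single NIC (ip=None, bridge="xen-br0"), the function
# above returns:
#
#   {"INSTANCE_NAME": "inst1.example.com",
#    "INSTANCE_PRIMARY": "node1",
#    "INSTANCE_SECONDARIES": "node2",
#    "INSTANCE_OS_TYPE": "debian-etch",
#    "INSTANCE_STATUS": "up",
#    "INSTANCE_MEMORY": 128,
#    "INSTANCE_VCPUS": 1,
#    "INSTANCE_NIC0_IP": "",
#    "INSTANCE_NIC0_BRIDGE": "xen-br0",
#    "INSTANCE_NIC_COUNT": 1}
#
# The hooks runner later adds the "GANETI_" prefix to these keys (see the
# LogicalUnit.BuildHooksEnv docstring).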
264

    
265

    
266
def _BuildInstanceHookEnvByObject(instance, override=None):
267
  """Builds instance related env variables for hooks from an object.
268

269
  Args:
270
    instance: objects.Instance object of instance
271
    override: dict of values to override
272
  """
273
  args = {
274
    'name': instance.name,
275
    'primary_node': instance.primary_node,
276
    'secondary_nodes': instance.secondary_nodes,
277
    'os_type': instance.os,
278
    'status': instance.status,
279
    'memory': instance.memory,
280
    'vcpus': instance.vcpus,
281
    'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
282
  }
283
  if override:
284
    args.update(override)
285
  return _BuildInstanceHookEnv(**args)
286

    
287

    
288
def _UpdateEtcHosts(fullnode, ip):
289
  """Ensure a node has a correct entry in /etc/hosts.
290

291
  Args:
292
    fullnode - Fully qualified domain name of host. (str)
293
    ip       - IPv4 address of host (str)
294

295
  """
296
  node = fullnode.split(".", 1)[0]
297

    
298
  f = open('/etc/hosts', 'r+')
299

    
300
  inthere = False
301

    
302
  save_lines = []
303
  add_lines = []
304
  removed = False
305

    
306
  while True:
307
    rawline = f.readline()
308

    
309
    if not rawline:
310
      # End of file
311
      break
312

    
313
    line = rawline.split('\n')[0]
314

    
315
    # Strip off comments
316
    line = line.split('#')[0]
317

    
318
    if not line:
319
      # Entire line was comment, skip
320
      save_lines.append(rawline)
321
      continue
322

    
323
    fields = line.split()
324

    
325
    haveall = True
326
    havesome = False
327
    for spec in [ ip, fullnode, node ]:
328
      if spec not in fields:
329
        haveall = False
330
      if spec in fields:
331
        havesome = True
332

    
333
    if haveall:
334
      inthere = True
335
      save_lines.append(rawline)
336
      continue
337

    
338
    if havesome and not haveall:
339
      # Line (old, or manual?) which is missing some.  Remove.
340
      removed = True
341
      continue
342

    
343
    save_lines.append(rawline)
344

    
345
  if not inthere:
346
    add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))
347

    
348
  if removed:
349
    if add_lines:
350
      save_lines = save_lines + add_lines
351

    
352
    # We removed a line, write a new file and replace old.
353
    fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
354
    newfile = os.fdopen(fd, 'w')
355
    newfile.write(''.join(save_lines))
356
    newfile.close()
357
    os.rename(tmpname, '/etc/hosts')
358

    
359
  elif add_lines:
360
    # Simply appending a new line will do the trick.
361
    f.seek(0, 2)
362
    for add in add_lines:
363
      f.write(add)
364

    
365
  f.close()
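
# Example (illustrative, documentation address only): after calling
# _UpdateEtcHosts("node1.example.com", "192.0.2.10"), /etc/hosts is
# guaranteed to contain a line of the form
#
#   192.0.2.10<tab>node1.example.com node1
#
# while any stale line mentioning only some of the three tokens (IP, FQDN,
# short name) has been dropped.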
366

    
367

    
368
def _UpdateKnownHosts(fullnode, ip, pubkey):
369
  """Ensure a node has a correct known_hosts entry.
370

371
  Args:
372
    fullnode - Fully qualified domain name of host. (str)
373
    ip       - IPv4 address of host (str)
374
    pubkey   - the public key of the cluster
375

376
  """
377
  if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
378
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
379
  else:
380
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
381

    
382
  inthere = False
383

    
384
  save_lines = []
385
  add_lines = []
386
  removed = False
387

    
388
  while True:
389
    rawline = f.readline()
390
    logger.Debug('read %s' % (repr(rawline),))
391

    
392
    if not rawline:
393
      # End of file
394
      break
395

    
396
    line = rawline.split('\n')[0]
397

    
398
    parts = line.split(' ')
399
    fields = parts[0].split(',')
400
    key = parts[2]
401

    
402
    haveall = True
403
    havesome = False
404
    for spec in [ ip, fullnode ]:
405
      if spec not in fields:
406
        haveall = False
407
      if spec in fields:
408
        havesome = True
409

    
410
    logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
411
    if haveall and key == pubkey:
412
      inthere = True
413
      save_lines.append(rawline)
414
      logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
415
      continue
416

    
417
    if havesome and (not haveall or key != pubkey):
418
      removed = True
419
      logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
420
      continue
421

    
422
    save_lines.append(rawline)
423

    
424
  if not inthere:
425
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
426
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
427

    
428
  if removed:
429
    save_lines = save_lines + add_lines
430

    
431
    # Write a new file and replace old.
432
    fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
433
                                   constants.DATA_DIR)
434
    newfile = os.fdopen(fd, 'w')
435
    try:
436
      newfile.write(''.join(save_lines))
437
    finally:
438
      newfile.close()
439
    logger.Debug("Wrote new known_hosts.")
440
    os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
441

    
442
  elif add_lines:
443
    # Simply appending a new line will do the trick.
444
    f.seek(0, 2)
445
    for add in add_lines:
446
      f.write(add)
447

    
448
  f.close()
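
# Example (illustrative): the known_hosts entries maintained above have the
# form
#
#   node1.example.com,192.0.2.10 ssh-rsa <cluster public key>
#
# Lines that mention the host or IP but list only one of them, or carry a
# key different from the cluster key, are dropped and the file rewritten.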
449

    
450

    
451
def _HasValidVG(vglist, vgname):
452
  """Checks if the volume group list is valid.
453

454
  A non-None return value means there's an error, and the return value
455
  is the error message.
456

457
  """
458
  vgsize = vglist.get(vgname, None)
459
  if vgsize is None:
460
    return "volume group '%s' missing" % vgname
461
  elif vgsize < 20480:
462
    return ("volume group '%s' too small (20480MiB required, %dMib found)" %
463
            (vgname, vgsize))
464
  return None
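
# Example (illustrative): _HasValidVG({"xenvg": 102400}, "xenvg") returns
# None (valid), _HasValidVG({"xenvg": 10240}, "xenvg") returns a "too
# small" message, and _HasValidVG({}, "xenvg") reports the group as
# missing; callers treat any non-None result as an error.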
465

    
466

    
467
def _InitSSHSetup(node):
468
  """Setup the SSH configuration for the cluster.
469

470

471
  This generates a dsa keypair for root, adds the pub key to the
472
  permitted hosts (root's authorized_keys file).
473

474
  Args:
475
    node: the name of this host as a fqdn
476

477
  """
478
  if os.path.exists('/root/.ssh/id_dsa'):
479
    utils.CreateBackup('/root/.ssh/id_dsa')
480
  if os.path.exists('/root/.ssh/id_dsa.pub'):
481
    utils.CreateBackup('/root/.ssh/id_dsa.pub')
482

    
483
  utils.RemoveFile('/root/.ssh/id_dsa')
484
  utils.RemoveFile('/root/.ssh/id_dsa.pub')
485

    
486
  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
487
                         "-f", "/root/.ssh/id_dsa",
488
                         "-q", "-N", ""])
489
  if result.failed:
490
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
491
                             result.output)
492

    
493
  f = open('/root/.ssh/id_dsa.pub', 'r')
494
  try:
495
    utils.AddAuthorizedKey('/root/.ssh/authorized_keys', f.read(8192))
496
  finally:
497
    f.close()
498

    
499

    
500
def _InitGanetiServerSetup(ss):
501
  """Setup the necessary configuration for the initial node daemon.
502

503
  This creates the nodepass file containing the shared password for
504
  the cluster and also generates the SSL certificate.
505

506
  """
507
  # Create pseudo random password
508
  randpass = sha.new(os.urandom(64)).hexdigest()
509
  # and write it into sstore
510
  ss.SetKey(ss.SS_NODED_PASS, randpass)
511

    
512
  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
513
                         "-days", str(365*5), "-nodes", "-x509",
514
                         "-keyout", constants.SSL_CERT_FILE,
515
                         "-out", constants.SSL_CERT_FILE, "-batch"])
516
  if result.failed:
517
    raise errors.OpExecError("could not generate server ssl cert, command"
518
                             " %s had exitcode %s and error message %s" %
519
                             (result.cmd, result.exit_code, result.output))
520

    
521
  os.chmod(constants.SSL_CERT_FILE, 0400)
522

    
523
  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
524

    
525
  if result.failed:
526
    raise errors.OpExecError("Could not start the node daemon, command %s"
527
                             " had exitcode %s and error %s" %
528
                             (result.cmd, result.exit_code, result.output))
529

    
530

    
531
class LUInitCluster(LogicalUnit):
532
  """Initialise the cluster.
533

534
  """
535
  HPATH = "cluster-init"
536
  HTYPE = constants.HTYPE_CLUSTER
537
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
538
              "def_bridge", "master_netdev"]
539
  REQ_CLUSTER = False
540

    
541
  def BuildHooksEnv(self):
542
    """Build hooks env.
543

544
    Notes: Since we don't require a cluster, we must manually add
545
    ourselves in the post-run node list.
546

547
    """
548
    env = {
549
      "CLUSTER": self.op.cluster_name,
550
      "MASTER": self.hostname['hostname_full'],
551
      }
552
    return env, [], [self.hostname['hostname_full']]
553

    
554
  def CheckPrereq(self):
555
    """Verify that the passed name is a valid one.
556

557
    """
558
    if config.ConfigWriter.IsCluster():
559
      raise errors.OpPrereqError("Cluster is already initialised")
560

    
561
    hostname_local = socket.gethostname()
562
    self.hostname = hostname = utils.LookupHostname(hostname_local)
563
    if not hostname:
564
      raise errors.OpPrereqError("Cannot resolve my own hostname ('%s')" %
565
                                 hostname_local)
566

    
567
    if hostname["hostname_full"] != hostname_local:
568
      raise errors.OpPrereqError("My own hostname (%s) does not match the"
569
                                 " resolver (%s): probably not using FQDN"
570
                                 " for hostname." %
571
                                 (hostname_local, hostname["hostname_full"]))
572

    
573
    if hostname["ip"].startswith("127."):
574
      raise errors.OpPrereqError("This host's IP resolves to the private"
575
                                 " range (%s). Please fix DNS or /etc/hosts." %
576
                                 (hostname["ip"],))
577

    
578
    self.clustername = clustername = utils.LookupHostname(self.op.cluster_name)
579
    if not clustername:
580
      raise errors.OpPrereqError("Cannot resolve given cluster name ('%s')"
581
                                 % self.op.cluster_name)
582

    
583
    result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", hostname['ip']])
584
    if result.failed:
585
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
586
                                 " to %s,\nbut this ip address does not"
587
                                 " belong to this host."
588
                                 " Aborting." % hostname['ip'])
589

    
590
    secondary_ip = getattr(self.op, "secondary_ip", None)
591
    if secondary_ip and not utils.IsValidIP(secondary_ip):
592
      raise errors.OpPrereqError("Invalid secondary ip given")
593
    if secondary_ip and secondary_ip != hostname['ip']:
594
      result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", secondary_ip])
595
      if result.failed:
596
        raise errors.OpPrereqError("You gave %s as secondary IP,\n"
597
                                   "but it does not belong to this host." %
598
                                   secondary_ip)
599
    self.secondary_ip = secondary_ip
600

    
601
    # checks presence of the volume group given
602
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
603

    
604
    if vgstatus:
605
      raise errors.OpPrereqError("Error: %s" % vgstatus)
606

    
607
    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
608
                    self.op.mac_prefix):
609
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
610
                                 self.op.mac_prefix)
611

    
612
    if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
613
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
614
                                 self.op.hypervisor_type)
615

    
616
    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
617
    if result.failed:
618
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
619
                                 (self.op.master_netdev,
620
                                  result.output.strip()))
621

    
622
  def Exec(self, feedback_fn):
623
    """Initialize the cluster.
624

625
    """
626
    clustername = self.clustername
627
    hostname = self.hostname
628

    
629
    # set up the simple store
630
    ss = ssconf.SimpleStore()
631
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
632
    ss.SetKey(ss.SS_MASTER_NODE, hostname['hostname_full'])
633
    ss.SetKey(ss.SS_MASTER_IP, clustername['ip'])
634
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
635
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername['hostname'])
636

    
637
    # set up the inter-node password and certificate
638
    _InitGanetiServerSetup(ss)
639

    
640
    # start the master ip
641
    rpc.call_node_start_master(hostname['hostname_full'])
642

    
643
    # set up ssh config and /etc/hosts
644
    f = open('/etc/ssh/ssh_host_rsa_key.pub', 'r')
645
    try:
646
      sshline = f.read()
647
    finally:
648
      f.close()
649
    sshkey = sshline.split(" ")[1]
650

    
651
    _UpdateEtcHosts(hostname['hostname_full'],
652
                    hostname['ip'],
653
                    )
654

    
655
    _UpdateKnownHosts(hostname['hostname_full'],
656
                      hostname['ip'],
657
                      sshkey,
658
                      )
659

    
660
    _InitSSHSetup(hostname['hostname'])
661

    
662
    # init of cluster config file
663
    cfgw = config.ConfigWriter()
664
    cfgw.InitConfig(hostname['hostname'], hostname['ip'], self.secondary_ip,
665
                    sshkey, self.op.mac_prefix,
666
                    self.op.vg_name, self.op.def_bridge)
667

    
668

    
669
class LUDestroyCluster(NoHooksLU):
670
  """Logical unit for destroying the cluster.
671

672
  """
673
  _OP_REQP = []
674

    
675
  def CheckPrereq(self):
676
    """Check prerequisites.
677

678
    This checks whether the cluster is empty.
679

680
    Any errors are signalled by raising errors.OpPrereqError.
681

682
    """
683
    master = self.sstore.GetMasterNode()
684

    
685
    nodelist = self.cfg.GetNodeList()
686
    if len(nodelist) != 1 or nodelist[0] != master:
687
      raise errors.OpPrereqError("There are still %d node(s) in"
688
                                 " this cluster." % (len(nodelist) - 1))
689
    instancelist = self.cfg.GetInstanceList()
690
    if instancelist:
691
      raise errors.OpPrereqError("There are still %d instance(s) in"
692
                                 " this cluster." % len(instancelist))
693

    
694
  def Exec(self, feedback_fn):
695
    """Destroys the cluster.
696

697
    """
698
    utils.CreateBackup('/root/.ssh/id_dsa')
699
    utils.CreateBackup('/root/.ssh/id_dsa.pub')
700
    rpc.call_node_leave_cluster(self.sstore.GetMasterNode())
701

    
702

    
703
class LUVerifyCluster(NoHooksLU):
704
  """Verifies the cluster status.
705

706
  """
707
  _OP_REQP = []
708

    
709
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
710
                  remote_version, feedback_fn):
711
    """Run multiple tests against a node.
712

713
    Test list:
714
      - compares ganeti version
715
      - checks vg existence and size > 20G
716
      - checks config file checksum
717
      - checks ssh to other nodes
718

719
    Args:
720
      node: name of the node to check
721
      file_list: required list of files
722
      local_cksum: dictionary of local files and their checksums
723

724
    """
725
    # compares ganeti version
726
    local_version = constants.PROTOCOL_VERSION
727
    if not remote_version:
728
      feedback_fn(" - ERROR: connection to %s failed" % (node))
729
      return True
730

    
731
    if local_version != remote_version:
732
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
733
                      (local_version, node, remote_version))
734
      return True
735

    
736
    # checks vg existence and size > 20G
737

    
738
    bad = False
739
    if not vglist:
740
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
741
                      (node,))
742
      bad = True
743
    else:
744
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
745
      if vgstatus:
746
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
747
        bad = True
748

    
749
    # checks config file checksum
750
    # checks ssh to any
751

    
752
    if 'filelist' not in node_result:
753
      bad = True
754
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
755
    else:
756
      remote_cksum = node_result['filelist']
757
      for file_name in file_list:
758
        if file_name not in remote_cksum:
759
          bad = True
760
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
761
        elif remote_cksum[file_name] != local_cksum[file_name]:
762
          bad = True
763
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
764

    
765
    if 'nodelist' not in node_result:
766
      bad = True
767
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
768
    else:
769
      if node_result['nodelist']:
770
        bad = True
771
        for node in node_result['nodelist']:
772
          feedback_fn("  - ERROR: communication with node '%s': %s" %
773
                          (node, node_result['nodelist'][node]))
774
    hyp_result = node_result.get('hypervisor', None)
775
    if hyp_result is not None:
776
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
777
    return bad
778

    
779
  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
780
    """Verify an instance.
781

782
    This function checks to see if the required block devices are
783
    available on the instance's node.
784

785
    """
786
    bad = False
787

    
788
    instancelist = self.cfg.GetInstanceList()
789
    if instance not in instancelist:
790
      feedback_fn("  - ERROR: instance %s not in instance list %s" %
791
                      (instance, instancelist))
792
      bad = True
793

    
794
    instanceconfig = self.cfg.GetInstanceInfo(instance)
795
    node_current = instanceconfig.primary_node
796

    
797
    node_vol_should = {}
798
    instanceconfig.MapLVsByNode(node_vol_should)
799

    
800
    for node in node_vol_should:
801
      for volume in node_vol_should[node]:
802
        if node not in node_vol_is or volume not in node_vol_is[node]:
803
          feedback_fn("  - ERROR: volume %s missing on node %s" %
804
                          (volume, node))
805
          bad = True
806

    
807
    if instanceconfig.status != 'down':
808
      if instance not in node_instance[node_current]:
809
        feedback_fn("  - ERROR: instance %s not running on node %s" %
810
                        (instance, node_current))
811
        bad = True
812

    
813
    for node in node_instance:
814
      if node != node_current:
815
        if instance in node_instance[node]:
816
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
817
                          (instance, node))
818
          bad = True
819

    
820
    return bad
821

    
822
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
823
    """Verify if there are any unknown volumes in the cluster.
824

825
    The .os, .swap and backup volumes are ignored. All other volumes are
826
    reported as unknown.
827

828
    """
829
    bad = False
830

    
831
    for node in node_vol_is:
832
      for volume in node_vol_is[node]:
833
        if node not in node_vol_should or volume not in node_vol_should[node]:
834
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
835
                      (volume, node))
836
          bad = True
837
    return bad
838

    
839
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
840
    """Verify the list of running instances.
841

842
    This checks what instances are running but unknown to the cluster.
843

844
    """
845
    bad = False
846
    for node in node_instance:
847
      for runninginstance in node_instance[node]:
848
        if runninginstance not in instancelist:
849
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
850
                          (runninginstance, node))
851
          bad = True
852
    return bad
853

    
854
  def CheckPrereq(self):
855
    """Check prerequisites.
856

857
    This has no prerequisites.
858

859
    """
860
    pass
861

    
862
  def Exec(self, feedback_fn):
863
    """Verify integrity of cluster, performing various test on nodes.
864

865
    """
866
    bad = False
867
    feedback_fn("* Verifying global settings")
868
    self.cfg.VerifyConfig()
869

    
870
    master = self.sstore.GetMasterNode()
871
    vg_name = self.cfg.GetVGName()
872
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
873
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
874
    node_volume = {}
875
    node_instance = {}
876

    
877
    # FIXME: verify OS list
878
    # do local checksums
879
    file_names = list(self.sstore.GetFileList())
880
    file_names.append(constants.SSL_CERT_FILE)
881
    file_names.append(constants.CLUSTER_CONF_FILE)
882
    local_checksums = utils.FingerprintFiles(file_names)
883

    
884
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
885
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
886
    all_instanceinfo = rpc.call_instance_list(nodelist)
887
    all_vglist = rpc.call_vg_list(nodelist)
888
    node_verify_param = {
889
      'filelist': file_names,
890
      'nodelist': nodelist,
891
      'hypervisor': None,
892
      }
893
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
894
    all_rversion = rpc.call_version(nodelist)
895

    
896
    for node in nodelist:
897
      feedback_fn("* Verifying node %s" % node)
898
      result = self._VerifyNode(node, file_names, local_checksums,
899
                                all_vglist[node], all_nvinfo[node],
900
                                all_rversion[node], feedback_fn)
901
      bad = bad or result
902

    
903
      # node_volume
904
      volumeinfo = all_volumeinfo[node]
905

    
906
      if type(volumeinfo) != dict:
907
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
908
        bad = True
909
        continue
910

    
911
      node_volume[node] = volumeinfo
912

    
913
      # node_instance
914
      nodeinstance = all_instanceinfo[node]
915
      if type(nodeinstance) != list:
916
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
917
        bad = True
918
        continue
919

    
920
      node_instance[node] = nodeinstance
921

    
922
    node_vol_should = {}
923

    
924
    for instance in instancelist:
925
      feedback_fn("* Verifying instance %s" % instance)
926
      result =  self._VerifyInstance(instance, node_volume, node_instance,
927
                                     feedback_fn)
928
      bad = bad or result
929

    
930
      inst_config = self.cfg.GetInstanceInfo(instance)
931

    
932
      inst_config.MapLVsByNode(node_vol_should)
933

    
934
    feedback_fn("* Verifying orphan volumes")
935
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
936
                                       feedback_fn)
937
    bad = bad or result
938

    
939
    feedback_fn("* Verifying remaining instances")
940
    result = self._VerifyOrphanInstances(instancelist, node_instance,
941
                                         feedback_fn)
942
    bad = bad or result
943

    
944
    return int(bad)
945

    
946

    
947
def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
948
  """Sleep and poll for an instance's disk to sync.
949

950
  """
951
  if not instance.disks:
952
    return True
953

    
954
  if not oneshot:
955
    logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)
956

    
957
  node = instance.primary_node
958

    
959
  for dev in instance.disks:
960
    cfgw.SetDiskID(dev, node)
961

    
962
  retries = 0
963
  while True:
964
    max_time = 0
965
    done = True
966
    cumul_degraded = False
967
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
968
    if not rstats:
969
      logger.ToStderr("Can't get any data from node %s" % node)
970
      retries += 1
971
      if retries >= 10:
972
        raise errors.RemoteError("Can't contact node %s for mirror data,"
973
                                 " aborting." % node)
974
      time.sleep(6)
975
      continue
976
    retries = 0
977
    for i in range(len(rstats)):
978
      mstat = rstats[i]
979
      if mstat is None:
980
        logger.ToStderr("Can't compute data for node %s/%s" %
981
                        (node, instance.disks[i].iv_name))
982
        continue
983
      perc_done, est_time, is_degraded = mstat
984
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
985
      if perc_done is not None:
986
        done = False
987
        if est_time is not None:
988
          rem_time = "%d estimated seconds remaining" % est_time
989
          max_time = est_time
990
        else:
991
          rem_time = "no time estimate"
992
        logger.ToStdout("- device %s: %5.2f%% done, %s" %
993
                        (instance.disks[i].iv_name, perc_done, rem_time))
994
    if done or oneshot:
995
      break
996

    
997
    if unlock:
998
      utils.Unlock('cmd')
999
    try:
1000
      time.sleep(min(60, max_time))
1001
    finally:
1002
      if unlock:
1003
        utils.Lock('cmd')
1004

    
1005
  if done:
1006
    logger.ToStdout("Instance %s's disks are in sync." % instance.name)
1007
  return not cumul_degraded
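
# Minimal usage sketch (illustrative only, not taken from this file): an LU
# whose Exec method has just created or changed an instance's mirrored disks
# could wait for resynchronisation along these lines:
#
#   if not _WaitForSync(self.cfg, instance):
#     raise errors.OpExecError("Disks of instance %s are degraded" %
#                              instance.name)
#
# With oneshot=True the function prints one status report and returns
# instead of blocking until the sync has finished.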
1008

    
1009

    
1010
def _CheckDiskConsistency(cfgw, dev, node, on_primary):
1011
  """Check that mirrors are not degraded.
1012

1013
  """
1014
  cfgw.SetDiskID(dev, node)
1015

    
1016
  result = True
1017
  if on_primary or dev.AssembleOnSecondary():
1018
    rstats = rpc.call_blockdev_find(node, dev)
1019
    if not rstats:
1020
      logger.ToStderr("Can't get any data from node %s" % node)
1021
      result = False
1022
    else:
1023
      result = result and (not rstats[5])
1024
  if dev.children:
1025
    for child in dev.children:
1026
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1027

    
1028
  return result
1029

    
1030

    
1031
class LUDiagnoseOS(NoHooksLU):
1032
  """Logical unit for OS diagnose/query.
1033

1034
  """
1035
  _OP_REQP = []
1036

    
1037
  def CheckPrereq(self):
1038
    """Check prerequisites.
1039

1040
    This always succeeds, since this is a pure query LU.
1041

1042
    """
1043
    return
1044

    
1045
  def Exec(self, feedback_fn):
1046
    """Compute the list of OSes.
1047

1048
    """
1049
    node_list = self.cfg.GetNodeList()
1050
    node_data = rpc.call_os_diagnose(node_list)
1051
    if node_data == False:
1052
      raise errors.OpExecError("Can't gather the list of OSes")
1053
    return node_data
1054

    
1055

    
1056
class LURemoveNode(LogicalUnit):
1057
  """Logical unit for removing a node.
1058

1059
  """
1060
  HPATH = "node-remove"
1061
  HTYPE = constants.HTYPE_NODE
1062
  _OP_REQP = ["node_name"]
1063

    
1064
  def BuildHooksEnv(self):
1065
    """Build hooks env.
1066

1067
    This doesn't run on the target node in the pre phase as a failed
1068
    node would not allow itself to run.
1069

1070
    """
1071
    env = {
1072
      "NODE_NAME": self.op.node_name,
1073
      }
1074
    all_nodes = self.cfg.GetNodeList()
1075
    all_nodes.remove(self.op.node_name)
1076
    return env, all_nodes, all_nodes
1077

    
1078
  def CheckPrereq(self):
1079
    """Check prerequisites.
1080

1081
    This checks:
1082
     - the node exists in the configuration
1083
     - it does not have primary or secondary instances
1084
     - it's not the master
1085

1086
    Any errors are signalled by raising errors.OpPrereqError.
1087

1088
    """
1089
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1090
    if node is None:
1091
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1092

    
1093
    instance_list = self.cfg.GetInstanceList()
1094

    
1095
    masternode = self.sstore.GetMasterNode()
1096
    if node.name == masternode:
1097
      raise errors.OpPrereqError("Node is the master node,"
1098
                                 " you need to failover first.")
1099

    
1100
    for instance_name in instance_list:
1101
      instance = self.cfg.GetInstanceInfo(instance_name)
1102
      if node.name == instance.primary_node:
1103
        raise errors.OpPrereqError("Instance %s still running on the node,"
1104
                                   " please remove first." % instance_name)
1105
      if node.name in instance.secondary_nodes:
1106
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1107
                                   " please remove first." % instance_name)
1108
    self.op.node_name = node.name
1109
    self.node = node
1110

    
1111
  def Exec(self, feedback_fn):
1112
    """Removes the node from the cluster.
1113

1114
    """
1115
    node = self.node
1116
    logger.Info("stopping the node daemon and removing configs from node %s" %
1117
                node.name)
1118

    
1119
    rpc.call_node_leave_cluster(node.name)
1120

    
1121
    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1122

    
1123
    logger.Info("Removing node %s from config" % node.name)
1124

    
1125
    self.cfg.RemoveNode(node.name)
1126

    
1127

    
1128
class LUQueryNodes(NoHooksLU):
1129
  """Logical unit for querying nodes.
1130

1131
  """
1132
  _OP_REQP = ["output_fields", "nodes"]
1133

    
1134
  def CheckPrereq(self):
1135
    """Check prerequisites.
1136

1137
    This checks that the fields required are valid output fields.
1138

1139
    """
1140
    self.dynamic_fields = frozenset(["dtotal", "dfree",
1141
                                     "mtotal", "mnode", "mfree"])
1142

    
1143
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1144
                               "pinst_list", "sinst_list",
1145
                               "pip", "sip"],
1146
                       dynamic=self.dynamic_fields,
1147
                       selected=self.op.output_fields)
1148

    
1149
    self.wanted_nodes = _GetWantedNodes(self, self.op.nodes)
1150

    
1151
  def Exec(self, feedback_fn):
1152
    """Computes the list of nodes and their attributes.
1153

1154
    """
1155
    nodenames = self.wanted_nodes
1156
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1157

    
1158
    # begin data gathering
1159

    
1160
    if self.dynamic_fields.intersection(self.op.output_fields):
1161
      live_data = {}
1162
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1163
      for name in nodenames:
1164
        nodeinfo = node_data.get(name, None)
1165
        if nodeinfo:
1166
          live_data[name] = {
1167
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1168
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1169
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1170
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1171
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1172
            }
1173
        else:
1174
          live_data[name] = {}
1175
    else:
1176
      live_data = dict.fromkeys(nodenames, {})
1177

    
1178
    node_to_primary = dict([(name, set()) for name in nodenames])
1179
    node_to_secondary = dict([(name, set()) for name in nodenames])
1180

    
1181
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1182
                             "sinst_cnt", "sinst_list"))
1183
    if inst_fields & frozenset(self.op.output_fields):
1184
      instancelist = self.cfg.GetInstanceList()
1185

    
1186
      for instance_name in instancelist:
1187
        inst = self.cfg.GetInstanceInfo(instance_name)
1188
        if inst.primary_node in node_to_primary:
1189
          node_to_primary[inst.primary_node].add(inst.name)
1190
        for secnode in inst.secondary_nodes:
1191
          if secnode in node_to_secondary:
1192
            node_to_secondary[secnode].add(inst.name)
1193

    
1194
    # end data gathering
1195

    
1196
    output = []
1197
    for node in nodelist:
1198
      node_output = []
1199
      for field in self.op.output_fields:
1200
        if field == "name":
1201
          val = node.name
1202
        elif field == "pinst_list":
1203
          val = list(node_to_primary[node.name])
1204
        elif field == "sinst_list":
1205
          val = list(node_to_secondary[node.name])
1206
        elif field == "pinst_cnt":
1207
          val = len(node_to_primary[node.name])
1208
        elif field == "sinst_cnt":
1209
          val = len(node_to_secondary[node.name])
1210
        elif field == "pip":
1211
          val = node.primary_ip
1212
        elif field == "sip":
1213
          val = node.secondary_ip
1214
        elif field in self.dynamic_fields:
1215
          val = live_data[node.name].get(field, None)
1216
        else:
1217
          raise errors.ParameterError(field)
1218
        node_output.append(val)
1219
      output.append(node_output)
1220

    
1221
    return output
1222

    
1223

    
1224
class LUQueryNodeVolumes(NoHooksLU):
1225
  """Logical unit for getting volumes on node(s).
1226

1227
  """
1228
  _OP_REQP = ["nodes", "output_fields"]
1229

    
1230
  def CheckPrereq(self):
1231
    """Check prerequisites.
1232

1233
    This checks that the fields required are valid output fields.
1234

1235
    """
1236
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1237

    
1238
    _CheckOutputFields(static=["node"],
1239
                       dynamic=["phys", "vg", "name", "size", "instance"],
1240
                       selected=self.op.output_fields)
1241

    
1242

    
1243
  def Exec(self, feedback_fn):
1244
    """Computes the list of nodes and their attributes.
1245

1246
    """
1247
    nodenames = self.nodes
1248
    volumes = rpc.call_node_volumes(nodenames)
1249

    
1250
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1251
             in self.cfg.GetInstanceList()]
1252

    
1253
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1254

    
1255
    output = []
1256
    for node in nodenames:
1257
      if node not in volumes or not volumes[node]:
1258
        continue
1259

    
1260
      node_vols = volumes[node][:]
1261
      node_vols.sort(key=lambda vol: vol['dev'])
1262

    
1263
      for vol in node_vols:
1264
        node_output = []
1265
        for field in self.op.output_fields:
1266
          if field == "node":
1267
            val = node
1268
          elif field == "phys":
1269
            val = vol['dev']
1270
          elif field == "vg":
1271
            val = vol['vg']
1272
          elif field == "name":
1273
            val = vol['name']
1274
          elif field == "size":
1275
            val = int(float(vol['size']))
1276
          elif field == "instance":
1277
            for inst in ilist:
1278
              if node not in lv_by_node[inst]:
1279
                continue
1280
              if vol['name'] in lv_by_node[inst][node]:
1281
                val = inst.name
1282
                break
1283
            else:
1284
              val = '-'
1285
          else:
1286
            raise errors.ParameterError(field)
1287
          node_output.append(str(val))
1288

    
1289
        output.append(node_output)
1290

    
1291
    return output
1292

    
1293

    
1294
class LUAddNode(LogicalUnit):
1295
  """Logical unit for adding node to the cluster.
1296

1297
  """
1298
  HPATH = "node-add"
1299
  HTYPE = constants.HTYPE_NODE
1300
  _OP_REQP = ["node_name"]
1301

    
1302
  def BuildHooksEnv(self):
1303
    """Build hooks env.
1304

1305
    This will run on all nodes before, and on all nodes + the new node after.
1306

1307
    """
1308
    env = {
1309
      "NODE_NAME": self.op.node_name,
1310
      "NODE_PIP": self.op.primary_ip,
1311
      "NODE_SIP": self.op.secondary_ip,
1312
      }
1313
    nodes_0 = self.cfg.GetNodeList()
1314
    nodes_1 = nodes_0 + [self.op.node_name, ]
1315
    return env, nodes_0, nodes_1
1316

    
1317
  def CheckPrereq(self):
1318
    """Check prerequisites.
1319

1320
    This checks:
1321
     - the new node is not already in the config
1322
     - it is resolvable
1323
     - its parameters (single/dual homed) match the cluster
1324

1325
    Any errors are signalled by raising errors.OpPrereqError.
1326

1327
    """
1328
    node_name = self.op.node_name
1329
    cfg = self.cfg
1330

    
1331
    dns_data = utils.LookupHostname(node_name)
1332
    if not dns_data:
1333
      raise errors.OpPrereqError("Node %s is not resolvable" % node_name)
1334

    
1335
    node = dns_data['hostname']
1336
    primary_ip = self.op.primary_ip = dns_data['ip']
1337
    secondary_ip = getattr(self.op, "secondary_ip", None)
1338
    if secondary_ip is None:
1339
      secondary_ip = primary_ip
1340
    if not utils.IsValidIP(secondary_ip):
1341
      raise errors.OpPrereqError("Invalid secondary IP given")
1342
    self.op.secondary_ip = secondary_ip
1343
    node_list = cfg.GetNodeList()
1344
    if node in node_list:
1345
      raise errors.OpPrereqError("Node %s is already in the configuration"
1346
                                 % node)
1347

    
1348
    for existing_node_name in node_list:
1349
      existing_node = cfg.GetNodeInfo(existing_node_name)
1350
      if (existing_node.primary_ip == primary_ip or
1351
          existing_node.secondary_ip == primary_ip or
1352
          existing_node.primary_ip == secondary_ip or
1353
          existing_node.secondary_ip == secondary_ip):
1354
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1355
                                   " existing node %s" % existing_node.name)
1356

    
1357
    # check that the type of the node (single versus dual homed) is the
1358
    # same as for the master
1359
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1360
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1361
    newbie_singlehomed = secondary_ip == primary_ip
1362
    if master_singlehomed != newbie_singlehomed:
1363
      if master_singlehomed:
1364
        raise errors.OpPrereqError("The master has no private ip but the"
1365
                                   " new node has one")
1366
      else:
1367
        raise errors.OpPrereqError("The master has a private ip but the"
1368
                                   " new node doesn't have one")
1369

    
1370
    # checks reachability
1371
    command = ["fping", "-q", primary_ip]
1372
    result = utils.RunCmd(command)
1373
    if result.failed:
1374
      raise errors.OpPrereqError("Node not reachable by ping")
1375

    
1376
    if not newbie_singlehomed:
1377
      # check reachability from my secondary ip to newbie's secondary ip
1378
      command = ["fping", "-S%s" % myself.secondary_ip, "-q", secondary_ip]
1379
      result = utils.RunCmd(command)
1380
      if result.failed:
1381
        raise errors.OpPrereqError("Node secondary ip not reachable by ping")
1382

    
1383
    self.new_node = objects.Node(name=node,
1384
                                 primary_ip=primary_ip,
1385
                                 secondary_ip=secondary_ip)
1386

    
1387
  def Exec(self, feedback_fn):
1388
    """Adds the new node to the cluster.
1389

1390
    """
1391
    new_node = self.new_node
1392
    node = new_node.name
1393

    
1394
    # set up inter-node password and certificate and restarts the node daemon
1395
    gntpass = self.sstore.GetNodeDaemonPassword()
1396
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1397
      raise errors.OpExecError("ganeti password corruption detected")
1398
    f = open(constants.SSL_CERT_FILE)
1399
    try:
1400
      gntpem = f.read(8192)
1401
    finally:
1402
      f.close()
1403
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1404
    # so we use this to detect an invalid certificate; as long as the
1405
    # cert doesn't contain this, the here-document will be correctly
1406
    # parsed by the shell sequence below
1407
    if re.search(r'^!EOF\.', gntpem, re.MULTILINE):
1408
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1409
    if not gntpem.endswith("\n"):
1410
      raise errors.OpExecError("PEM must end with newline")
1411
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1412

    
1413
    # and then connect with ssh to set password and start ganeti-noded
1414
    # note that all the below variables are sanitized at this point,
1415
    # either by being constants or by the checks above
1416
    ss = self.sstore
1417
    mycommand = ("umask 077 && "
1418
                 "echo '%s' > '%s' && "
1419
                 "cat > '%s' << '!EOF.' && \n"
1420
                 "%s!EOF.\n%s restart" %
1421
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1422
                  constants.SSL_CERT_FILE, gntpem,
1423
                  constants.NODE_INITD_SCRIPT))
1424

    
1425
    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1426
    if result.failed:
1427
      raise errors.OpExecError("Remote command on node %s, error: %s,"
1428
                               " output: %s" %
1429
                               (node, result.fail_reason, result.output))
1430

    
1431
    # check connectivity
1432
    time.sleep(4)
1433

    
1434
    result = rpc.call_version([node])[node]
1435
    if result:
1436
      if constants.PROTOCOL_VERSION == result:
1437
        logger.Info("communication to node %s fine, sw version %s match" %
1438
                    (node, result))
1439
      else:
1440
        raise errors.OpExecError("Version mismatch master version %s,"
1441
                                 " node version %s" %
1442
                                 (constants.PROTOCOL_VERSION, result))
1443
    else:
1444
      raise errors.OpExecError("Cannot get version from the new node")
1445

    
1446
    # setup ssh on node
1447
    logger.Info("copy ssh key to node %s" % node)
1448
    keyarray = []
1449
    keyfiles = ["/etc/ssh/ssh_host_dsa_key", "/etc/ssh/ssh_host_dsa_key.pub",
1450
                "/etc/ssh/ssh_host_rsa_key", "/etc/ssh/ssh_host_rsa_key.pub",
1451
                "/root/.ssh/id_dsa", "/root/.ssh/id_dsa.pub"]
1452

    
1453
    for i in keyfiles:
1454
      f = open(i, 'r')
1455
      try:
1456
        keyarray.append(f.read())
1457
      finally:
1458
        f.close()
1459

    
1460
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1461
                               keyarray[3], keyarray[4], keyarray[5])
1462

    
1463
    if not result:
1464
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1465

    
1466
    # Add node to our /etc/hosts, and add key to known_hosts
1467
    _UpdateEtcHosts(new_node.name, new_node.primary_ip)
1468
    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1469
                      self.cfg.GetHostKey())
1470

    
1471
    if new_node.secondary_ip != new_node.primary_ip:
1472
      result = ssh.SSHCall(node, "root",
1473
                           "fping -S 127.0.0.1 -q %s" % new_node.secondary_ip)
1474
      if result.failed:
1475
        raise errors.OpExecError("Node claims it doesn't have the"
1476
                                 " secondary ip you gave (%s).\n"
1477
                                 "Please fix and re-run this command." %
1478
                                 new_node.secondary_ip)
1479

    
1480
    success, msg = ssh.VerifyNodeHostname(node)
1481
    if not success:
1482
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
1483
                               " than the one the resolver gives: %s.\n"
1484
                               "Please fix and re-run this command." %
1485
                               (node, msg))
1486

    
1487
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1488
    # including the node just added
1489
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1490
    dist_nodes = self.cfg.GetNodeList() + [node]
1491
    if myself.name in dist_nodes:
1492
      dist_nodes.remove(myself.name)
1493

    
1494
    logger.Debug("Copying hosts and known_hosts to all nodes")
1495
    for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
1496
      result = rpc.call_upload_file(dist_nodes, fname)
1497
      for to_node in dist_nodes:
1498
        if not result[to_node]:
1499
          logger.Error("copy of file %s to node %s failed" %
1500
                       (fname, to_node))
1501

    
1502
    to_copy = ss.GetFileList()
1503
    for fname in to_copy:
1504
      if not ssh.CopyFileToNode(node, fname):
1505
        logger.Error("could not copy file %s to node %s" % (fname, node))
1506

    
1507
    logger.Info("adding node %s to cluster.conf" % node)
1508
    self.cfg.AddNode(new_node)
1509

    
1510

    
1511
class LUMasterFailover(LogicalUnit):
1512
  """Failover the master node to the current node.
1513

1514
  This is a special LU in that it must run on a non-master node.
1515

1516
  """
1517
  HPATH = "master-failover"
1518
  HTYPE = constants.HTYPE_CLUSTER
1519
  REQ_MASTER = False
1520
  _OP_REQP = []
1521

    
1522
  def BuildHooksEnv(self):
1523
    """Build hooks env.
1524

1525
    This will run on the new master only in the pre phase, and on all
1526
    the nodes in the post phase.
1527

1528
    """
1529
    env = {
1530
      "NEW_MASTER": self.new_master,
1531
      "OLD_MASTER": self.old_master,
1532
      }
1533
    return env, [self.new_master], self.cfg.GetNodeList()
1534

    
1535
  def CheckPrereq(self):
1536
    """Check prerequisites.
1537

1538
    This checks that we are not already the master.
1539

1540
    """
1541
    self.new_master = socket.gethostname()
1542

    
1543
    self.old_master = self.sstore.GetMasterNode()
1544

    
1545
    if self.old_master == self.new_master:
1546
      raise errors.OpPrereqError("This commands must be run on the node"
1547
                                 " where you want the new master to be.\n"
1548
                                 "%s is already the master" %
1549
                                 self.old_master)
1550

    
1551
  def Exec(self, feedback_fn):
1552
    """Failover the master node.
1553

1554
    This command, when run on a non-master node, will cause the current
1555
    master to cease being master, and the non-master to become new
1556
    master.
1557

1558
    """
1559
    #TODO: do not rely on gethostname returning the FQDN
1560
    logger.Info("setting master to %s, old master: %s" %
1561
                (self.new_master, self.old_master))
1562

    
1563
    if not rpc.call_node_stop_master(self.old_master):
1564
      logger.Error("could disable the master role on the old master"
1565
                   " %s, please disable manually" % self.old_master)
1566

    
1567
    ss = self.sstore
1568
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1569
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
1570
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
1571
      logger.Error("could not distribute the new simple store master file"
1572
                   " to the other nodes, please check.")
1573

    
1574
    if not rpc.call_node_start_master(self.new_master):
1575
      logger.Error("could not start the master role on the new master"
1576
                   " %s, please check" % self.new_master)
1577
      feedback_fn("Error in activating the master IP on the new master,\n"
1578
                  "please fix manually.")
1579

    
1580

    
1581

    
1582
class LUQueryClusterInfo(NoHooksLU):
1583
  """Query cluster configuration.
1584

1585
  """
1586
  _OP_REQP = []
1587
  REQ_MASTER = False
1588

    
1589
  def CheckPrereq(self):
1590
    """No prerequsites needed for this LU.
1591

1592
    """
1593
    pass
1594

    
1595
  def Exec(self, feedback_fn):
1596
    """Return cluster config.
1597

1598
    """
1599
    result = {
1600
      "name": self.sstore.GetClusterName(),
1601
      "software_version": constants.RELEASE_VERSION,
1602
      "protocol_version": constants.PROTOCOL_VERSION,
1603
      "config_version": constants.CONFIG_VERSION,
1604
      "os_api_version": constants.OS_API_VERSION,
1605
      "export_version": constants.EXPORT_VERSION,
1606
      "master": self.sstore.GetMasterNode(),
1607
      "architecture": (platform.architecture()[0], platform.machine()),
1608
      }
1609

    
1610
    return result
1611

    
1612

    
1613
class LUClusterCopyFile(NoHooksLU):
1614
  """Copy file to cluster.
1615

1616
  """
1617
  _OP_REQP = ["nodes", "filename"]
1618

    
1619
  def CheckPrereq(self):
1620
    """Check prerequisites.
1621

1622
    It should check that the named file exists and that the given list
1623
    of nodes is valid.
1624

1625
    """
1626
    if not os.path.exists(self.op.filename):
1627
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1628

    
1629
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1630

    
1631
  def Exec(self, feedback_fn):
1632
    """Copy a file from master to some nodes.
1633

1634
    The file given in the opcode is copied via ssh from the master to
1635
    each of the selected target nodes. If the node list in the opcode
1636
    is empty, all nodes are used. The master node itself is skipped,
1637
    since the file is already present there; copy failures to other
1638
    nodes are logged but do not abort the operation.
1639

1640
    """
1641
    filename = self.op.filename
1642

    
1643
    myname = socket.gethostname()
1644

    
1645
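    # copy the file to every requested node, skipping the master itself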
    for node in self.nodes:
1646
      if node == myname:
1647
        continue
1648
      if not ssh.CopyFileToNode(node, filename):
1649
        logger.Error("Copy of file %s to node %s failed" % (filename, node))
1650

    
1651

    
1652
class LUDumpClusterConfig(NoHooksLU):
1653
  """Return a text-representation of the cluster-config.
1654

1655
  """
1656
  _OP_REQP = []
1657

    
1658
  def CheckPrereq(self):
1659
    """No prerequisites.
1660

1661
    """
1662
    pass
1663

    
1664
  def Exec(self, feedback_fn):
1665
    """Dump a representation of the cluster config to the standard output.
1666

1667
    """
1668
    return self.cfg.DumpConfig()
1669

    
1670

    
1671
class LURunClusterCommand(NoHooksLU):
1672
  """Run a command on some nodes.
1673

1674
  """
1675
  _OP_REQP = ["command", "nodes"]
1676

    
1677
  def CheckPrereq(self):
1678
    """Check prerequisites.
1679

1680
    It checks that the given list of nodes is valid.
1681

1682
    """
1683
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1684

    
1685
  def Exec(self, feedback_fn):
1686
    """Run a command on some nodes.
1687

1688
    """
1689
    data = []
1690
    for node in self.nodes:
1691
      result = ssh.SSHCall(node, "root", self.op.command)
1692
      data.append((node, result.output, result.exit_code))
1693

    
1694
    return data
1695

    
1696

    
1697
class LUActivateInstanceDisks(NoHooksLU):
1698
  """Bring up an instance's disks.
1699

1700
  """
1701
  _OP_REQP = ["instance_name"]
1702

    
1703
  def CheckPrereq(self):
1704
    """Check prerequisites.
1705

1706
    This checks that the instance is in the cluster.
1707

1708
    """
1709
    instance = self.cfg.GetInstanceInfo(
1710
      self.cfg.ExpandInstanceName(self.op.instance_name))
1711
    if instance is None:
1712
      raise errors.OpPrereqError("Instance '%s' not known" %
1713
                                 self.op.instance_name)
1714
    self.instance = instance
1715

    
1716

    
1717
  def Exec(self, feedback_fn):
1718
    """Activate the disks.
1719

1720
    """
1721
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1722
    if not disks_ok:
1723
      raise errors.OpExecError("Cannot activate block devices")
1724

    
1725
    return disks_info
1726

    
1727

    
1728
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1729
  """Prepare the block devices for an instance.
1730

1731
  This sets up the block devices on all nodes.
1732

1733
  Args:
1734
    instance: a ganeti.objects.Instance object
1735
    ignore_secondaries: if true, errors on secondary nodes won't result
1736
                        in an error return from the function
1737

1738
  Returns:
1739
    a tuple (disks_ok, device_info), where disks_ok is false if the
1740
    operation failed, and device_info is a list of (host,
1741
    instance_visible_name, node_visible_name) tuples mapping node devices
    to instance devices
1742
  """
1743
  device_info = []
1744
  disks_ok = True
1745
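  # assemble each disk on all nodes of its tree; record the primary
  # node's result alongside the instance-visible device name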
  for inst_disk in instance.disks:
1746
    master_result = None
1747
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1748
      cfg.SetDiskID(node_disk, node)
1749
      is_primary = node == instance.primary_node
1750
      result = rpc.call_blockdev_assemble(node, node_disk, is_primary)
1751
      if not result:
1752
        logger.Error("could not prepare block device %s on node %s (is_pri"
1753
                     "mary=%s)" % (inst_disk.iv_name, node, is_primary))
1754
        if is_primary or not ignore_secondaries:
1755
          disks_ok = False
1756
      if is_primary:
1757
        master_result = result
1758
    device_info.append((instance.primary_node, inst_disk.iv_name,
1759
                        master_result))
1760

    
1761
  return disks_ok, device_info
1762

    
1763

    
1764
def _StartInstanceDisks(cfg, instance, force):
1765
  """Start the disks of an instance.
1766

1767
  """
1768
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1769
                                           ignore_secondaries=force)
1770
  if not disks_ok:
1771
    _ShutdownInstanceDisks(instance, cfg)
1772
    if force is not None and not force:
1773
      logger.Error("If the message above refers to a secondary node,"
1774
                   " you can retry the operation using '--force'.")
1775
    raise errors.OpExecError("Disk consistency error")
1776

    
1777

    
1778
class LUDeactivateInstanceDisks(NoHooksLU):
1779
  """Shutdown an instance's disks.
1780

1781
  """
1782
  _OP_REQP = ["instance_name"]
1783

    
1784
  def CheckPrereq(self):
1785
    """Check prerequisites.
1786

1787
    This checks that the instance is in the cluster.
1788

1789
    """
1790
    instance = self.cfg.GetInstanceInfo(
1791
      self.cfg.ExpandInstanceName(self.op.instance_name))
1792
    if instance is None:
1793
      raise errors.OpPrereqError("Instance '%s' not known" %
1794
                                 self.op.instance_name)
1795
    self.instance = instance
1796

    
1797
  def Exec(self, feedback_fn):
1798
    """Deactivate the disks
1799

1800
    """
1801
    instance = self.instance
1802
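    # refuse to shut down the disks while the instance is still running
    # on its primary node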
    ins_l = rpc.call_instance_list([instance.primary_node])
1803
    ins_l = ins_l[instance.primary_node]
1804
    if not isinstance(ins_l, list):
1805
      raise errors.OpExecError("Can't contact node '%s'" %
1806
                               instance.primary_node)
1807

    
1808
    if self.instance.name in ins_l:
1809
      raise errors.OpExecError("Instance is running, can't shutdown"
1810
                               " block devices.")
1811

    
1812
    _ShutdownInstanceDisks(instance, self.cfg)
1813

    
1814

    
1815
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1816
  """Shutdown block devices of an instance.
1817

1818
  This does the shutdown on all nodes of the instance.
1819

1820
  If ignore_primary is true, errors on the primary node are
1821
  ignored.
1822

1823
  """
1824
  result = True
1825
  for disk in instance.disks:
1826
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1827
      cfg.SetDiskID(top_disk, node)
1828
      if not rpc.call_blockdev_shutdown(node, top_disk):
1829
        logger.Error("could not shutdown block device %s on node %s" %
1830
                     (disk.iv_name, node))
1831
        if not ignore_primary or node != instance.primary_node:
1832
          result = False
1833
  return result
1834

    
1835

    
1836
class LUStartupInstance(LogicalUnit):
1837
  """Starts an instance.
1838

1839
  """
1840
  HPATH = "instance-start"
1841
  HTYPE = constants.HTYPE_INSTANCE
1842
  _OP_REQP = ["instance_name", "force"]
1843

    
1844
  def BuildHooksEnv(self):
1845
    """Build hooks env.
1846

1847
    This runs on master, primary and secondary nodes of the instance.
1848

1849
    """
1850
    env = {
1851
      "FORCE": self.op.force,
1852
      }
1853
    env.update(_BuildInstanceHookEnvByObject(self.instance))
1854
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1855
          list(self.instance.secondary_nodes))
1856
    return env, nl, nl
1857

    
1858
  def CheckPrereq(self):
1859
    """Check prerequisites.
1860

1861
    This checks that the instance is in the cluster.
1862

1863
    """
1864
    instance = self.cfg.GetInstanceInfo(
1865
      self.cfg.ExpandInstanceName(self.op.instance_name))
1866
    if instance is None:
1867
      raise errors.OpPrereqError("Instance '%s' not known" %
1868
                                 self.op.instance_name)
1869

    
1870
    # check bridges existence
1871
    brlist = [nic.bridge for nic in instance.nics]
1872
    if not rpc.call_bridges_exist(instance.primary_node, brlist):
1873
      raise errors.OpPrereqError("one or more target bridges %s does not"
1874
                                 " exist on destination node '%s'" %
1875
                                 (brlist, instance.primary_node))
1876

    
1877
    self.instance = instance
1878
    self.op.instance_name = instance.name
1879

    
1880
  def Exec(self, feedback_fn):
1881
    """Start the instance.
1882

1883
    """
1884
    instance = self.instance
1885
    force = self.op.force
1886
    extra_args = getattr(self.op, "extra_args", "")
1887

    
1888
    node_current = instance.primary_node
1889

    
1890
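    # check that the primary node has enough free memory before activating
    # the disks and starting the instance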
    nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1891
    if not nodeinfo:
1892
      raise errors.OpExecError("Could not contact node %s for infos" %
1893
                               (node_current))
1894

    
1895
    freememory = nodeinfo[node_current]['memory_free']
1896
    memory = instance.memory
1897
    if memory > freememory:
1898
      raise errors.OpExecError("Not enough memory to start instance"
1899
                               " %s on node %s"
1900
                               " needed %s MiB, available %s MiB" %
1901
                               (instance.name, node_current, memory,
1902
                                freememory))
1903

    
1904
    _StartInstanceDisks(self.cfg, instance, force)
1905

    
1906
    if not rpc.call_instance_start(node_current, instance, extra_args):
1907
      _ShutdownInstanceDisks(instance, self.cfg)
1908
      raise errors.OpExecError("Could not start instance")
1909

    
1910
    self.cfg.MarkInstanceUp(instance.name)
1911

    
1912

    
1913
class LUShutdownInstance(LogicalUnit):
1914
  """Shutdown an instance.
1915

1916
  """
1917
  HPATH = "instance-stop"
1918
  HTYPE = constants.HTYPE_INSTANCE
1919
  _OP_REQP = ["instance_name"]
1920

    
1921
  def BuildHooksEnv(self):
1922
    """Build hooks env.
1923

1924
    This runs on master, primary and secondary nodes of the instance.
1925

1926
    """
1927
    env = _BuildInstanceHookEnvByObject(self.instance)
1928
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1929
          list(self.instance.secondary_nodes))
1930
    return env, nl, nl
1931

    
1932
  def CheckPrereq(self):
1933
    """Check prerequisites.
1934

1935
    This checks that the instance is in the cluster.
1936

1937
    """
1938
    instance = self.cfg.GetInstanceInfo(
1939
      self.cfg.ExpandInstanceName(self.op.instance_name))
1940
    if instance is None:
1941
      raise errors.OpPrereqError("Instance '%s' not known" %
1942
                                 self.op.instance_name)
1943
    self.instance = instance
1944

    
1945
  def Exec(self, feedback_fn):
1946
    """Shutdown the instance.
1947

1948
    """
1949
    instance = self.instance
1950
    node_current = instance.primary_node
1951
    if not rpc.call_instance_shutdown(node_current, instance):
1952
      logger.Error("could not shutdown instance")
1953

    
1954
    self.cfg.MarkInstanceDown(instance.name)
1955
    _ShutdownInstanceDisks(instance, self.cfg)
1956

    
1957

    
1958
class LUReinstallInstance(LogicalUnit):
1959
  """Reinstall an instance.
1960

1961
  """
1962
  HPATH = "instance-reinstall"
1963
  HTYPE = constants.HTYPE_INSTANCE
1964
  _OP_REQP = ["instance_name"]
1965

    
1966
  def BuildHooksEnv(self):
1967
    """Build hooks env.
1968

1969
    This runs on master, primary and secondary nodes of the instance.
1970

1971
    """
1972
    env = _BuildInstanceHookEnvByObject(self.instance)
1973
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1974
          list(self.instance.secondary_nodes))
1975
    return env, nl, nl
1976

    
1977
  def CheckPrereq(self):
1978
    """Check prerequisites.
1979

1980
    This checks that the instance is in the cluster and is not running.
1981

1982
    """
1983
    instance = self.cfg.GetInstanceInfo(
1984
      self.cfg.ExpandInstanceName(self.op.instance_name))
1985
    if instance is None:
1986
      raise errors.OpPrereqError("Instance '%s' not known" %
1987
                                 self.op.instance_name)
1988
    if instance.disk_template == constants.DT_DISKLESS:
1989
      raise errors.OpPrereqError("Instance '%s' has no disks" %
1990
                                 self.op.instance_name)
1991
    if instance.status != "down":
1992
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
1993
                                 self.op.instance_name)
1994
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
1995
    if remote_info:
1996
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
1997
                                 (self.op.instance_name,
1998
                                  instance.primary_node))
1999

    
2000
    self.op.os_type = getattr(self.op, "os_type", None)
2001
    if self.op.os_type is not None:
2002
      # OS verification
2003
      pnode = self.cfg.GetNodeInfo(
2004
        self.cfg.ExpandNodeName(instance.primary_node))
2005
      if pnode is None:
2006
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2007
                                   self.op.pnode)
2008
      os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2009
      if not isinstance(os_obj, objects.OS):
2010
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2011
                                   " primary node"  % self.op.os_type)
2012

    
2013
    self.instance = instance
2014

    
2015
  def Exec(self, feedback_fn):
2016
    """Reinstall the instance.
2017

2018
    """
2019
    inst = self.instance
2020

    
2021
    if self.op.os_type is not None:
2022
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2023
      inst.os = self.op.os_type
2024
      self.cfg.AddInstance(inst)
2025

    
2026
    _StartInstanceDisks(self.cfg, inst, None)
2027
    try:
2028
      feedback_fn("Running the instance OS create scripts...")
2029
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2030
        raise errors.OpExecError("Could not install OS for instance %s "
2031
                                 "on node %s" %
2032
                                 (inst.name, inst.primary_node))
2033
    finally:
2034
      _ShutdownInstanceDisks(inst, self.cfg)
2035

    
2036

    
2037
class LURemoveInstance(LogicalUnit):
2038
  """Remove an instance.
2039

2040
  """
2041
  HPATH = "instance-remove"
2042
  HTYPE = constants.HTYPE_INSTANCE
2043
  _OP_REQP = ["instance_name"]
2044

    
2045
  def BuildHooksEnv(self):
2046
    """Build hooks env.
2047

2048
    This runs on master, primary and secondary nodes of the instance.
2049

2050
    """
2051
    env = _BuildInstanceHookEnvByObject(self.instance)
2052
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2053
          list(self.instance.secondary_nodes))
2054
    return env, nl, nl
2055

    
2056
  def CheckPrereq(self):
2057
    """Check prerequisites.
2058

2059
    This checks that the instance is in the cluster.
2060

2061
    """
2062
    instance = self.cfg.GetInstanceInfo(
2063
      self.cfg.ExpandInstanceName(self.op.instance_name))
2064
    if instance is None:
2065
      raise errors.OpPrereqError("Instance '%s' not known" %
2066
                                 self.op.instance_name)
2067
    self.instance = instance
2068

    
2069
  def Exec(self, feedback_fn):
2070
    """Remove the instance.
2071

2072
    """
2073
    instance = self.instance
2074
    logger.Info("shutting down instance %s on node %s" %
2075
                (instance.name, instance.primary_node))
2076

    
2077
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2078
      raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2079
                               (instance.name, instance.primary_node))
2080

    
2081
    logger.Info("removing block devices for instance %s" % instance.name)
2082

    
2083
    _RemoveDisks(instance, self.cfg)
2084

    
2085
    logger.Info("removing instance %s out of cluster config" % instance.name)
2086

    
2087
    self.cfg.RemoveInstance(instance.name)
2088

    
2089

    
2090
class LUQueryInstances(NoHooksLU):
2091
  """Logical unit for querying instances.
2092

2093
  """
2094
  _OP_REQP = ["output_fields"]
2095

    
2096
  def CheckPrereq(self):
2097
    """Check prerequisites.
2098

2099
    This checks that the fields required are valid output fields.
2100

2101
    """
2102
    self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2103
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2104
                               "admin_state", "admin_ram",
2105
                               "disk_template", "ip", "mac", "bridge",
2106
                               "sda_size", "sdb_size"],
2107
                       dynamic=self.dynamic_fields,
2108
                       selected=self.op.output_fields)
2109

    
2110
  def Exec(self, feedback_fn):
2111
    """Computes the list of nodes and their attributes.
2112

2113
    """
2114
    instance_names = utils.NiceSort(self.cfg.GetInstanceList())
2115
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2116
                     in instance_names]
2117

    
2118
    # begin data gathering
2119

    
2120
    nodes = frozenset([inst.primary_node for inst in instance_list])
2121

    
2122
    bad_nodes = []
2123
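    # query the nodes for live data only if a dynamic output field
    # (oper_state, oper_ram) was requested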
    if self.dynamic_fields.intersection(self.op.output_fields):
2124
      live_data = {}
2125
      node_data = rpc.call_all_instances_info(nodes)
2126
      for name in nodes:
2127
        result = node_data[name]
2128
        if result:
2129
          live_data.update(result)
2130
        elif result == False:
2131
          bad_nodes.append(name)
2132
        # else no instance is alive
2133
    else:
2134
      live_data = dict([(name, {}) for name in instance_names])
2135

    
2136
    # end data gathering
2137

    
2138
    output = []
2139
    for instance in instance_list:
2140
      iout = []
2141
      for field in self.op.output_fields:
2142
        if field == "name":
2143
          val = instance.name
2144
        elif field == "os":
2145
          val = instance.os
2146
        elif field == "pnode":
2147
          val = instance.primary_node
2148
        elif field == "snodes":
2149
          val = list(instance.secondary_nodes)
2150
        elif field == "admin_state":
2151
          val = (instance.status != "down")
2152
        elif field == "oper_state":
2153
          if instance.primary_node in bad_nodes:
2154
            val = None
2155
          else:
2156
            val = bool(live_data.get(instance.name))
2157
        elif field == "admin_ram":
2158
          val = instance.memory
2159
        elif field == "oper_ram":
2160
          if instance.primary_node in bad_nodes:
2161
            val = None
2162
          elif instance.name in live_data:
2163
            val = live_data[instance.name].get("memory", "?")
2164
          else:
2165
            val = "-"
2166
        elif field == "disk_template":
2167
          val = instance.disk_template
2168
        elif field == "ip":
2169
          val = instance.nics[0].ip
2170
        elif field == "bridge":
2171
          val = instance.nics[0].bridge
2172
        elif field == "mac":
2173
          val = instance.nics[0].mac
2174
        elif field == "sda_size" or field == "sdb_size":
2175
          disk = instance.FindDisk(field[:3])
2176
          if disk is None:
2177
            val = None
2178
          else:
2179
            val = disk.size
2180
        else:
2181
          raise errors.ParameterError(field)
2182
        iout.append(val)
2183
      output.append(iout)
2184

    
2185
    return output
2186

    
2187

    
2188
class LUFailoverInstance(LogicalUnit):
2189
  """Failover an instance.
2190

2191
  """
2192
  HPATH = "instance-failover"
2193
  HTYPE = constants.HTYPE_INSTANCE
2194
  _OP_REQP = ["instance_name", "ignore_consistency"]
2195

    
2196
  def BuildHooksEnv(self):
2197
    """Build hooks env.
2198

2199
    This runs on master, primary and secondary nodes of the instance.
2200

2201
    """
2202
    env = {
2203
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2204
      }
2205
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2206
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2207
    return env, nl, nl
2208

    
2209
  def CheckPrereq(self):
2210
    """Check prerequisites.
2211

2212
    This checks that the instance is in the cluster.
2213

2214
    """
2215
    instance = self.cfg.GetInstanceInfo(
2216
      self.cfg.ExpandInstanceName(self.op.instance_name))
2217
    if instance is None:
2218
      raise errors.OpPrereqError("Instance '%s' not known" %
2219
                                 self.op.instance_name)
2220

    
2221
    if instance.disk_template != constants.DT_REMOTE_RAID1:
2222
      raise errors.OpPrereqError("Instance's disk layout is not"
2223
                                 " remote_raid1.")
2224

    
2225
    secondary_nodes = instance.secondary_nodes
2226
    if not secondary_nodes:
2227
      raise errors.ProgrammerError("no secondary node but using "
2228
                                   "DT_REMOTE_RAID1 template")
2229

    
2230
    # check memory requirements on the secondary node
2231
    target_node = secondary_nodes[0]
2232
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2233
    info = nodeinfo.get(target_node, None)
2234
    if not info:
2235
      raise errors.OpPrereqError("Cannot get current information"
2236
                                 " from node '%s'" % nodeinfo)
2237
    if instance.memory > info['memory_free']:
2238
      raise errors.OpPrereqError("Not enough memory on target node %s."
2239
                                 " %d MB available, %d MB required" %
2240
                                 (target_node, info['memory_free'],
2241
                                  instance.memory))
2242

    
2243
    # check bridge existence
2244
    brlist = [nic.bridge for nic in instance.nics]
2245
    if not rpc.call_bridges_exist(instance.primary_node, brlist):
2246
      raise errors.OpPrereqError("One or more target bridges %s does not"
2247
                                 " exist on destination node '%s'" %
2248
                                 (brlist, instance.primary_node))
2249

    
2250
    self.instance = instance
2251

    
2252
  def Exec(self, feedback_fn):
2253
    """Failover an instance.
2254

2255
    The failover is done by shutting it down on its present node and
2256
    starting it on the secondary.
2257

2258
    """
2259
    instance = self.instance
2260

    
2261
    source_node = instance.primary_node
2262
    target_node = instance.secondary_nodes[0]
2263

    
2264
    feedback_fn("* checking disk consistency between source and target")
2265
    for dev in instance.disks:
2266
      # for remote_raid1, these are md over drbd
2267
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2268
        if not self.op.ignore_consistency:
2269
          raise errors.OpExecError("Disk %s is degraded on target node,"
2270
                                   " aborting failover." % dev.iv_name)
2271

    
2272
    feedback_fn("* checking target node resource availability")
2273
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2274

    
2275
    if not nodeinfo:
2276
      raise errors.OpExecError("Could not contact target node %s." %
2277
                               target_node)
2278

    
2279
    free_memory = int(nodeinfo[target_node]['memory_free'])
2280
    memory = instance.memory
2281
    if memory > free_memory:
2282
      raise errors.OpExecError("Not enough memory to create instance %s on"
2283
                               " node %s. needed %s MiB, available %s MiB" %
2284
                               (instance.name, target_node, memory,
2285
                                free_memory))
2286

    
2287
    feedback_fn("* shutting down instance on source node")
2288
    logger.Info("Shutting down instance %s on node %s" %
2289
                (instance.name, source_node))
2290

    
2291
    if not rpc.call_instance_shutdown(source_node, instance):
2292
      logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2293
                   " anyway. Please make sure node %s is down"  %
2294
                   (instance.name, source_node, source_node))
2295

    
2296
    feedback_fn("* deactivating the instance's disks on source node")
2297
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2298
      raise errors.OpExecError("Can't shut down the instance's disks.")
2299

    
2300
    instance.primary_node = target_node
2301
    # distribute new instance config to the other nodes
2302
    self.cfg.AddInstance(instance)
2303

    
2304
    feedback_fn("* activating the instance's disks on target node")
2305
    logger.Info("Starting instance %s on node %s" %
2306
                (instance.name, target_node))
2307

    
2308
    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2309
                                             ignore_secondaries=True)
2310
    if not disks_ok:
2311
      _ShutdownInstanceDisks(instance, self.cfg)
2312
      raise errors.OpExecError("Can't activate the instance's disks")
2313

    
2314
    feedback_fn("* starting the instance on the target node")
2315
    if not rpc.call_instance_start(target_node, instance, None):
2316
      _ShutdownInstanceDisks(instance, self.cfg)
2317
      raise errors.OpExecError("Could not start instance %s on node %s." %
2318
                               (instance.name, target_node))
2319

    
2320

    
2321
def _CreateBlockDevOnPrimary(cfg, node, device, info):
2322
  """Create a tree of block devices on the primary node.
2323

2324
  This always creates all devices.
2325

2326
  """
2327
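  # create all children first (recursively), then the device itself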
  if device.children:
2328
    for child in device.children:
2329
      if not _CreateBlockDevOnPrimary(cfg, node, child, info):
2330
        return False
2331

    
2332
  cfg.SetDiskID(device, node)
2333
  new_id = rpc.call_blockdev_create(node, device, device.size, True, info)
2334
  if not new_id:
2335
    return False
2336
  if device.physical_id is None:
2337
    device.physical_id = new_id
2338
  return True
2339

    
2340

    
2341
def _CreateBlockDevOnSecondary(cfg, node, device, force, info):
2342
  """Create a tree of block devices on a secondary node.
2343

2344
  If this device type has to be created on secondaries, create it and
2345
  all its children.
2346

2347
  If not, just recurse to children keeping the same 'force' value.
2348

2349
  """
2350
  if device.CreateOnSecondary():
2351
    force = True
2352
  if device.children:
2353
    for child in device.children:
2354
      if not _CreateBlockDevOnSecondary(cfg, node, child, force, info):
2355
        return False
2356

    
2357
  if not force:
2358
    return True
2359
  cfg.SetDiskID(device, node)
2360
  new_id = rpc.call_blockdev_create(node, device, device.size, False, info)
2361
  if not new_id:
2362
    return False
2363
  if device.physical_id is None:
2364
    device.physical_id = new_id
2365
  return True
2366

    
2367

    
2368
def _GenerateUniqueNames(cfg, exts):
2369
  """Generate a suitable LV name.
2370

2371
  This will generate a unique logical volume name for each given extension.
2372

2373
  """
2374
  results = []
2375
  for val in exts:
2376
    new_id = cfg.GenerateUniqueID()
2377
    results.append("%s%s" % (new_id, val))
2378
  return results
2379

    
2380

    
2381
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2382
  """Generate a drbd device complete with its children.
2383

2384
  """
2385
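  # a DRBD branch is made of a data LV and a 128MB metadata LV, mirrored
  # between the primary and secondary nodes over an allocated port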
  port = cfg.AllocatePort()
2386
  vgname = cfg.GetVGName()
2387
  dev_data = objects.Disk(dev_type="lvm", size=size,
2388
                          logical_id=(vgname, names[0]))
2389
  dev_meta = objects.Disk(dev_type="lvm", size=128,
2390
                          logical_id=(vgname, names[1]))
2391
  drbd_dev = objects.Disk(dev_type="drbd", size=size,
2392
                          logical_id = (primary, secondary, port),
2393
                          children = [dev_data, dev_meta])
2394
  return drbd_dev
2395

    
2396

    
2397
def _GenerateDiskTemplate(cfg, template_name,
2398
                          instance_name, primary_node,
2399
                          secondary_nodes, disk_sz, swap_sz):
2400
  """Generate the entire disk layout for a given template type.
2401

2402
  """
2403
  #TODO: compute space requirements
2404

    
2405
  vgname = cfg.GetVGName()
2406
  if template_name == "diskless":
2407
    disks = []
2408
  elif template_name == "plain":
2409
    if len(secondary_nodes) != 0:
2410
      raise errors.ProgrammerError("Wrong template configuration")
2411

    
2412
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2413
    sda_dev = objects.Disk(dev_type="lvm", size=disk_sz,
2414
                           logical_id=(vgname, names[0]),
2415
                           iv_name = "sda")
2416
    sdb_dev = objects.Disk(dev_type="lvm", size=swap_sz,
2417
                           logical_id=(vgname, names[1]),
2418
                           iv_name = "sdb")
2419
    disks = [sda_dev, sdb_dev]
2420
  elif template_name == "local_raid1":
2421
    if len(secondary_nodes) != 0:
2422
      raise errors.ProgrammerError("Wrong template configuration")
2423

    
2424

    
2425
    names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2426
                                       ".sdb_m1", ".sdb_m2"])
2427
    sda_dev_m1 = objects.Disk(dev_type="lvm", size=disk_sz,
2428
                              logical_id=(vgname, names[0]))
2429
    sda_dev_m2 = objects.Disk(dev_type="lvm", size=disk_sz,
2430
                              logical_id=(vgname, names[1]))
2431
    md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name = "sda",
2432
                              size=disk_sz,
2433
                              children = [sda_dev_m1, sda_dev_m2])
2434
    sdb_dev_m1 = objects.Disk(dev_type="lvm", size=swap_sz,
2435
                              logical_id=(vgname, names[2]))
2436
    sdb_dev_m2 = objects.Disk(dev_type="lvm", size=swap_sz,
2437
                              logical_id=(vgname, names[3]))
2438
    md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name = "sdb",
2439
                              size=swap_sz,
2440
                              children = [sdb_dev_m1, sdb_dev_m2])
2441
    disks = [md_sda_dev, md_sdb_dev]
2442
  elif template_name == constants.DT_REMOTE_RAID1:
2443
    if len(secondary_nodes) != 1:
2444
      raise errors.ProgrammerError("Wrong template configuration")
2445
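    # remote_raid1: sda and sdb are md_raid1 devices, each layered on top
    # of a DRBD mirror between the primary and the secondary node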
    remote_node = secondary_nodes[0]
2446
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2447
                                       ".sdb_data", ".sdb_meta"])
2448
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2449
                                         disk_sz, names[0:2])
2450
    md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name="sda",
2451
                              children = [drbd_sda_dev], size=disk_sz)
2452
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2453
                                         swap_sz, names[2:4])
2454
    md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name="sdb",
2455
                              children = [drbd_sdb_dev], size=swap_sz)
2456
    disks = [md_sda_dev, md_sdb_dev]
2457
  else:
2458
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2459
  return disks
2460

    
2461

    
2462
def _GetInstanceInfoText(instance):
2463
  """Compute that text that should be added to the disk's metadata.
2464

2465
  """
2466
  return "originstname+%s" % instance.name
2467

    
2468

    
2469
def _CreateDisks(cfg, instance):
2470
  """Create all disks for an instance.
2471

2472
  This abstracts away some work from AddInstance.
2473

2474
  Args:
2475
    instance: the instance object
2476

2477
  Returns:
2478
    True or False showing the success of the creation process
2479

2480
  """
2481
  info = _GetInstanceInfoText(instance)
2482

    
2483
  for device in instance.disks:
2484
    logger.Info("creating volume %s for instance %s" %
2485
              (device.iv_name, instance.name))
2486
    #HARDCODE
2487
    for secondary_node in instance.secondary_nodes:
2488
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, device, False,
2489
                                        info):
2490
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2491
                     (device.iv_name, device, secondary_node))
2492
        return False
2493
    #HARDCODE
2494
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, device, info):
2495
      logger.Error("failed to create volume %s on primary!" %
2496
                   device.iv_name)
2497
      return False
2498
  return True
2499

    
2500

    
2501
def _RemoveDisks(instance, cfg):
2502
  """Remove all disks for an instance.
2503

2504
  This abstracts away some work from `AddInstance()` and
2505
  `RemoveInstance()`. Note that in case some of the devices couldn't
2506
  be removed, the removal will continue with the other ones (compare
2507
  with `_CreateDisks()`).
2508

2509
  Args:
2510
    instance: the instance object
2511

2512
  Returns:
2513
    True or False showing the success of the removal process
2514

2515
  """
2516
  logger.Info("removing block devices for instance %s" % instance.name)
2517

    
2518
  result = True
2519
  for device in instance.disks:
2520
    for node, disk in device.ComputeNodeTree(instance.primary_node):
2521
      cfg.SetDiskID(disk, node)
2522
      if not rpc.call_blockdev_remove(node, disk):
2523
        logger.Error("could not remove block device %s on node %s,"
2524
                     " continuing anyway" %
2525
                     (device.iv_name, node))
2526
        result = False
2527
  return result
2528

    
2529

    
2530
class LUCreateInstance(LogicalUnit):
2531
  """Create an instance.
2532

2533
  """
2534
  HPATH = "instance-add"
2535
  HTYPE = constants.HTYPE_INSTANCE
2536
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2537
              "disk_template", "swap_size", "mode", "start", "vcpus",
2538
              "wait_for_sync"]
2539

    
2540
  def BuildHooksEnv(self):
2541
    """Build hooks env.
2542

2543
    This runs on master, primary and secondary nodes of the instance.
2544

2545
    """
2546
    env = {
2547
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2548
      "INSTANCE_DISK_SIZE": self.op.disk_size,
2549
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
2550
      "INSTANCE_ADD_MODE": self.op.mode,
2551
      }
2552
    if self.op.mode == constants.INSTANCE_IMPORT:
2553
      env["INSTANCE_SRC_NODE"] = self.op.src_node
2554
      env["INSTANCE_SRC_PATH"] = self.op.src_path
2555
      env["INSTANCE_SRC_IMAGE"] = self.src_image
2556

    
2557
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2558
      primary_node=self.op.pnode,
2559
      secondary_nodes=self.secondaries,
2560
      status=self.instance_status,
2561
      os_type=self.op.os_type,
2562
      memory=self.op.mem_size,
2563
      vcpus=self.op.vcpus,
2564
      nics=[(self.inst_ip, self.op.bridge)],
2565
    ))
2566

    
2567
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2568
          self.secondaries)
2569
    return env, nl, nl
2570

    
2571

    
2572
  def CheckPrereq(self):
2573
    """Check prerequisites.
2574

2575
    """
2576
    if self.op.mode not in (constants.INSTANCE_CREATE,
2577
                            constants.INSTANCE_IMPORT):
2578
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2579
                                 self.op.mode)
2580

    
2581
    if self.op.mode == constants.INSTANCE_IMPORT:
2582
      src_node = getattr(self.op, "src_node", None)
2583
      src_path = getattr(self.op, "src_path", None)
2584
      if src_node is None or src_path is None:
2585
        raise errors.OpPrereqError("Importing an instance requires source"
2586
                                   " node and path options")
2587
      src_node_full = self.cfg.ExpandNodeName(src_node)
2588
      if src_node_full is None:
2589
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2590
      self.op.src_node = src_node = src_node_full
2591

    
2592
      if not os.path.isabs(src_path):
2593
        raise errors.OpPrereqError("The source path must be absolute")
2594

    
2595
      export_info = rpc.call_export_info(src_node, src_path)
2596

    
2597
      if not export_info:
2598
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
2599

    
2600
      if not export_info.has_section(constants.INISECT_EXP):
2601
        raise errors.ProgrammerError("Corrupted export config")
2602

    
2603
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
2604
      if (int(ei_version) != constants.EXPORT_VERSION):
2605
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2606
                                   (ei_version, constants.EXPORT_VERSION))
2607

    
2608
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2609
        raise errors.OpPrereqError("Can't import instance with more than"
2610
                                   " one data disk")
2611

    
2612
      # FIXME: are the old os-es, disk sizes, etc. useful?
2613
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2614
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2615
                                                         'disk0_dump'))
2616
      self.src_image = diskimage
2617
    else: # INSTANCE_CREATE
2618
      if getattr(self.op, "os_type", None) is None:
2619
        raise errors.OpPrereqError("No guest OS specified")
2620

    
2621
    # check primary node
2622
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2623
    if pnode is None:
2624
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
2625
                                 self.op.pnode)
2626
    self.op.pnode = pnode.name
2627
    self.pnode = pnode
2628
    self.secondaries = []
2629
    # disk template and mirror node verification
2630
    if self.op.disk_template not in constants.DISK_TEMPLATES:
2631
      raise errors.OpPrereqError("Invalid disk template name")
2632

    
2633
    if self.op.disk_template == constants.DT_REMOTE_RAID1:
2634
      if getattr(self.op, "snode", None) is None:
2635
        raise errors.OpPrereqError("The 'remote_raid1' disk template needs"
2636
                                   " a mirror node")
2637

    
2638
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
2639
      if snode_name is None:
2640
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
2641
                                   self.op.snode)
2642
      elif snode_name == pnode.name:
2643
        raise errors.OpPrereqError("The secondary node cannot be"
2644
                                   " the primary node.")
2645
      self.secondaries.append(snode_name)
2646

    
2647
    # Check lv size requirements
2648
    nodenames = [pnode.name] + self.secondaries
2649
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2650

    
2651
    # Required free disk space as a function of disk and swap space
2652
    req_size_dict = {
2653
      constants.DT_DISKLESS: 0,
2654
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2655
      constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2656
      # 256 MB are added for drbd metadata, 128MB for each drbd device
2657
      constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2658
    }
2659

    
2660
    if self.op.disk_template not in req_size_dict:
2661
      raise errors.ProgrammerError("Disk template '%s' size requirement"
2662
                                   " is unknown" %  self.op.disk_template)
2663

    
2664
    req_size = req_size_dict[self.op.disk_template]
2665

    
2666
    for node in nodenames:
2667
      info = nodeinfo.get(node, None)
2668
      if not info:
2669
        raise errors.OpPrereqError("Cannot get current information"
2670
                                   " from node '%s'" % nodeinfo)
2671
      if req_size > info['vg_free']:
2672
        raise errors.OpPrereqError("Not enough disk space on target node %s."
2673
                                   " %d MB available, %d MB required" %
2674
                                   (node, info['vg_free'], req_size))
2675

    
2676
    # os verification
2677
    os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2678
    if not isinstance(os_obj, objects.OS):
2679
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
2680
                                 " primary node"  % self.op.os_type)
2681

    
2682
    # instance verification
2683
    hostname1 = utils.LookupHostname(self.op.instance_name)
2684
    if not hostname1:
2685
      raise errors.OpPrereqError("Instance name '%s' not found in dns" %
2686
                                 self.op.instance_name)
2687

    
2688
    self.op.instance_name = instance_name = hostname1['hostname']
2689
    instance_list = self.cfg.GetInstanceList()
2690
    if instance_name in instance_list:
2691
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2692
                                 instance_name)
2693

    
2694
    ip = getattr(self.op, "ip", None)
2695
    if ip is None or ip.lower() == "none":
2696
      inst_ip = None
2697
    elif ip.lower() == "auto":
2698
      inst_ip = hostname1['ip']
2699
    else:
2700
      if not utils.IsValidIP(ip):
2701
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
2702
                                   " like a valid IP" % ip)
2703
      inst_ip = ip
2704
    self.inst_ip = inst_ip
2705

    
2706
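    # the instance's IP must not be live already: if fping succeeds, the
    # address is in use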
    command = ["fping", "-q", hostname1['ip']]
2707
    result = utils.RunCmd(command)
2708
    if not result.failed:
2709
      raise errors.OpPrereqError("IP %s of instance %s already in use" %
2710
                                 (hostname1['ip'], instance_name))
2711

    
2712
    # bridge verification
2713
    bridge = getattr(self.op, "bridge", None)
2714
    if bridge is None:
2715
      self.op.bridge = self.cfg.GetDefBridge()
2716
    else:
2717
      self.op.bridge = bridge
2718

    
2719
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
2720
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
2721
                                 " destination node '%s'" %
2722
                                 (self.op.bridge, pnode.name))
2723

    
2724
    if self.op.start:
2725
      self.instance_status = 'up'
2726
    else:
2727
      self.instance_status = 'down'
2728

    
2729
  def Exec(self, feedback_fn):
2730
    """Create and add the instance to the cluster.
2731

2732
    """
2733
    instance = self.op.instance_name
2734
    pnode_name = self.pnode.name
2735

    
2736
    nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
2737
    if self.inst_ip is not None:
2738
      nic.ip = self.inst_ip
2739

    
2740
    disks = _GenerateDiskTemplate(self.cfg,
2741
                                  self.op.disk_template,
2742
                                  instance, pnode_name,
2743
                                  self.secondaries, self.op.disk_size,
2744
                                  self.op.swap_size)
2745

    
2746
    iobj = objects.Instance(name=instance, os=self.op.os_type,
2747
                            primary_node=pnode_name,
2748
                            memory=self.op.mem_size,
2749
                            vcpus=self.op.vcpus,
2750
                            nics=[nic], disks=disks,
2751
                            disk_template=self.op.disk_template,
2752
                            status=self.instance_status,
2753
                            )
2754

    
2755
    feedback_fn("* creating instance disks...")
2756
    if not _CreateDisks(self.cfg, iobj):
2757
      _RemoveDisks(iobj, self.cfg)
2758
      raise errors.OpExecError("Device creation failed, reverting...")
2759

    
2760
    feedback_fn("adding instance %s to cluster config" % instance)
2761

    
2762
    self.cfg.AddInstance(iobj)
2763

    
2764
    if self.op.wait_for_sync:
2765
      disk_abort = not _WaitForSync(self.cfg, iobj)
2766
    elif iobj.disk_template == constants.DT_REMOTE_RAID1:
2767
      # make sure the disks are not degraded (still sync-ing is ok)
2768
      time.sleep(15)
2769
      feedback_fn("* checking mirrors status")
2770
      disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
2771
    else:
2772
      disk_abort = False
2773

    
2774
    if disk_abort:
2775
      _RemoveDisks(iobj, self.cfg)
2776
      self.cfg.RemoveInstance(iobj.name)
2777
      raise errors.OpExecError("There are some degraded disks for"
2778
                               " this instance")
2779

    
2780
    feedback_fn("creating os for instance %s on node %s" %
2781
                (instance, pnode_name))
2782

    
2783
    if iobj.disk_template != constants.DT_DISKLESS:
2784
      if self.op.mode == constants.INSTANCE_CREATE:
2785
        feedback_fn("* running the instance OS create scripts...")
2786
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
2787
          raise errors.OpExecError("could not add os for instance %s"
2788
                                   " on node %s" %
2789
                                   (instance, pnode_name))
2790

    
2791
      elif self.op.mode == constants.INSTANCE_IMPORT:
2792
        feedback_fn("* running the instance OS import scripts...")
2793
        src_node = self.op.src_node
2794
        src_image = self.src_image
2795
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
2796
                                                src_node, src_image):
2797
          raise errors.OpExecError("Could not import os for instance"
2798
                                   " %s on node %s" %
2799
                                   (instance, pnode_name))
2800
      else:
2801
        # also checked in the prereq part
2802
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
2803
                                     % self.op.mode)
2804

    
2805
    if self.op.start:
2806
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
2807
      feedback_fn("* starting instance...")
2808
      if not rpc.call_instance_start(pnode_name, iobj, None):
2809
        raise errors.OpExecError("Could not start instance")
2810

    
2811

    
2812
class LUConnectConsole(NoHooksLU):
2813
  """Connect to an instance's console.
2814

2815
  This is somewhat special in that it returns the command line that
2816
  you need to run on the master node in order to connect to the
2817
  console.
2818

2819
  """
2820
  _OP_REQP = ["instance_name"]
2821

    
2822
  def CheckPrereq(self):
2823
    """Check prerequisites.
2824

2825
    This checks that the instance is in the cluster.
2826

2827
    """
2828
    instance = self.cfg.GetInstanceInfo(
2829
      self.cfg.ExpandInstanceName(self.op.instance_name))
2830
    if instance is None:
2831
      raise errors.OpPrereqError("Instance '%s' not known" %
2832
                                 self.op.instance_name)
2833
    self.instance = instance
2834

    
2835
  def Exec(self, feedback_fn):
2836
    """Connect to the console of an instance
2837

2838
    """
2839
    instance = self.instance
2840
    node = instance.primary_node
2841

    
2842
    node_insts = rpc.call_instance_list([node])[node]
2843
    if node_insts is False:
2844
      raise errors.OpExecError("Can't connect to node %s." % node)
2845

    
2846
    if instance.name not in node_insts:
2847
      raise errors.OpExecError("Instance %s is not running." % instance.name)
2848

    
2849
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
2850

    
2851
    hyper = hypervisor.GetHypervisor()
2852
    console_cmd = hyper.GetShellCommandForConsole(instance.name)
2853
    # build ssh cmdline
2854
    argv = ["ssh", "-q", "-t"]
2855
    argv.extend(ssh.KNOWN_HOSTS_OPTS)
2856
    argv.extend(ssh.BATCH_MODE_OPTS)
2857
    argv.append(node)
2858
    argv.append(console_cmd)
2859
    return "ssh", argv
2860

    
2861

    
2862
class LUAddMDDRBDComponent(LogicalUnit):
2863
  """Adda new mirror member to an instance's disk.
2864

2865
  """
2866
  HPATH = "mirror-add"
2867
  HTYPE = constants.HTYPE_INSTANCE
2868
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]
2869

    
2870
  def BuildHooksEnv(self):
2871
    """Build hooks env.
2872

2873
    This runs on the master, the primary and all the secondaries.
2874

2875
    """
2876
    env = {
2877
      "NEW_SECONDARY": self.op.remote_node,
2878
      "DISK_NAME": self.op.disk_name,
2879
      }
2880
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2881
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
2882
          self.op.remote_node,] + list(self.instance.secondary_nodes)
2883
    return env, nl, nl
2884

    
2885
  def CheckPrereq(self):
2886
    """Check prerequisites.
2887

2888
    This checks that the instance is in the cluster.
2889

2890
    """
2891
    instance = self.cfg.GetInstanceInfo(
2892
      self.cfg.ExpandInstanceName(self.op.instance_name))
2893
    if instance is None:
2894
      raise errors.OpPrereqError("Instance '%s' not known" %
2895
                                 self.op.instance_name)
2896
    self.instance = instance
2897

    
2898
    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
2899
    if remote_node is None:
2900
      raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
2901
    self.remote_node = remote_node
2902

    
2903
    if remote_node == instance.primary_node:
2904
      raise errors.OpPrereqError("The specified node is the primary node of"
2905
                                 " the instance.")
2906

    
2907
    if instance.disk_template != constants.DT_REMOTE_RAID1:
2908
      raise errors.OpPrereqError("Instance's disk layout is not"
2909
                                 " remote_raid1.")
2910
    for disk in instance.disks:
2911
      if disk.iv_name == self.op.disk_name:
2912
        break
2913
    else:
2914
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
2915
                                 " instance." % self.op.disk_name)
2916
    if len(disk.children) > 1:
2917
      raise errors.OpPrereqError("The device already has two slave"
2918
                                 " devices.\n"
2919
                                 "This would create a 3-disk raid1"
2920
                                 " which we don't allow.")
2921
    self.disk = disk
2922

    
2923
  def Exec(self, feedback_fn):
2924
    """Add the mirror component
2925

2926
    """
2927
    disk = self.disk
2928
    instance = self.instance
2929

    
2930
    remote_node = self.remote_node
2931
    lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
2932
    names = _GenerateUniqueNames(self.cfg, lv_names)
2933
    new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
2934
                                     remote_node, disk.size, names)
2935

    
2936
    logger.Info("adding new mirror component on secondary")
2937
    #HARDCODE
2938
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, new_drbd, False,
2939
                                      _GetInstanceInfoText(instance)):
2940
      raise errors.OpExecError("Failed to create new component on secondary"
2941
                               " node %s" % remote_node)
2942

    
2943
    logger.Info("adding new mirror component on primary")
2944
    #HARDCODE
2945
    if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node, new_drbd,
2946
                                    _GetInstanceInfoText(instance)):
2947
      # remove secondary dev
2948
      self.cfg.SetDiskID(new_drbd, remote_node)
2949
      rpc.call_blockdev_remove(remote_node, new_drbd)
2950
      raise errors.OpExecError("Failed to create volume on primary")
2951

    
2952
    # the device exists now
2953
    # call the primary node to add the mirror to md
2954
    logger.Info("adding new mirror component to md")
2955
    if not rpc.call_blockdev_addchild(instance.primary_node,
2956
                                           disk, new_drbd):
2957
      logger.Error("Can't add mirror compoment to md!")
2958
      self.cfg.SetDiskID(new_drbd, remote_node)
2959
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
2960
        logger.Error("Can't rollback on secondary")
2961
      self.cfg.SetDiskID(new_drbd, instance.primary_node)
2962
      if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
2963
        logger.Error("Can't rollback on primary")
2964
      raise errors.OpExecError("Can't add mirror component to md array")
2965

    
2966
    disk.children.append(new_drbd)
2967

    
2968
    self.cfg.AddInstance(instance)
2969

    
2970
    _WaitForSync(self.cfg, instance)
2971

    
2972
    return 0
2973

    
2974

    
2975
class LURemoveMDDRBDComponent(LogicalUnit):
  """Remove a component from a remote_raid1 disk.

  """
  HPATH = "mirror-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK_NAME": self.op.disk_name,
      "DISK_ID": self.op.disk_id,
      "OLD_SECONDARY": self.old_secondary,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
                                 " instance." % self.op.disk_name)
    for child in disk.children:
      if child.dev_type == "drbd" and child.logical_id[2] == self.op.disk_id:
        break
    else:
      raise errors.OpPrereqError("Can't find the device with this port.")

    if len(disk.children) < 2:
      raise errors.OpPrereqError("Cannot remove the last component from"
                                 " a mirror.")
    self.disk = disk
    self.child = child
    if self.child.logical_id[0] == instance.primary_node:
      oid = 1
    else:
      oid = 0
    self.old_secondary = self.child.logical_id[oid]

  def Exec(self, feedback_fn):
    """Remove the mirror component

    """
    instance = self.instance
    disk = self.disk
    child = self.child
    logger.Info("remove mirror component")
    self.cfg.SetDiskID(disk, instance.primary_node)
    if not rpc.call_blockdev_removechild(instance.primary_node,
                                         disk, child):
      raise errors.OpExecError("Can't remove child from mirror.")

    for node in child.logical_id[:2]:
      self.cfg.SetDiskID(child, node)
      if not rpc.call_blockdev_remove(node, child):
        logger.Error("Warning: failed to remove device from node %s,"
                     " continuing operation." % node)

    disk.children.remove(child)
    self.cfg.AddInstance(instance)


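# Illustrative sketch, not part of the original module: the two for/else
# blocks in LURemoveMDDRBDComponent.CheckPrereq rely on Python's for/else
# idiom, where the else branch runs only if the loop finished without a
# break.  The same lookup written as a stand-alone helper (the name is
# hypothetical):
def _ExampleFindDrbdChildByPort(disk, port):
  """Return the drbd child of 'disk' using the given port (sketch only)."""
  for child in disk.children:
    if child.dev_type == "drbd" and child.logical_id[2] == port:
      return child
  raise errors.OpPrereqError("Can't find the device with this port.")

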
class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    remote_node = getattr(self.op, "remote_node", None)
    if remote_node is None:
      remote_node = instance.secondary_nodes[0]
    else:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    self.op.remote_node = remote_node

  def Exec(self, feedback_fn):
    """Replace the disks of an instance.

    """
    instance = self.instance
    iv_names = {}
    # start of work
    remote_node = self.op.remote_node
    cfg = self.cfg
    for dev in instance.disks:
      size = dev.size
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
                                       remote_node, size, names)
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
      logger.Info("adding new mirror component on secondary for %s" %
                  dev.iv_name)
      #HARDCODE
      if not _CreateBlockDevOnSecondary(cfg, remote_node, new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new component on"
                                 " secondary node %s\n"
                                 "Full abort, cleanup manually!" %
                                 remote_node)

      logger.Info("adding new mirror component on primary")
      #HARDCODE
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, new_drbd,
                                      _GetInstanceInfoText(instance)):
        # remove secondary dev
        cfg.SetDiskID(new_drbd, remote_node)
        rpc.call_blockdev_remove(remote_node, new_drbd)
        raise errors.OpExecError("Failed to create volume on primary!\n"
                                 "Full abort, cleanup manually!!")

      # the device exists now
      # call the primary node to add the mirror to md
      logger.Info("adding new mirror component to md")
      if not rpc.call_blockdev_addchild(instance.primary_node, dev,
                                        new_drbd):
        logger.Error("Can't add mirror component to md!")
        cfg.SetDiskID(new_drbd, remote_node)
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
          logger.Error("Can't rollback on secondary")
        cfg.SetDiskID(new_drbd, instance.primary_node)
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
          logger.Error("Can't rollback on primary")
        raise errors.OpExecError("Full abort, cleanup manually!!")

      dev.children.append(new_drbd)
      cfg.AddInstance(instance)

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    _WaitForSync(cfg, instance, unlock=True)

    # so check manually all the devices
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("MD device %s is degraded!" % name)
      cfg.SetDiskID(new_drbd, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
      if is_degr:
        raise errors.OpExecError("New drbd device %s is degraded!" % name)

    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      logger.Info("remove mirror %s component" % name)
      cfg.SetDiskID(dev, instance.primary_node)
      if not rpc.call_blockdev_removechild(instance.primary_node,
                                           dev, child):
        logger.Error("Can't remove child from mirror, aborting"
                     " *this device cleanup*.\nYou need to cleanup manually!!")
        continue

      for node in child.logical_id[:2]:
        logger.Info("remove child device on %s" % node)
        cfg.SetDiskID(child, node)
        if not rpc.call_blockdev_remove(node, child):
          logger.Error("Warning: failed to remove device from node %s,"
                       " continuing operation." % node)

      dev.children.remove(child)

      cfg.AddInstance(instance)


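# Illustrative sketch, not part of the original module: in
# LUReplaceDisks.Exec, iv_names maps each disk's iv_name to a tuple of
# (md_device, old_drbd_child, new_drbd_child).  A hypothetical helper that
# dumps such a mapping through the module's logger, e.g. for debugging:
def _ExampleLogReplacementPlan(iv_names):
  """Log which DRBD child will be replaced on each MD device (sketch only)."""
  for name, (dev, old_child, new_drbd) in iv_names.items():
    logger.Info("%s (size %s): replacing child %s with %s" %
                (name, dev.size, old_child.logical_id, new_drbd.logical_id))

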
class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]
    return

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type == "drbd":
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                           instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        }

      result[instance.name] = idict

    return result


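# Illustrative sketch, not part of the original module: _ComputeDiskStatus
# above returns a nested dict whose "children" entry holds the same structure
# recursively.  A hypothetical generator that walks such a tree and yields
# every iv_name in it:
def _ExampleWalkDiskStatus(dstatus):
  """Yield the iv_name of a disk status dict and of all its children."""
  yield dstatus["iv_name"]
  for child in dstatus["children"]:
    for name in _ExampleWalkDiskStatus(child):
      yield name

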
class LUSetInstanceParms(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      args['nics'] = [(ip, bridge)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.bridge = getattr(self.op, "bridge", None)
    if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("No such instance name '%s'" %
                                 self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance
    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))

    self.cfg.AddInstance(instance)

    return result


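# Illustrative sketch, not part of the original module: LUSetInstanceParms.Exec
# returns a list of (parameter, new_value) pairs for the fields it actually
# changed.  A hypothetical formatter for that result:
def _ExampleFormatParmChanges(result):
  """Render the (name, value) pairs returned by LUSetInstanceParms.Exec."""
  return ", ".join(["%s=%s" % (name, value) for name, value in result])

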
class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)


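# Illustrative sketch, not part of the original module: the node -> export
# list mapping returned by LUQueryExports.Exec is consumed in the same way by
# LUExportInstance below when it prunes older exports.  A hypothetical helper
# doing that kind of lookup:
def _ExampleNodesWithExport(exportlist, instance_name):
  """Return the nodes holding an export for the given instance (sketch)."""
  return [node for node in exportlist if instance_name in exportlist[node]]

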
class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance name is a valid one.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    # shutdown the instance, unless requested not to do so
    if self.op.shutdown:
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
      self.processor.ChainOpCode(op, feedback_fn)

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type="lvm", size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown:
        op = opcodes.OpStartupInstance(instance_name=instance.name,
                                       force=False)
        self.processor.ChainOpCode(op, feedback_fn)

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                      instance):
        logger.Error("could not export block device %s from node"
                     " %s to node %s" %
                     (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from"
                     " node %s" % (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.processor.ChainOpCode(op, feedback_fn)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


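# Illustrative sketch, not part of the original module: LUExportInstance.Exec
# brackets the snapshot phase between a chained OpShutdownInstance and
# OpStartupInstance.  The same pattern, factored into a hypothetical helper
# that uses only the opcodes and the ChainOpCode call seen above:
def _ExampleStopStartBracket(processor, instance_name, feedback_fn, work_fn):
  """Run work_fn() with the instance shut down, restarting it afterwards."""
  op = opcodes.OpShutdownInstance(instance_name=instance_name)
  processor.ChainOpCode(op, feedback_fn)
  try:
    return work_fn()
  finally:
    op = opcodes.OpStartupInstance(instance_name=instance_name, force=False)
    processor.ChainOpCode(op, feedback_fn)

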
class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()


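# Illustrative sketch, not part of the original module: a TagsLU subclass only
# needs to declare its required opcode fields and implement Exec on top of the
# target resolution done in TagsLU.CheckPrereq.  A hypothetical (non-existent)
# subclass that would return the number of tags on the resolved object:
class _ExampleLUCountTags(TagsLU):
  """Illustrative only: count the tags of the target object."""
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Return the number of tags on the target."""
    return len(self.target.GetTags())

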
class LUAddTag(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tag"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    objects.TaggableObject.ValidateTag(self.op.tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      self.target.AddTag(self.op.tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTag(TagsLU):
  """Delete a tag from a given object.

  """
  _OP_REQP = ["kind", "name", "tag"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    objects.TaggableObject.ValidateTag(self.op.tag)
    if self.op.tag not in self.target.GetTags():
      raise errors.OpPrereqError("Tag not found")

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    self.target.RemoveTag(self.op.tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")