Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib.py @ 00fe9e38

History | View | Annotate | Download (138.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import socket
30
import time
31
import tempfile
32
import re
33
import platform
34

    
35
from ganeti import rpc
36
from ganeti import ssh
37
from ganeti import logger
38
from ganeti import utils
39
from ganeti import errors
40
from ganeti import hypervisor
41
from ganeti import config
42
from ganeti import constants
43
from ganeti import objects
44
from ganeti import opcodes
45
from ganeti import ssconf
46

    
47
class LogicalUnit(object):
48
  """Logical Unit base class.
49

50
  Subclasses must follow these rules:
51
    - implement CheckPrereq which also fills in the opcode instance
52
      with all the fields (even if as None)
53
    - implement Exec
54
    - implement BuildHooksEnv
55
    - redefine HPATH and HTYPE
56
    - optionally redefine their run requirements (REQ_CLUSTER,
57
      REQ_MASTER); note that all commands require root permissions
58

59
  """
60
  HPATH = None
61
  HTYPE = None
62
  _OP_REQP = []
63
  REQ_CLUSTER = True
64
  REQ_MASTER = True
65

    
66
  def __init__(self, processor, op, cfg, sstore):
67
    """Constructor for LogicalUnit.
68

69
    This needs to be overriden in derived classes in order to check op
70
    validity.
71

72
    """
73
    self.processor = processor
74
    self.op = op
75
    self.cfg = cfg
76
    self.sstore = sstore
77
    for attr_name in self._OP_REQP:
78
      attr_val = getattr(op, attr_name, None)
79
      if attr_val is None:
80
        raise errors.OpPrereqError("Required parameter '%s' missing" %
81
                                   attr_name)
82
    if self.REQ_CLUSTER:
83
      if not cfg.IsCluster():
84
        raise errors.OpPrereqError("Cluster not initialized yet,"
85
                                   " use 'gnt-cluster init' first.")
86
      if self.REQ_MASTER:
87
        master = sstore.GetMasterNode()
88
        if master != utils.HostInfo().name:
89
          raise errors.OpPrereqError("Commands must be run on the master"
90
                                     " node %s" % master)
91

    
92
  def CheckPrereq(self):
93
    """Check prerequisites for this LU.
94

95
    This method should check that the prerequisites for the execution
96
    of this LU are fulfilled. It can do internode communication, but
97
    it should be idempotent - no cluster or system changes are
98
    allowed.
99

100
    The method should raise errors.OpPrereqError in case something is
101
    not fulfilled. Its return value is ignored.
102

103
    This method should also update all the parameters of the opcode to
104
    their canonical form; e.g. a short node name must be fully
105
    expanded after this method has successfully completed (so that
106
    hooks, logging, etc. work correctly).
107

108
    """
109
    raise NotImplementedError
110

    
111
  def Exec(self, feedback_fn):
112
    """Execute the LU.
113

114
    This method should implement the actual work. It should raise
115
    errors.OpExecError for failures that are somewhat dealt with in
116
    code, or expected.
117

118
    """
119
    raise NotImplementedError
120

    
121
  def BuildHooksEnv(self):
122
    """Build hooks environment for this LU.
123

124
    This method should return a three-node tuple consisting of: a dict
125
    containing the environment that will be used for running the
126
    specific hook for this LU, a list of node names on which the hook
127
    should run before the execution, and a list of node names on which
128
    the hook should run after the execution.
129

130
    The keys of the dict must not have 'GANETI_' prefixed as this will
131
    be handled in the hooks runner. Also note additional keys will be
132
    added by the hooks runner. If the LU doesn't define any
133
    environment, an empty dict (and not None) should be returned.
134

135
    As for the node lists, the master should not be included in the
136
    them, as it will be added by the hooks runner in case this LU
137
    requires a cluster to run on (otherwise we don't have a node
138
    list). No nodes should be returned as an empty list (and not
139
    None).
140

141
    Note that if the HPATH for a LU class is None, this function will
142
    not be called.
143

144
    """
145
    raise NotImplementedError
146

    
147

    
148
class NoHooksLU(LogicalUnit):
149
  """Simple LU which runs no hooks.
150

151
  This LU is intended as a parent for other LogicalUnits which will
152
  run no hooks, in order to reduce duplicate code.
153

154
  """
155
  HPATH = None
156
  HTYPE = None
157

    
158
  def BuildHooksEnv(self):
159
    """Build hooks env.
160

161
    This is a no-op, since we don't run hooks.
162

163
    """
164
    return {}, [], []
165

    
166

    
167
def _GetWantedNodes(lu, nodes):
168
  """Returns list of checked and expanded node names.
169

170
  Args:
171
    nodes: List of nodes (strings) or None for all
172

173
  """
174
  if not isinstance(nodes, list):
175
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
176

    
177
  if nodes:
178
    wanted = []
179

    
180
    for name in nodes:
181
      node = lu.cfg.ExpandNodeName(name)
182
      if node is None:
183
        raise errors.OpPrereqError("No such node name '%s'" % name)
184
      wanted.append(node)
185

    
186
  else:
187
    wanted = lu.cfg.GetNodeList()
188
  return utils.NiceSort(wanted)
189

    
190

    
191
def _GetWantedInstances(lu, instances):
192
  """Returns list of checked and expanded instance names.
193

194
  Args:
195
    instances: List of instances (strings) or None for all
196

197
  """
198
  if not isinstance(instances, list):
199
    raise errors.OpPrereqError("Invalid argument type 'instances'")
200

    
201
  if instances:
202
    wanted = []
203

    
204
    for name in instances:
205
      instance = lu.cfg.ExpandInstanceName(name)
206
      if instance is None:
207
        raise errors.OpPrereqError("No such instance name '%s'" % name)
208
      wanted.append(instance)
209

    
210
  else:
211
    wanted = lu.cfg.GetInstanceList()
212
  return utils.NiceSort(wanted)
213

    
214

    
215
def _CheckOutputFields(static, dynamic, selected):
216
  """Checks whether all selected fields are valid.
217

218
  Args:
219
    static: Static fields
220
    dynamic: Dynamic fields
221

222
  """
223
  static_fields = frozenset(static)
224
  dynamic_fields = frozenset(dynamic)
225

    
226
  all_fields = static_fields | dynamic_fields
227

    
228
  if not all_fields.issuperset(selected):
229
    raise errors.OpPrereqError("Unknown output fields selected: %s"
230
                               % ",".join(frozenset(selected).
231
                                          difference(all_fields)))
232

    
233

    
234
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
235
                          memory, vcpus, nics):
236
  """Builds instance related env variables for hooks from single variables.
237

238
  Args:
239
    secondary_nodes: List of secondary nodes as strings
240
  """
241
  env = {
242
    "OP_TARGET": name,
243
    "INSTANCE_NAME": name,
244
    "INSTANCE_PRIMARY": primary_node,
245
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
246
    "INSTANCE_OS_TYPE": os_type,
247
    "INSTANCE_STATUS": status,
248
    "INSTANCE_MEMORY": memory,
249
    "INSTANCE_VCPUS": vcpus,
250
  }
251

    
252
  if nics:
253
    nic_count = len(nics)
254
    for idx, (ip, bridge) in enumerate(nics):
255
      if ip is None:
256
        ip = ""
257
      env["INSTANCE_NIC%d_IP" % idx] = ip
258
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
259
  else:
260
    nic_count = 0
261

    
262
  env["INSTANCE_NIC_COUNT"] = nic_count
263

    
264
  return env
265

    
266

    
267
def _BuildInstanceHookEnvByObject(instance, override=None):
268
  """Builds instance related env variables for hooks from an object.
269

270
  Args:
271
    instance: objects.Instance object of instance
272
    override: dict of values to override
273
  """
274
  args = {
275
    'name': instance.name,
276
    'primary_node': instance.primary_node,
277
    'secondary_nodes': instance.secondary_nodes,
278
    'os_type': instance.os,
279
    'status': instance.os,
280
    'memory': instance.memory,
281
    'vcpus': instance.vcpus,
282
    'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
283
  }
284
  if override:
285
    args.update(override)
286
  return _BuildInstanceHookEnv(**args)
287

    
288

    
289
def _UpdateEtcHosts(fullnode, ip):
290
  """Ensure a node has a correct entry in /etc/hosts.
291

292
  Args:
293
    fullnode - Fully qualified domain name of host. (str)
294
    ip       - IPv4 address of host (str)
295

296
  """
297
  node = fullnode.split(".", 1)[0]
298

    
299
  f = open('/etc/hosts', 'r+')
300

    
301
  inthere = False
302

    
303
  save_lines = []
304
  add_lines = []
305
  removed = False
306

    
307
  while True:
308
    rawline = f.readline()
309

    
310
    if not rawline:
311
      # End of file
312
      break
313

    
314
    line = rawline.split('\n')[0]
315

    
316
    # Strip off comments
317
    line = line.split('#')[0]
318

    
319
    if not line:
320
      # Entire line was comment, skip
321
      save_lines.append(rawline)
322
      continue
323

    
324
    fields = line.split()
325

    
326
    haveall = True
327
    havesome = False
328
    for spec in [ ip, fullnode, node ]:
329
      if spec not in fields:
330
        haveall = False
331
      if spec in fields:
332
        havesome = True
333

    
334
    if haveall:
335
      inthere = True
336
      save_lines.append(rawline)
337
      continue
338

    
339
    if havesome and not haveall:
340
      # Line (old, or manual?) which is missing some.  Remove.
341
      removed = True
342
      continue
343

    
344
    save_lines.append(rawline)
345

    
346
  if not inthere:
347
    add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))
348

    
349
  if removed:
350
    if add_lines:
351
      save_lines = save_lines + add_lines
352

    
353
    # We removed a line, write a new file and replace old.
354
    fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
355
    newfile = os.fdopen(fd, 'w')
356
    newfile.write(''.join(save_lines))
357
    newfile.close()
358
    os.rename(tmpname, '/etc/hosts')
359

    
360
  elif add_lines:
361
    # Simply appending a new line will do the trick.
362
    f.seek(0, 2)
363
    for add in add_lines:
364
      f.write(add)
365

    
366
  f.close()
367

    
368

    
369
def _UpdateKnownHosts(fullnode, ip, pubkey):
370
  """Ensure a node has a correct known_hosts entry.
371

372
  Args:
373
    fullnode - Fully qualified domain name of host. (str)
374
    ip       - IPv4 address of host (str)
375
    pubkey   - the public key of the cluster
376

377
  """
378
  if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
379
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
380
  else:
381
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
382

    
383
  inthere = False
384

    
385
  save_lines = []
386
  add_lines = []
387
  removed = False
388

    
389
  for rawline in f:
390
    logger.Debug('read %s' % (repr(rawline),))
391

    
392
    parts = rawline.rstrip('\r\n').split()
393

    
394
    # Ignore unwanted lines
395
    if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
396
      fields = parts[0].split(',')
397
      key = parts[2]
398

    
399
      haveall = True
400
      havesome = False
401
      for spec in [ ip, fullnode ]:
402
        if spec not in fields:
403
          haveall = False
404
        if spec in fields:
405
          havesome = True
406

    
407
      logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
408
      if haveall and key == pubkey:
409
        inthere = True
410
        save_lines.append(rawline)
411
        logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
412
        continue
413

    
414
      if havesome and (not haveall or key != pubkey):
415
        removed = True
416
        logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
417
        continue
418

    
419
    save_lines.append(rawline)
420

    
421
  if not inthere:
422
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
423
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
424

    
425
  if removed:
426
    save_lines = save_lines + add_lines
427

    
428
    # Write a new file and replace old.
429
    fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
430
                                   constants.DATA_DIR)
431
    newfile = os.fdopen(fd, 'w')
432
    try:
433
      newfile.write(''.join(save_lines))
434
    finally:
435
      newfile.close()
436
    logger.Debug("Wrote new known_hosts.")
437
    os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
438

    
439
  elif add_lines:
440
    # Simply appending a new line will do the trick.
441
    f.seek(0, 2)
442
    for add in add_lines:
443
      f.write(add)
444

    
445
  f.close()
446

    
447

    
448
def _HasValidVG(vglist, vgname):
449
  """Checks if the volume group list is valid.
450

451
  A non-None return value means there's an error, and the return value
452
  is the error message.
453

454
  """
455
  vgsize = vglist.get(vgname, None)
456
  if vgsize is None:
457
    return "volume group '%s' missing" % vgname
458
  elif vgsize < 20480:
459
    return ("volume group '%s' too small (20480MiB required, %dMib found)" %
460
            (vgname, vgsize))
461
  return None
462

    
463

    
464
def _InitSSHSetup(node):
465
  """Setup the SSH configuration for the cluster.
466

467

468
  This generates a dsa keypair for root, adds the pub key to the
469
  permitted hosts and adds the hostkey to its own known hosts.
470

471
  Args:
472
    node: the name of this host as a fqdn
473

474
  """
475
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
476

    
477
  for name in priv_key, pub_key:
478
    if os.path.exists(name):
479
      utils.CreateBackup(name)
480
    utils.RemoveFile(name)
481

    
482
  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
483
                         "-f", priv_key,
484
                         "-q", "-N", ""])
485
  if result.failed:
486
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
487
                             result.output)
488

    
489
  f = open(pub_key, 'r')
490
  try:
491
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
492
  finally:
493
    f.close()
494

    
495

    
496
def _InitGanetiServerSetup(ss):
497
  """Setup the necessary configuration for the initial node daemon.
498

499
  This creates the nodepass file containing the shared password for
500
  the cluster and also generates the SSL certificate.
501

502
  """
503
  # Create pseudo random password
504
  randpass = sha.new(os.urandom(64)).hexdigest()
505
  # and write it into sstore
506
  ss.SetKey(ss.SS_NODED_PASS, randpass)
507

    
508
  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
509
                         "-days", str(365*5), "-nodes", "-x509",
510
                         "-keyout", constants.SSL_CERT_FILE,
511
                         "-out", constants.SSL_CERT_FILE, "-batch"])
512
  if result.failed:
513
    raise errors.OpExecError("could not generate server ssl cert, command"
514
                             " %s had exitcode %s and error message %s" %
515
                             (result.cmd, result.exit_code, result.output))
516

    
517
  os.chmod(constants.SSL_CERT_FILE, 0400)
518

    
519
  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
520

    
521
  if result.failed:
522
    raise errors.OpExecError("Could not start the node daemon, command %s"
523
                             " had exitcode %s and error %s" %
524
                             (result.cmd, result.exit_code, result.output))
525

    
526

    
527
def _CheckInstanceBridgesExist(instance):
528
  """Check that the brigdes needed by an instance exist.
529

530
  """
531
  # check bridges existance
532
  brlist = [nic.bridge for nic in instance.nics]
533
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
534
    raise errors.OpPrereqError("one or more target bridges %s does not"
535
                               " exist on destination node '%s'" %
536
                               (brlist, instance.primary_node))
537

    
538

    
539
class LUInitCluster(LogicalUnit):
540
  """Initialise the cluster.
541

542
  """
543
  HPATH = "cluster-init"
544
  HTYPE = constants.HTYPE_CLUSTER
545
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
546
              "def_bridge", "master_netdev"]
547
  REQ_CLUSTER = False
548

    
549
  def BuildHooksEnv(self):
550
    """Build hooks env.
551

552
    Notes: Since we don't require a cluster, we must manually add
553
    ourselves in the post-run node list.
554

555
    """
556
    env = {"OP_TARGET": self.op.cluster_name}
557
    return env, [], [self.hostname.name]
558

    
559
  def CheckPrereq(self):
560
    """Verify that the passed name is a valid one.
561

562
    """
563
    if config.ConfigWriter.IsCluster():
564
      raise errors.OpPrereqError("Cluster is already initialised")
565

    
566
    self.hostname = hostname = utils.HostInfo()
567

    
568
    if hostname.ip.startswith("127."):
569
      raise errors.OpPrereqError("This host's IP resolves to the private"
570
                                 " range (%s). Please fix DNS or /etc/hosts." %
571
                                 (hostname.ip,))
572

    
573
    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
574

    
575
    if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
576
                         constants.DEFAULT_NODED_PORT):
577
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
578
                                 " to %s,\nbut this ip address does not"
579
                                 " belong to this host."
580
                                 " Aborting." % hostname.ip)
581

    
582
    secondary_ip = getattr(self.op, "secondary_ip", None)
583
    if secondary_ip and not utils.IsValidIP(secondary_ip):
584
      raise errors.OpPrereqError("Invalid secondary ip given")
585
    if (secondary_ip and
586
        secondary_ip != hostname.ip and
587
        (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
588
                           constants.DEFAULT_NODED_PORT))):
589
      raise errors.OpPrereqError("You gave %s as secondary IP,\n"
590
                                 "but it does not belong to this host." %
591
                                 secondary_ip)
592
    self.secondary_ip = secondary_ip
593

    
594
    # checks presence of the volume group given
595
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
596

    
597
    if vgstatus:
598
      raise errors.OpPrereqError("Error: %s" % vgstatus)
599

    
600
    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
601
                    self.op.mac_prefix):
602
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
603
                                 self.op.mac_prefix)
604

    
605
    if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
606
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
607
                                 self.op.hypervisor_type)
608

    
609
    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
610
    if result.failed:
611
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
612
                                 (self.op.master_netdev,
613
                                  result.output.strip()))
614

    
615
  def Exec(self, feedback_fn):
616
    """Initialize the cluster.
617

618
    """
619
    clustername = self.clustername
620
    hostname = self.hostname
621

    
622
    # set up the simple store
623
    self.sstore = ss = ssconf.SimpleStore()
624
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
625
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
626
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
627
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
628
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
629

    
630
    # set up the inter-node password and certificate
631
    _InitGanetiServerSetup(ss)
632

    
633
    # start the master ip
634
    rpc.call_node_start_master(hostname.name)
635

    
636
    # set up ssh config and /etc/hosts
637
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
638
    try:
639
      sshline = f.read()
640
    finally:
641
      f.close()
642
    sshkey = sshline.split(" ")[1]
643

    
644
    _UpdateEtcHosts(hostname.name, hostname.ip)
645

    
646
    _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
647

    
648
    _InitSSHSetup(hostname.name)
649

    
650
    # init of cluster config file
651
    self.cfg = cfgw = config.ConfigWriter()
652
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
653
                    sshkey, self.op.mac_prefix,
654
                    self.op.vg_name, self.op.def_bridge)
655

    
656

    
657
class LUDestroyCluster(NoHooksLU):
658
  """Logical unit for destroying the cluster.
659

660
  """
661
  _OP_REQP = []
662

    
663
  def CheckPrereq(self):
664
    """Check prerequisites.
665

666
    This checks whether the cluster is empty.
667

668
    Any errors are signalled by raising errors.OpPrereqError.
669

670
    """
671
    master = self.sstore.GetMasterNode()
672

    
673
    nodelist = self.cfg.GetNodeList()
674
    if len(nodelist) != 1 or nodelist[0] != master:
675
      raise errors.OpPrereqError("There are still %d node(s) in"
676
                                 " this cluster." % (len(nodelist) - 1))
677
    instancelist = self.cfg.GetInstanceList()
678
    if instancelist:
679
      raise errors.OpPrereqError("There are still %d instance(s) in"
680
                                 " this cluster." % len(instancelist))
681

    
682
  def Exec(self, feedback_fn):
683
    """Destroys the cluster.
684

685
    """
686
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
687
    utils.CreateBackup(priv_key)
688
    utils.CreateBackup(pub_key)
689
    rpc.call_node_leave_cluster(self.sstore.GetMasterNode())
690

    
691

    
692
class LUVerifyCluster(NoHooksLU):
693
  """Verifies the cluster status.
694

695
  """
696
  _OP_REQP = []
697

    
698
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
699
                  remote_version, feedback_fn):
700
    """Run multiple tests against a node.
701

702
    Test list:
703
      - compares ganeti version
704
      - checks vg existance and size > 20G
705
      - checks config file checksum
706
      - checks ssh to other nodes
707

708
    Args:
709
      node: name of the node to check
710
      file_list: required list of files
711
      local_cksum: dictionary of local files and their checksums
712

713
    """
714
    # compares ganeti version
715
    local_version = constants.PROTOCOL_VERSION
716
    if not remote_version:
717
      feedback_fn(" - ERROR: connection to %s failed" % (node))
718
      return True
719

    
720
    if local_version != remote_version:
721
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
722
                      (local_version, node, remote_version))
723
      return True
724

    
725
    # checks vg existance and size > 20G
726

    
727
    bad = False
728
    if not vglist:
729
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
730
                      (node,))
731
      bad = True
732
    else:
733
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
734
      if vgstatus:
735
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
736
        bad = True
737

    
738
    # checks config file checksum
739
    # checks ssh to any
740

    
741
    if 'filelist' not in node_result:
742
      bad = True
743
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
744
    else:
745
      remote_cksum = node_result['filelist']
746
      for file_name in file_list:
747
        if file_name not in remote_cksum:
748
          bad = True
749
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
750
        elif remote_cksum[file_name] != local_cksum[file_name]:
751
          bad = True
752
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
753

    
754
    if 'nodelist' not in node_result:
755
      bad = True
756
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
757
    else:
758
      if node_result['nodelist']:
759
        bad = True
760
        for node in node_result['nodelist']:
761
          feedback_fn("  - ERROR: communication with node '%s': %s" %
762
                          (node, node_result['nodelist'][node]))
763
    hyp_result = node_result.get('hypervisor', None)
764
    if hyp_result is not None:
765
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
766
    return bad
767

    
768
  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
769
    """Verify an instance.
770

771
    This function checks to see if the required block devices are
772
    available on the instance's node.
773

774
    """
775
    bad = False
776

    
777
    instancelist = self.cfg.GetInstanceList()
778
    if not instance in instancelist:
779
      feedback_fn("  - ERROR: instance %s not in instance list %s" %
780
                      (instance, instancelist))
781
      bad = True
782

    
783
    instanceconfig = self.cfg.GetInstanceInfo(instance)
784
    node_current = instanceconfig.primary_node
785

    
786
    node_vol_should = {}
787
    instanceconfig.MapLVsByNode(node_vol_should)
788

    
789
    for node in node_vol_should:
790
      for volume in node_vol_should[node]:
791
        if node not in node_vol_is or volume not in node_vol_is[node]:
792
          feedback_fn("  - ERROR: volume %s missing on node %s" %
793
                          (volume, node))
794
          bad = True
795

    
796
    if not instanceconfig.status == 'down':
797
      if not instance in node_instance[node_current]:
798
        feedback_fn("  - ERROR: instance %s not running on node %s" %
799
                        (instance, node_current))
800
        bad = True
801

    
802
    for node in node_instance:
803
      if (not node == node_current):
804
        if instance in node_instance[node]:
805
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
806
                          (instance, node))
807
          bad = True
808

    
809
    return bad
810

    
811
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
812
    """Verify if there are any unknown volumes in the cluster.
813

814
    The .os, .swap and backup volumes are ignored. All other volumes are
815
    reported as unknown.
816

817
    """
818
    bad = False
819

    
820
    for node in node_vol_is:
821
      for volume in node_vol_is[node]:
822
        if node not in node_vol_should or volume not in node_vol_should[node]:
823
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
824
                      (volume, node))
825
          bad = True
826
    return bad
827

    
828
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
829
    """Verify the list of running instances.
830

831
    This checks what instances are running but unknown to the cluster.
832

833
    """
834
    bad = False
835
    for node in node_instance:
836
      for runninginstance in node_instance[node]:
837
        if runninginstance not in instancelist:
838
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
839
                          (runninginstance, node))
840
          bad = True
841
    return bad
842

    
843
  def CheckPrereq(self):
844
    """Check prerequisites.
845

846
    This has no prerequisites.
847

848
    """
849
    pass
850

    
851
  def Exec(self, feedback_fn):
852
    """Verify integrity of cluster, performing various test on nodes.
853

854
    """
855
    bad = False
856
    feedback_fn("* Verifying global settings")
857
    self.cfg.VerifyConfig()
858

    
859
    master = self.sstore.GetMasterNode()
860
    vg_name = self.cfg.GetVGName()
861
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
862
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
863
    node_volume = {}
864
    node_instance = {}
865

    
866
    # FIXME: verify OS list
867
    # do local checksums
868
    file_names = list(self.sstore.GetFileList())
869
    file_names.append(constants.SSL_CERT_FILE)
870
    file_names.append(constants.CLUSTER_CONF_FILE)
871
    local_checksums = utils.FingerprintFiles(file_names)
872

    
873
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
874
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
875
    all_instanceinfo = rpc.call_instance_list(nodelist)
876
    all_vglist = rpc.call_vg_list(nodelist)
877
    node_verify_param = {
878
      'filelist': file_names,
879
      'nodelist': nodelist,
880
      'hypervisor': None,
881
      }
882
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
883
    all_rversion = rpc.call_version(nodelist)
884

    
885
    for node in nodelist:
886
      feedback_fn("* Verifying node %s" % node)
887
      result = self._VerifyNode(node, file_names, local_checksums,
888
                                all_vglist[node], all_nvinfo[node],
889
                                all_rversion[node], feedback_fn)
890
      bad = bad or result
891

    
892
      # node_volume
893
      volumeinfo = all_volumeinfo[node]
894

    
895
      if type(volumeinfo) != dict:
896
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
897
        bad = True
898
        continue
899

    
900
      node_volume[node] = volumeinfo
901

    
902
      # node_instance
903
      nodeinstance = all_instanceinfo[node]
904
      if type(nodeinstance) != list:
905
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
906
        bad = True
907
        continue
908

    
909
      node_instance[node] = nodeinstance
910

    
911
    node_vol_should = {}
912

    
913
    for instance in instancelist:
914
      feedback_fn("* Verifying instance %s" % instance)
915
      result =  self._VerifyInstance(instance, node_volume, node_instance,
916
                                     feedback_fn)
917
      bad = bad or result
918

    
919
      inst_config = self.cfg.GetInstanceInfo(instance)
920

    
921
      inst_config.MapLVsByNode(node_vol_should)
922

    
923
    feedback_fn("* Verifying orphan volumes")
924
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
925
                                       feedback_fn)
926
    bad = bad or result
927

    
928
    feedback_fn("* Verifying remaining instances")
929
    result = self._VerifyOrphanInstances(instancelist, node_instance,
930
                                         feedback_fn)
931
    bad = bad or result
932

    
933
    return int(bad)
934

    
935

    
936
class LURenameCluster(LogicalUnit):
937
  """Rename the cluster.
938

939
  """
940
  HPATH = "cluster-rename"
941
  HTYPE = constants.HTYPE_CLUSTER
942
  _OP_REQP = ["name"]
943

    
944
  def BuildHooksEnv(self):
945
    """Build hooks env.
946

947
    """
948
    env = {
949
      "OP_TARGET": self.op.sstore.GetClusterName(),
950
      "NEW_NAME": self.op.name,
951
      }
952
    mn = self.sstore.GetMasterNode()
953
    return env, [mn], [mn]
954

    
955
  def CheckPrereq(self):
956
    """Verify that the passed name is a valid one.
957

958
    """
959
    hostname = utils.HostInfo(self.op.name)
960

    
961
    new_name = hostname.name
962
    self.ip = new_ip = hostname.ip
963
    old_name = self.sstore.GetClusterName()
964
    old_ip = self.sstore.GetMasterIP()
965
    if new_name == old_name and new_ip == old_ip:
966
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
967
                                 " cluster has changed")
968
    if new_ip != old_ip:
969
      result = utils.RunCmd(["fping", "-q", new_ip])
970
      if not result.failed:
971
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
972
                                   " reachable on the network. Aborting." %
973
                                   new_ip)
974

    
975
    self.op.name = new_name
976

    
977
  def Exec(self, feedback_fn):
978
    """Rename the cluster.
979

980
    """
981
    clustername = self.op.name
982
    ip = self.ip
983
    ss = self.sstore
984

    
985
    # shutdown the master IP
986
    master = ss.GetMasterNode()
987
    if not rpc.call_node_stop_master(master):
988
      raise errors.OpExecError("Could not disable the master role")
989

    
990
    try:
991
      # modify the sstore
992
      ss.SetKey(ss.SS_MASTER_IP, ip)
993
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
994

    
995
      # Distribute updated ss config to all nodes
996
      myself = self.cfg.GetNodeInfo(master)
997
      dist_nodes = self.cfg.GetNodeList()
998
      if myself.name in dist_nodes:
999
        dist_nodes.remove(myself.name)
1000

    
1001
      logger.Debug("Copying updated ssconf data to all nodes")
1002
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1003
        fname = ss.KeyToFilename(keyname)
1004
        result = rpc.call_upload_file(dist_nodes, fname)
1005
        for to_node in dist_nodes:
1006
          if not result[to_node]:
1007
            logger.Error("copy of file %s to node %s failed" %
1008
                         (fname, to_node))
1009
    finally:
1010
      if not rpc.call_node_start_master(master):
1011
        logger.Error("Could not re-enable the master role on the master,\n"
1012
                     "please restart manually.")
1013

    
1014

    
1015
def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
1016
  """Sleep and poll for an instance's disk to sync.
1017

1018
  """
1019
  if not instance.disks:
1020
    return True
1021

    
1022
  if not oneshot:
1023
    logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)
1024

    
1025
  node = instance.primary_node
1026

    
1027
  for dev in instance.disks:
1028
    cfgw.SetDiskID(dev, node)
1029

    
1030
  retries = 0
1031
  while True:
1032
    max_time = 0
1033
    done = True
1034
    cumul_degraded = False
1035
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1036
    if not rstats:
1037
      logger.ToStderr("Can't get any data from node %s" % node)
1038
      retries += 1
1039
      if retries >= 10:
1040
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1041
                                 " aborting." % node)
1042
      time.sleep(6)
1043
      continue
1044
    retries = 0
1045
    for i in range(len(rstats)):
1046
      mstat = rstats[i]
1047
      if mstat is None:
1048
        logger.ToStderr("Can't compute data for node %s/%s" %
1049
                        (node, instance.disks[i].iv_name))
1050
        continue
1051
      perc_done, est_time, is_degraded = mstat
1052
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1053
      if perc_done is not None:
1054
        done = False
1055
        if est_time is not None:
1056
          rem_time = "%d estimated seconds remaining" % est_time
1057
          max_time = est_time
1058
        else:
1059
          rem_time = "no time estimate"
1060
        logger.ToStdout("- device %s: %5.2f%% done, %s" %
1061
                        (instance.disks[i].iv_name, perc_done, rem_time))
1062
    if done or oneshot:
1063
      break
1064

    
1065
    if unlock:
1066
      utils.Unlock('cmd')
1067
    try:
1068
      time.sleep(min(60, max_time))
1069
    finally:
1070
      if unlock:
1071
        utils.Lock('cmd')
1072

    
1073
  if done:
1074
    logger.ToStdout("Instance %s's disks are in sync." % instance.name)
1075
  return not cumul_degraded
1076

    
1077

    
1078
def _CheckDiskConsistency(cfgw, dev, node, on_primary):
1079
  """Check that mirrors are not degraded.
1080

1081
  """
1082
  cfgw.SetDiskID(dev, node)
1083

    
1084
  result = True
1085
  if on_primary or dev.AssembleOnSecondary():
1086
    rstats = rpc.call_blockdev_find(node, dev)
1087
    if not rstats:
1088
      logger.ToStderr("Can't get any data from node %s" % node)
1089
      result = False
1090
    else:
1091
      result = result and (not rstats[5])
1092
  if dev.children:
1093
    for child in dev.children:
1094
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1095

    
1096
  return result
1097

    
1098

    
1099
class LUDiagnoseOS(NoHooksLU):
1100
  """Logical unit for OS diagnose/query.
1101

1102
  """
1103
  _OP_REQP = []
1104

    
1105
  def CheckPrereq(self):
1106
    """Check prerequisites.
1107

1108
    This always succeeds, since this is a pure query LU.
1109

1110
    """
1111
    return
1112

    
1113
  def Exec(self, feedback_fn):
1114
    """Compute the list of OSes.
1115

1116
    """
1117
    node_list = self.cfg.GetNodeList()
1118
    node_data = rpc.call_os_diagnose(node_list)
1119
    if node_data == False:
1120
      raise errors.OpExecError("Can't gather the list of OSes")
1121
    return node_data
1122

    
1123

    
1124
class LURemoveNode(LogicalUnit):
1125
  """Logical unit for removing a node.
1126

1127
  """
1128
  HPATH = "node-remove"
1129
  HTYPE = constants.HTYPE_NODE
1130
  _OP_REQP = ["node_name"]
1131

    
1132
  def BuildHooksEnv(self):
1133
    """Build hooks env.
1134

1135
    This doesn't run on the target node in the pre phase as a failed
1136
    node would not allows itself to run.
1137

1138
    """
1139
    env = {
1140
      "OP_TARGET": self.op.node_name,
1141
      "NODE_NAME": self.op.node_name,
1142
      }
1143
    all_nodes = self.cfg.GetNodeList()
1144
    all_nodes.remove(self.op.node_name)
1145
    return env, all_nodes, all_nodes
1146

    
1147
  def CheckPrereq(self):
1148
    """Check prerequisites.
1149

1150
    This checks:
1151
     - the node exists in the configuration
1152
     - it does not have primary or secondary instances
1153
     - it's not the master
1154

1155
    Any errors are signalled by raising errors.OpPrereqError.
1156

1157
    """
1158
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1159
    if node is None:
1160
      raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1161

    
1162
    instance_list = self.cfg.GetInstanceList()
1163

    
1164
    masternode = self.sstore.GetMasterNode()
1165
    if node.name == masternode:
1166
      raise errors.OpPrereqError("Node is the master node,"
1167
                                 " you need to failover first.")
1168

    
1169
    for instance_name in instance_list:
1170
      instance = self.cfg.GetInstanceInfo(instance_name)
1171
      if node.name == instance.primary_node:
1172
        raise errors.OpPrereqError("Instance %s still running on the node,"
1173
                                   " please remove first." % instance_name)
1174
      if node.name in instance.secondary_nodes:
1175
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1176
                                   " please remove first." % instance_name)
1177
    self.op.node_name = node.name
1178
    self.node = node
1179

    
1180
  def Exec(self, feedback_fn):
1181
    """Removes the node from the cluster.
1182

1183
    """
1184
    node = self.node
1185
    logger.Info("stopping the node daemon and removing configs from node %s" %
1186
                node.name)
1187

    
1188
    rpc.call_node_leave_cluster(node.name)
1189

    
1190
    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1191

    
1192
    logger.Info("Removing node %s from config" % node.name)
1193

    
1194
    self.cfg.RemoveNode(node.name)
1195

    
1196

    
1197
class LUQueryNodes(NoHooksLU):
1198
  """Logical unit for querying nodes.
1199

1200
  """
1201
  _OP_REQP = ["output_fields", "names"]
1202

    
1203
  def CheckPrereq(self):
1204
    """Check prerequisites.
1205

1206
    This checks that the fields required are valid output fields.
1207

1208
    """
1209
    self.dynamic_fields = frozenset(["dtotal", "dfree",
1210
                                     "mtotal", "mnode", "mfree",
1211
                                     "bootid"])
1212

    
1213
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1214
                               "pinst_list", "sinst_list",
1215
                               "pip", "sip"],
1216
                       dynamic=self.dynamic_fields,
1217
                       selected=self.op.output_fields)
1218

    
1219
    self.wanted = _GetWantedNodes(self, self.op.names)
1220

    
1221
  def Exec(self, feedback_fn):
1222
    """Computes the list of nodes and their attributes.
1223

1224
    """
1225
    nodenames = self.wanted
1226
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1227

    
1228
    # begin data gathering
1229

    
1230
    if self.dynamic_fields.intersection(self.op.output_fields):
1231
      live_data = {}
1232
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1233
      for name in nodenames:
1234
        nodeinfo = node_data.get(name, None)
1235
        if nodeinfo:
1236
          live_data[name] = {
1237
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1238
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1239
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1240
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1241
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1242
            "bootid": nodeinfo['bootid'],
1243
            }
1244
        else:
1245
          live_data[name] = {}
1246
    else:
1247
      live_data = dict.fromkeys(nodenames, {})
1248

    
1249
    node_to_primary = dict([(name, set()) for name in nodenames])
1250
    node_to_secondary = dict([(name, set()) for name in nodenames])
1251

    
1252
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1253
                             "sinst_cnt", "sinst_list"))
1254
    if inst_fields & frozenset(self.op.output_fields):
1255
      instancelist = self.cfg.GetInstanceList()
1256

    
1257
      for instance_name in instancelist:
1258
        inst = self.cfg.GetInstanceInfo(instance_name)
1259
        if inst.primary_node in node_to_primary:
1260
          node_to_primary[inst.primary_node].add(inst.name)
1261
        for secnode in inst.secondary_nodes:
1262
          if secnode in node_to_secondary:
1263
            node_to_secondary[secnode].add(inst.name)
1264

    
1265
    # end data gathering
1266

    
1267
    output = []
1268
    for node in nodelist:
1269
      node_output = []
1270
      for field in self.op.output_fields:
1271
        if field == "name":
1272
          val = node.name
1273
        elif field == "pinst_list":
1274
          val = list(node_to_primary[node.name])
1275
        elif field == "sinst_list":
1276
          val = list(node_to_secondary[node.name])
1277
        elif field == "pinst_cnt":
1278
          val = len(node_to_primary[node.name])
1279
        elif field == "sinst_cnt":
1280
          val = len(node_to_secondary[node.name])
1281
        elif field == "pip":
1282
          val = node.primary_ip
1283
        elif field == "sip":
1284
          val = node.secondary_ip
1285
        elif field in self.dynamic_fields:
1286
          val = live_data[node.name].get(field, None)
1287
        else:
1288
          raise errors.ParameterError(field)
1289
        node_output.append(val)
1290
      output.append(node_output)
1291

    
1292
    return output
1293

    
1294

    
1295
class LUQueryNodeVolumes(NoHooksLU):
1296
  """Logical unit for getting volumes on node(s).
1297

1298
  """
1299
  _OP_REQP = ["nodes", "output_fields"]
1300

    
1301
  def CheckPrereq(self):
1302
    """Check prerequisites.
1303

1304
    This checks that the fields required are valid output fields.
1305

1306
    """
1307
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1308

    
1309
    _CheckOutputFields(static=["node"],
1310
                       dynamic=["phys", "vg", "name", "size", "instance"],
1311
                       selected=self.op.output_fields)
1312

    
1313

    
1314
  def Exec(self, feedback_fn):
1315
    """Computes the list of nodes and their attributes.
1316

1317
    """
1318
    nodenames = self.nodes
1319
    volumes = rpc.call_node_volumes(nodenames)
1320

    
1321
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1322
             in self.cfg.GetInstanceList()]
1323

    
1324
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1325

    
1326
    output = []
1327
    for node in nodenames:
1328
      if node not in volumes or not volumes[node]:
1329
        continue
1330

    
1331
      node_vols = volumes[node][:]
1332
      node_vols.sort(key=lambda vol: vol['dev'])
1333

    
1334
      for vol in node_vols:
1335
        node_output = []
1336
        for field in self.op.output_fields:
1337
          if field == "node":
1338
            val = node
1339
          elif field == "phys":
1340
            val = vol['dev']
1341
          elif field == "vg":
1342
            val = vol['vg']
1343
          elif field == "name":
1344
            val = vol['name']
1345
          elif field == "size":
1346
            val = int(float(vol['size']))
1347
          elif field == "instance":
1348
            for inst in ilist:
1349
              if node not in lv_by_node[inst]:
1350
                continue
1351
              if vol['name'] in lv_by_node[inst][node]:
1352
                val = inst.name
1353
                break
1354
            else:
1355
              val = '-'
1356
          else:
1357
            raise errors.ParameterError(field)
1358
          node_output.append(str(val))
1359

    
1360
        output.append(node_output)
1361

    
1362
    return output
1363

    
1364

    
1365
class LUAddNode(LogicalUnit):
1366
  """Logical unit for adding node to the cluster.
1367

1368
  """
1369
  HPATH = "node-add"
1370
  HTYPE = constants.HTYPE_NODE
1371
  _OP_REQP = ["node_name"]
1372

    
1373
  def BuildHooksEnv(self):
1374
    """Build hooks env.
1375

1376
    This will run on all nodes before, and on all nodes + the new node after.
1377

1378
    """
1379
    env = {
1380
      "OP_TARGET": self.op.node_name,
1381
      "NODE_NAME": self.op.node_name,
1382
      "NODE_PIP": self.op.primary_ip,
1383
      "NODE_SIP": self.op.secondary_ip,
1384
      }
1385
    nodes_0 = self.cfg.GetNodeList()
1386
    nodes_1 = nodes_0 + [self.op.node_name, ]
1387
    return env, nodes_0, nodes_1
1388

    
1389
  def CheckPrereq(self):
1390
    """Check prerequisites.
1391

1392
    This checks:
1393
     - the new node is not already in the config
1394
     - it is resolvable
1395
     - its parameters (single/dual homed) matches the cluster
1396

1397
    Any errors are signalled by raising errors.OpPrereqError.
1398

1399
    """
1400
    node_name = self.op.node_name
1401
    cfg = self.cfg
1402

    
1403
    dns_data = utils.HostInfo(node_name)
1404

    
1405
    node = dns_data.name
1406
    primary_ip = self.op.primary_ip = dns_data.ip
1407
    secondary_ip = getattr(self.op, "secondary_ip", None)
1408
    if secondary_ip is None:
1409
      secondary_ip = primary_ip
1410
    if not utils.IsValidIP(secondary_ip):
1411
      raise errors.OpPrereqError("Invalid secondary IP given")
1412
    self.op.secondary_ip = secondary_ip
1413
    node_list = cfg.GetNodeList()
1414
    if node in node_list:
1415
      raise errors.OpPrereqError("Node %s is already in the configuration"
1416
                                 % node)
1417

    
1418
    for existing_node_name in node_list:
1419
      existing_node = cfg.GetNodeInfo(existing_node_name)
1420
      if (existing_node.primary_ip == primary_ip or
1421
          existing_node.secondary_ip == primary_ip or
1422
          existing_node.primary_ip == secondary_ip or
1423
          existing_node.secondary_ip == secondary_ip):
1424
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1425
                                   " existing node %s" % existing_node.name)
1426

    
1427
    # check that the type of the node (single versus dual homed) is the
1428
    # same as for the master
1429
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1430
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1431
    newbie_singlehomed = secondary_ip == primary_ip
1432
    if master_singlehomed != newbie_singlehomed:
1433
      if master_singlehomed:
1434
        raise errors.OpPrereqError("The master has no private ip but the"
1435
                                   " new node has one")
1436
      else:
1437
        raise errors.OpPrereqError("The master has a private ip but the"
1438
                                   " new node doesn't have one")
1439

    
1440
    # checks reachablity
1441
    if not utils.TcpPing(utils.HostInfo().name,
1442
                         primary_ip,
1443
                         constants.DEFAULT_NODED_PORT):
1444
      raise errors.OpPrereqError("Node not reachable by ping")
1445

    
1446
    if not newbie_singlehomed:
1447
      # check reachability from my secondary ip to newbie's secondary ip
1448
      if not utils.TcpPing(myself.secondary_ip,
1449
                           secondary_ip,
1450
                           constants.DEFAULT_NODED_PORT):
1451
        raise errors.OpPrereqError(
1452
          "Node secondary ip not reachable by TCP based ping to noded port")
1453

    
1454
    self.new_node = objects.Node(name=node,
1455
                                 primary_ip=primary_ip,
1456
                                 secondary_ip=secondary_ip)
1457

    
1458
  def Exec(self, feedback_fn):
1459
    """Adds the new node to the cluster.
1460

1461
    """
1462
    new_node = self.new_node
1463
    node = new_node.name
1464

    
1465
    # set up inter-node password and certificate and restarts the node daemon
1466
    gntpass = self.sstore.GetNodeDaemonPassword()
1467
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1468
      raise errors.OpExecError("ganeti password corruption detected")
1469
    f = open(constants.SSL_CERT_FILE)
1470
    try:
1471
      gntpem = f.read(8192)
1472
    finally:
1473
      f.close()
1474
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1475
    # so we use this to detect an invalid certificate; as long as the
1476
    # cert doesn't contain this, the here-document will be correctly
1477
    # parsed by the shell sequence below
1478
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
1479
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1480
    if not gntpem.endswith("\n"):
1481
      raise errors.OpExecError("PEM must end with newline")
1482
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1483

    
1484
    # and then connect with ssh to set password and start ganeti-noded
1485
    # note that all the below variables are sanitized at this point,
1486
    # either by being constants or by the checks above
1487
    ss = self.sstore
1488
    mycommand = ("umask 077 && "
1489
                 "echo '%s' > '%s' && "
1490
                 "cat > '%s' << '!EOF.' && \n"
1491
                 "%s!EOF.\n%s restart" %
1492
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1493
                  constants.SSL_CERT_FILE, gntpem,
1494
                  constants.NODE_INITD_SCRIPT))
1495

    
1496
    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1497
    if result.failed:
      raise errors.OpExecError("Remote command on node %s, error: %s,"
                               " output: %s" %
                               (node, result.fail_reason, result.output))

    # check connectivity
    time.sleep(4)

    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    _UpdateEtcHosts(new_node.name, new_node.primary_ip)
    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
                      self.cfg.GetHostKey())

    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the"
                                 " secondary ip you gave (%s).\n"
                                 "Please fix and re-run this command." %
                                 new_node.secondary_ip)

    success, msg = ssh.VerifyNodeHostname(node)
    if not success:
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
                               " than the one the resolver gives: %s.\n"
                               "Please fix and re-run this command." %
                               (node, msg))

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList() + [node]
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = self.sstore.GetFileList()
    for fname in to_copy:
      if not ssh.CopyFileToNode(node, fname):
        logger.Error("could not copy file %s to node %s" % (fname, node))

    logger.Info("adding node %s to cluster.conf" % node)
    self.cfg.AddNode(new_node)


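# Note on the failover sequence implemented below: the old master is first
# asked to drop its master role, then the SS_MASTER_NODE key in the simple
# store is updated and pushed to all nodes, and only then is the master role
# started on the new node.  Failures along the way are only logged, so the
# operator may have to finish the switch-over by hand.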
class LUMasterFailover(LogicalUnit):
  """Failover the master node to the current node.

  This is a special LU in that it must run on a non-master node.

  """
  HPATH = "master-failover"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_MASTER = False
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the new master only in the pre phase, and on all
    the nodes in the post phase.

    """
    env = {
      "OP_TARGET": self.new_master,
      "NEW_MASTER": self.new_master,
      "OLD_MASTER": self.old_master,
      }
    return env, [self.new_master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we are not already the master.

    """
    self.new_master = utils.HostInfo().name
    self.old_master = self.sstore.GetMasterNode()

    if self.old_master == self.new_master:
      raise errors.OpPrereqError("This command must be run on the node"
                                 " where you want the new master to be.\n"
                                 "%s is already the master" %
                                 self.old_master)

  def Exec(self, feedback_fn):
    """Failover the master node.

    This command, when run on a non-master node, will cause the current
    master to cease being master, and the non-master to become new
    master.

    """
    #TODO: do not rely on gethostname returning the FQDN
    logger.Info("setting master to %s, old master: %s" %
                (self.new_master, self.old_master))

    if not rpc.call_node_stop_master(self.old_master):
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)

    ss = self.sstore
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
      logger.Error("could not distribute the new simple store master file"
                   " to the other nodes, please check.")

    if not rpc.call_node_start_master(self.new_master):
      logger.Error("could not start the master role on the new master"
                   " %s, please check" % self.new_master)
      feedback_fn("Error in activating the master IP on the new master,\n"
                  "please fix manually.")


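# The query LU below only gathers static facts (version constants, the
# master name from the simple store, the local architecture) into a dict
# and performs no RPC calls, which is why it can run with REQ_MASTER = False.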
class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      }

    return result


class LUClusterCopyFile(NoHooksLU):
1688
  """Copy file to cluster.
1689

1690
  """
1691
  _OP_REQP = ["nodes", "filename"]
1692

    
1693
  def CheckPrereq(self):
1694
    """Check prerequisites.
1695

1696
    It should check that the named file exists and that the given list
1697
    of nodes is valid.
1698

1699
    """
1700
    if not os.path.exists(self.op.filename):
1701
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1702

    
1703
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1704

    
1705
  def Exec(self, feedback_fn):
1706
    """Copy a file from master to some nodes.
1707

1708
    The file given by self.op.filename is copied from the master to every
    node in self.nodes, skipping the master itself.
1713

1714
    """
1715
    filename = self.op.filename
1716

    
1717
    myname = utils.HostInfo().name
1718

    
1719
    for node in self.nodes:
1720
      if node == myname:
1721
        continue
1722
      if not ssh.CopyFileToNode(node, filename):
1723
        logger.Error("Copy of file %s to node %s failed" % (filename, node))
1724

    
1725

    
1726
class LUDumpClusterConfig(NoHooksLU):
1727
  """Return a text-representation of the cluster-config.
1728

1729
  """
1730
  _OP_REQP = []
1731

    
1732
  def CheckPrereq(self):
1733
    """No prerequisites.
1734

1735
    """
1736
    pass
1737

    
1738
  def Exec(self, feedback_fn):
1739
    """Dump a representation of the cluster config to the standard output.
1740

1741
    """
1742
    return self.cfg.DumpConfig()
1743

    
1744

    
1745
class LURunClusterCommand(NoHooksLU):
1746
  """Run a command on some nodes.
1747

1748
  """
1749
  _OP_REQP = ["command", "nodes"]
1750

    
1751
  def CheckPrereq(self):
1752
    """Check prerequisites.
1753

1754
    It checks that the given list of nodes is valid.
1755

1756
    """
1757
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1758

    
1759
  def Exec(self, feedback_fn):
1760
    """Run a command on some nodes.
1761

1762
    """
1763
    data = []
1764
    for node in self.nodes:
1765
      result = ssh.SSHCall(node, "root", self.op.command)
1766
      data.append((node, result.output, result.exit_code))
1767

    
1768
    return data
1769

    
1770

    
1771
class LUActivateInstanceDisks(NoHooksLU):
1772
  """Bring up an instance's disks.
1773

1774
  """
1775
  _OP_REQP = ["instance_name"]
1776

    
1777
  def CheckPrereq(self):
1778
    """Check prerequisites.
1779

1780
    This checks that the instance is in the cluster.
1781

1782
    """
1783
    instance = self.cfg.GetInstanceInfo(
1784
      self.cfg.ExpandInstanceName(self.op.instance_name))
1785
    if instance is None:
1786
      raise errors.OpPrereqError("Instance '%s' not known" %
1787
                                 self.op.instance_name)
1788
    self.instance = instance
1789

    
1790

    
1791
  def Exec(self, feedback_fn):
1792
    """Activate the disks.
1793

1794
    """
1795
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1796
    if not disks_ok:
1797
      raise errors.OpExecError("Cannot activate block devices")
1798

    
1799
    return disks_info
1800

    
1801

    
1802
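# Illustrative sketch of what _AssembleInstanceDisks below returns (the
# values are hypothetical): a (disks_ok, device_info) pair such as
#   (True, [("node1.example.com", "sda", <result of blockdev_assemble>),
#           ("node1.example.com", "sdb", <result of blockdev_assemble>)])
# where the third element of each tuple is whatever the primary node's
# rpc.call_blockdev_assemble call returned.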
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
         succeeded with the mapping from node devices to instance devices
  """
  device_info = []
  disks_ok = True
  for inst_disk in instance.disks:
    master_result = None
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      is_primary = node == instance.primary_node
      result = rpc.call_blockdev_assemble(node, node_disk,
                                          instance.name, is_primary)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=%s)" %
                     (inst_disk.iv_name, node, is_primary))
        if is_primary or not ignore_secondaries:
          disks_ok = False
      if is_primary:
        master_result = result
    device_info.append((instance.primary_node, inst_disk.iv_name,
                        master_result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info


def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
1860
  """Shutdown an instance's disks.
1861

1862
  """
1863
  _OP_REQP = ["instance_name"]
1864

    
1865
  def CheckPrereq(self):
1866
    """Check prerequisites.
1867

1868
    This checks that the instance is in the cluster.
1869

1870
    """
1871
    instance = self.cfg.GetInstanceInfo(
1872
      self.cfg.ExpandInstanceName(self.op.instance_name))
1873
    if instance is None:
1874
      raise errors.OpPrereqError("Instance '%s' not known" %
1875
                                 self.op.instance_name)
1876
    self.instance = instance
1877

    
1878
  def Exec(self, feedback_fn):
1879
    """Deactivate the disks
1880

1881
    """
1882
    instance = self.instance
1883
    ins_l = rpc.call_instance_list([instance.primary_node])
1884
    ins_l = ins_l[instance.primary_node]
1885
    if not type(ins_l) is list:
1886
      raise errors.OpExecError("Can't contact node '%s'" %
1887
                               instance.primary_node)
1888

    
1889
    if self.instance.name in ins_l:
1890
      raise errors.OpExecError("Instance is running, can't shutdown"
1891
                               " block devices.")
1892

    
1893
    _ShutdownInstanceDisks(instance, self.cfg)
1894

    
1895

    
1896
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is false, errors on the primary node are not
  ignored; errors on any other node always cause a failed result.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result


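# The start-up LU below re-checks the primary node's free memory through
# rpc.call_node_info just before starting the instance, so a start can
# still fail in Exec even though CheckPrereq succeeded.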
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
    if not nodeinfo:
      raise errors.OpExecError("Could not contact node %s for info" %
                               (node_current))

    freememory = nodeinfo[node_current]['memory_free']
    memory = instance.memory
    if memory > freememory:
      raise errors.OpExecError("Not enough memory to start instance"
                               " %s on node %s"
                               " needed %s MiB, available %s MiB" %
                               (instance.name, node_current, memory,
                                freememory))

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")

    self.cfg.MarkInstanceUp(instance.name)


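# Reboot handling below: the SOFT and HARD reboot types are delegated to the
# node daemon via rpc.call_instance_reboot, while the FULL type is emulated
# on the master side as shutdown + disk deactivation/reactivation + start,
# which is why only that path uses the ignore_secondaries flag.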
class LURebootInstance(LogicalUnit):
1991
  """Reboot an instance.
1992

1993
  """
1994
  HPATH = "instance-reboot"
1995
  HTYPE = constants.HTYPE_INSTANCE
1996
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
1997

    
1998
  def BuildHooksEnv(self):
1999
    """Build hooks env.
2000

2001
    This runs on master, primary and secondary nodes of the instance.
2002

2003
    """
2004
    env = {
2005
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2006
      }
2007
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2008
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2009
          list(self.instance.secondary_nodes))
2010
    return env, nl, nl
2011

    
2012
  def CheckPrereq(self):
2013
    """Check prerequisites.
2014

2015
    This checks that the instance is in the cluster.
2016

2017
    """
2018
    instance = self.cfg.GetInstanceInfo(
2019
      self.cfg.ExpandInstanceName(self.op.instance_name))
2020
    if instance is None:
2021
      raise errors.OpPrereqError("Instance '%s' not known" %
2022
                                 self.op.instance_name)
2023

    
2024
    # check bridges existance
2025
    _CheckInstanceBridgesExist(instance)
2026

    
2027
    self.instance = instance
2028
    self.op.instance_name = instance.name
2029

    
2030
  def Exec(self, feedback_fn):
2031
    """Reboot the instance.
2032

2033
    """
2034
    instance = self.instance
2035
    ignore_secondaries = self.op.ignore_secondaries
2036
    reboot_type = self.op.reboot_type
2037
    extra_args = getattr(self.op, "extra_args", "")
2038

    
2039
    node_current = instance.primary_node
2040

    
2041
    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2042
                           constants.INSTANCE_REBOOT_HARD,
2043
                           constants.INSTANCE_REBOOT_FULL]:
2044
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2045
                                  (constants.INSTANCE_REBOOT_SOFT,
2046
                                   constants.INSTANCE_REBOOT_HARD,
2047
                                   constants.INSTANCE_REBOOT_FULL))
2048

    
2049
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2050
                       constants.INSTANCE_REBOOT_HARD]:
2051
      if not rpc.call_instance_reboot(node_current, instance,
2052
                                      reboot_type, extra_args):
2053
        raise errors.OpExecError("Could not reboot instance")
2054
    else:
2055
      if not rpc.call_instance_shutdown(node_current, instance):
2056
        raise errors.OpExecError("could not shutdown instance for full reboot")
2057
      _ShutdownInstanceDisks(instance, self.cfg)
2058
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2059
      if not rpc.call_instance_start(node_current, instance, extra_args):
2060
        _ShutdownInstanceDisks(instance, self.cfg)
2061
        raise errors.OpExecError("Could not start instance for full reboot")
2062

    
2063
    self.cfg.MarkInstanceUp(instance.name)
2064

    
2065

    
2066
class LUShutdownInstance(LogicalUnit):
2067
  """Shutdown an instance.
2068

2069
  """
2070
  HPATH = "instance-stop"
2071
  HTYPE = constants.HTYPE_INSTANCE
2072
  _OP_REQP = ["instance_name"]
2073

    
2074
  def BuildHooksEnv(self):
2075
    """Build hooks env.
2076

2077
    This runs on master, primary and secondary nodes of the instance.
2078

2079
    """
2080
    env = _BuildInstanceHookEnvByObject(self.instance)
2081
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2082
          list(self.instance.secondary_nodes))
2083
    return env, nl, nl
2084

    
2085
  def CheckPrereq(self):
2086
    """Check prerequisites.
2087

2088
    This checks that the instance is in the cluster.
2089

2090
    """
2091
    instance = self.cfg.GetInstanceInfo(
2092
      self.cfg.ExpandInstanceName(self.op.instance_name))
2093
    if instance is None:
2094
      raise errors.OpPrereqError("Instance '%s' not known" %
2095
                                 self.op.instance_name)
2096
    self.instance = instance
2097

    
2098
  def Exec(self, feedback_fn):
2099
    """Shutdown the instance.
2100

2101
    """
2102
    instance = self.instance
2103
    node_current = instance.primary_node
2104
    if not rpc.call_instance_shutdown(node_current, instance):
2105
      logger.Error("could not shutdown instance")
2106

    
2107
    self.cfg.MarkInstanceDown(instance.name)
2108
    _ShutdownInstanceDisks(instance, self.cfg)
2109

    
2110

    
2111
class LUReinstallInstance(LogicalUnit):
2112
  """Reinstall an instance.
2113

2114
  """
2115
  HPATH = "instance-reinstall"
2116
  HTYPE = constants.HTYPE_INSTANCE
2117
  _OP_REQP = ["instance_name"]
2118

    
2119
  def BuildHooksEnv(self):
2120
    """Build hooks env.
2121

2122
    This runs on master, primary and secondary nodes of the instance.
2123

2124
    """
2125
    env = _BuildInstanceHookEnvByObject(self.instance)
2126
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2127
          list(self.instance.secondary_nodes))
2128
    return env, nl, nl
2129

    
2130
  def CheckPrereq(self):
2131
    """Check prerequisites.
2132

2133
    This checks that the instance is in the cluster and is not running.
2134

2135
    """
2136
    instance = self.cfg.GetInstanceInfo(
2137
      self.cfg.ExpandInstanceName(self.op.instance_name))
2138
    if instance is None:
2139
      raise errors.OpPrereqError("Instance '%s' not known" %
2140
                                 self.op.instance_name)
2141
    if instance.disk_template == constants.DT_DISKLESS:
2142
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2143
                                 self.op.instance_name)
2144
    if instance.status != "down":
2145
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2146
                                 self.op.instance_name)
2147
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2148
    if remote_info:
2149
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2150
                                 (self.op.instance_name,
2151
                                  instance.primary_node))
2152

    
2153
    self.op.os_type = getattr(self.op, "os_type", None)
2154
    if self.op.os_type is not None:
2155
      # OS verification
2156
      pnode = self.cfg.GetNodeInfo(
2157
        self.cfg.ExpandNodeName(instance.primary_node))
2158
      if pnode is None:
2159
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2160
                                   instance.primary_node)
2161
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2162
      if not isinstance(os_obj, objects.OS):
2163
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2164
                                   " primary node"  % self.op.os_type)
2165

    
2166
    self.instance = instance
2167

    
2168
  def Exec(self, feedback_fn):
2169
    """Reinstall the instance.
2170

2171
    """
2172
    inst = self.instance
2173

    
2174
    if self.op.os_type is not None:
2175
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2176
      inst.os = self.op.os_type
2177
      self.cfg.AddInstance(inst)
2178

    
2179
    _StartInstanceDisks(self.cfg, inst, None)
2180
    try:
2181
      feedback_fn("Running the instance OS create scripts...")
2182
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2183
        raise errors.OpExecError("Could not install OS for instance %s "
2184
                                 "on node %s" %
2185
                                 (inst.name, inst.primary_node))
2186
    finally:
2187
      _ShutdownInstanceDisks(inst, self.cfg)
2188

    
2189

    
2190
class LURenameInstance(LogicalUnit):
2191
  """Rename an instance.
2192

2193
  """
2194
  HPATH = "instance-rename"
2195
  HTYPE = constants.HTYPE_INSTANCE
2196
  _OP_REQP = ["instance_name", "new_name"]
2197

    
2198
  def BuildHooksEnv(self):
2199
    """Build hooks env.
2200

2201
    This runs on master, primary and secondary nodes of the instance.
2202

2203
    """
2204
    env = _BuildInstanceHookEnvByObject(self.instance)
2205
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2206
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2207
          list(self.instance.secondary_nodes))
2208
    return env, nl, nl
2209

    
2210
  def CheckPrereq(self):
2211
    """Check prerequisites.
2212

2213
    This checks that the instance is in the cluster and is not running.
2214

2215
    """
2216
    instance = self.cfg.GetInstanceInfo(
2217
      self.cfg.ExpandInstanceName(self.op.instance_name))
2218
    if instance is None:
2219
      raise errors.OpPrereqError("Instance '%s' not known" %
2220
                                 self.op.instance_name)
2221
    if instance.status != "down":
2222
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2223
                                 self.op.instance_name)
2224
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2225
    if remote_info:
2226
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2227
                                 (self.op.instance_name,
2228
                                  instance.primary_node))
2229
    self.instance = instance
2230

    
2231
    # new name verification
2232
    name_info = utils.HostInfo(self.op.new_name)
2233

    
2234
    self.op.new_name = new_name = name_info.name
2235
    if not getattr(self.op, "ignore_ip", False):
2236
      command = ["fping", "-q", name_info.ip]
2237
      result = utils.RunCmd(command)
2238
      if not result.failed:
2239
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2240
                                   (name_info.ip, new_name))
2241

    
2242

    
2243
  def Exec(self, feedback_fn):
2244
    """Reinstall the instance.
2245

2246
    """
2247
    inst = self.instance
2248
    old_name = inst.name
2249

    
2250
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2251

    
2252
    # re-read the instance from the configuration after rename
2253
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2254

    
2255
    _StartInstanceDisks(self.cfg, inst, None)
2256
    try:
2257
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2258
                                          "sda", "sdb"):
2259
        msg = ("Could run OS rename script for instance %s\n"
2260
               "on node %s\n"
2261
               "(but the instance has been renamed in Ganeti)" %
2262
               (inst.name, inst.primary_node))
2263
        logger.Error(msg)
2264
    finally:
2265
      _ShutdownInstanceDisks(inst, self.cfg)
2266

    
2267

    
2268
class LURemoveInstance(LogicalUnit):
2269
  """Remove an instance.
2270

2271
  """
2272
  HPATH = "instance-remove"
2273
  HTYPE = constants.HTYPE_INSTANCE
2274
  _OP_REQP = ["instance_name"]
2275

    
2276
  def BuildHooksEnv(self):
2277
    """Build hooks env.
2278

2279
    This runs on master, primary and secondary nodes of the instance.
2280

2281
    """
2282
    env = _BuildInstanceHookEnvByObject(self.instance)
2283
    nl = [self.sstore.GetMasterNode()]
2284
    return env, nl, nl
2285

    
2286
  def CheckPrereq(self):
2287
    """Check prerequisites.
2288

2289
    This checks that the instance is in the cluster.
2290

2291
    """
2292
    instance = self.cfg.GetInstanceInfo(
2293
      self.cfg.ExpandInstanceName(self.op.instance_name))
2294
    if instance is None:
2295
      raise errors.OpPrereqError("Instance '%s' not known" %
2296
                                 self.op.instance_name)
2297
    self.instance = instance
2298

    
2299
  def Exec(self, feedback_fn):
2300
    """Remove the instance.
2301

2302
    """
2303
    instance = self.instance
2304
    logger.Info("shutting down instance %s on node %s" %
2305
                (instance.name, instance.primary_node))
2306

    
2307
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2308
      if self.op.ignore_failures:
2309
        feedback_fn("Warning: can't shutdown instance")
2310
      else:
2311
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2312
                                 (instance.name, instance.primary_node))
2313

    
2314
    logger.Info("removing block devices for instance %s" % instance.name)
2315

    
2316
    if not _RemoveDisks(instance, self.cfg):
2317
      if self.op.ignore_failures:
2318
        feedback_fn("Warning: can't remove instance's disks")
2319
      else:
2320
        raise errors.OpExecError("Can't remove instance's disks")
2321

    
2322
    logger.Info("removing instance %s out of cluster config" % instance.name)
2323

    
2324
    self.cfg.RemoveInstance(instance.name)
2325

    
2326

    
2327
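# Field handling in the query LU below: the admin_* fields come straight
# from the configuration, while the dynamic oper_* fields need a live
# rpc.call_all_instances_info query; nodes that fail to answer are recorded
# in bad_nodes and their instances report None for the dynamic fields.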
class LUQueryInstances(NoHooksLU):
2328
  """Logical unit for querying instances.
2329

2330
  """
2331
  _OP_REQP = ["output_fields", "names"]
2332

    
2333
  def CheckPrereq(self):
2334
    """Check prerequisites.
2335

2336
    This checks that the fields required are valid output fields.
2337

2338
    """
2339
    self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2340
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2341
                               "admin_state", "admin_ram",
2342
                               "disk_template", "ip", "mac", "bridge",
2343
                               "sda_size", "sdb_size"],
2344
                       dynamic=self.dynamic_fields,
2345
                       selected=self.op.output_fields)
2346

    
2347
    self.wanted = _GetWantedInstances(self, self.op.names)
2348

    
2349
  def Exec(self, feedback_fn):
2350
    """Computes the list of nodes and their attributes.
2351

2352
    """
2353
    instance_names = self.wanted
2354
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2355
                     in instance_names]
2356

    
2357
    # begin data gathering
2358

    
2359
    nodes = frozenset([inst.primary_node for inst in instance_list])
2360

    
2361
    bad_nodes = []
2362
    if self.dynamic_fields.intersection(self.op.output_fields):
2363
      live_data = {}
2364
      node_data = rpc.call_all_instances_info(nodes)
2365
      for name in nodes:
2366
        result = node_data[name]
2367
        if result:
2368
          live_data.update(result)
2369
        elif result == False:
2370
          bad_nodes.append(name)
2371
        # else no instance is alive
2372
    else:
2373
      live_data = dict([(name, {}) for name in instance_names])
2374

    
2375
    # end data gathering
2376

    
2377
    output = []
2378
    for instance in instance_list:
2379
      iout = []
2380
      for field in self.op.output_fields:
2381
        if field == "name":
2382
          val = instance.name
2383
        elif field == "os":
2384
          val = instance.os
2385
        elif field == "pnode":
2386
          val = instance.primary_node
2387
        elif field == "snodes":
2388
          val = list(instance.secondary_nodes)
2389
        elif field == "admin_state":
2390
          val = (instance.status != "down")
2391
        elif field == "oper_state":
2392
          if instance.primary_node in bad_nodes:
2393
            val = None
2394
          else:
2395
            val = bool(live_data.get(instance.name))
2396
        elif field == "admin_ram":
2397
          val = instance.memory
2398
        elif field == "oper_ram":
2399
          if instance.primary_node in bad_nodes:
2400
            val = None
2401
          elif instance.name in live_data:
2402
            val = live_data[instance.name].get("memory", "?")
2403
          else:
2404
            val = "-"
2405
        elif field == "disk_template":
2406
          val = instance.disk_template
2407
        elif field == "ip":
2408
          val = instance.nics[0].ip
2409
        elif field == "bridge":
2410
          val = instance.nics[0].bridge
2411
        elif field == "mac":
2412
          val = instance.nics[0].mac
2413
        elif field == "sda_size" or field == "sdb_size":
2414
          disk = instance.FindDisk(field[:3])
2415
          if disk is None:
2416
            val = None
2417
          else:
2418
            val = disk.size
2419
        else:
2420
          raise errors.ParameterError(field)
2421
        iout.append(val)
2422
      output.append(iout)
2423

    
2424
    return output
2425

    
2426

    
2427
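# Failover sequence implemented below: verify disk consistency and free
# memory on the secondary node, shut the instance down on the current
# primary, shut down its disks (ignoring primary-side errors), flip
# primary_node in the configuration, reassemble the disks on the new
# primary and finally start the instance there.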
class LUFailoverInstance(LogicalUnit):
2428
  """Failover an instance.
2429

2430
  """
2431
  HPATH = "instance-failover"
2432
  HTYPE = constants.HTYPE_INSTANCE
2433
  _OP_REQP = ["instance_name", "ignore_consistency"]
2434

    
2435
  def BuildHooksEnv(self):
2436
    """Build hooks env.
2437

2438
    This runs on master, primary and secondary nodes of the instance.
2439

2440
    """
2441
    env = {
2442
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2443
      }
2444
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2445
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2446
    return env, nl, nl
2447

    
2448
  def CheckPrereq(self):
2449
    """Check prerequisites.
2450

2451
    This checks that the instance is in the cluster.
2452

2453
    """
2454
    instance = self.cfg.GetInstanceInfo(
2455
      self.cfg.ExpandInstanceName(self.op.instance_name))
2456
    if instance is None:
2457
      raise errors.OpPrereqError("Instance '%s' not known" %
2458
                                 self.op.instance_name)
2459

    
2460
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2461
      raise errors.OpPrereqError("Instance's disk layout is not"
2462
                                 " network mirrored, cannot failover.")
2463

    
2464
    secondary_nodes = instance.secondary_nodes
2465
    if not secondary_nodes:
2466
      raise errors.ProgrammerError("no secondary node but using "
2467
                                   "DT_REMOTE_RAID1 template")
2468

    
2469
    # check memory requirements on the secondary node
2470
    target_node = secondary_nodes[0]
2471
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2472
    info = nodeinfo.get(target_node, None)
2473
    if not info:
2474
      raise errors.OpPrereqError("Cannot get current information"
2475
                                 " from node '%s'" % nodeinfo)
2476
    if instance.memory > info['memory_free']:
2477
      raise errors.OpPrereqError("Not enough memory on target node %s."
2478
                                 " %d MB available, %d MB required" %
2479
                                 (target_node, info['memory_free'],
2480
                                  instance.memory))
2481

    
2482
    # check bridge existance
2483
    brlist = [nic.bridge for nic in instance.nics]
2484
    if not rpc.call_bridges_exist(target_node, brlist):
2485
      raise errors.OpPrereqError("One or more target bridges %s does not"
2486
                                 " exist on destination node '%s'" %
2487
                                 (brlist, target_node))
2488

    
2489
    self.instance = instance
2490

    
2491
  def Exec(self, feedback_fn):
2492
    """Failover an instance.
2493

2494
    The failover is done by shutting it down on its present node and
2495
    starting it on the secondary.
2496

2497
    """
2498
    instance = self.instance
2499

    
2500
    source_node = instance.primary_node
2501
    target_node = instance.secondary_nodes[0]
2502

    
2503
    feedback_fn("* checking disk consistency between source and target")
2504
    for dev in instance.disks:
2505
      # for remote_raid1, these are md over drbd
2506
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2507
        if not self.op.ignore_consistency:
2508
          raise errors.OpExecError("Disk %s is degraded on target node,"
2509
                                   " aborting failover." % dev.iv_name)
2510

    
2511
    feedback_fn("* checking target node resource availability")
2512
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2513

    
2514
    if not nodeinfo:
2515
      raise errors.OpExecError("Could not contact target node %s." %
2516
                               target_node)
2517

    
2518
    free_memory = int(nodeinfo[target_node]['memory_free'])
2519
    memory = instance.memory
2520
    if memory > free_memory:
2521
      raise errors.OpExecError("Not enough memory to create instance %s on"
2522
                               " node %s. needed %s MiB, available %s MiB" %
2523
                               (instance.name, target_node, memory,
2524
                                free_memory))
2525

    
2526
    feedback_fn("* shutting down instance on source node")
2527
    logger.Info("Shutting down instance %s on node %s" %
2528
                (instance.name, source_node))
2529

    
2530
    if not rpc.call_instance_shutdown(source_node, instance):
2531
      if self.op.ignore_consistency:
2532
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2533
                     " anyway. Please make sure node %s is down"  %
2534
                     (instance.name, source_node, source_node))
2535
      else:
2536
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2537
                                 (instance.name, source_node))
2538

    
2539
    feedback_fn("* deactivating the instance's disks on source node")
2540
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2541
      raise errors.OpExecError("Can't shut down the instance's disks.")
2542

    
2543
    instance.primary_node = target_node
2544
    # distribute new instance config to the other nodes
2545
    self.cfg.AddInstance(instance)
2546

    
2547
    feedback_fn("* activating the instance's disks on target node")
2548
    logger.Info("Starting instance %s on node %s" %
2549
                (instance.name, target_node))
2550

    
2551
    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2552
                                             ignore_secondaries=True)
2553
    if not disks_ok:
2554
      _ShutdownInstanceDisks(instance, self.cfg)
2555
      raise errors.OpExecError("Can't activate the instance's disks")
2556

    
2557
    feedback_fn("* starting the instance on the target node")
2558
    if not rpc.call_instance_start(target_node, instance, None):
2559
      _ShutdownInstanceDisks(instance, self.cfg)
2560
      raise errors.OpExecError("Could not start instance %s on node %s." %
2561
                               (instance.name, target_node))
2562

    
2563

    
2564
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2565
  """Create a tree of block devices on the primary node.
2566

2567
  This always creates all devices.
2568

2569
  """
2570
  if device.children:
2571
    for child in device.children:
2572
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2573
        return False
2574

    
2575
  cfg.SetDiskID(device, node)
2576
  new_id = rpc.call_blockdev_create(node, device, device.size,
2577
                                    instance.name, True, info)
2578
  if not new_id:
2579
    return False
2580
  if device.physical_id is None:
2581
    device.physical_id = new_id
2582
  return True
2583

    
2584

    
2585
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2586
  """Create a tree of block devices on a secondary node.
2587

2588
  If this device type has to be created on secondaries, create it and
2589
  all its children.
2590

2591
  If not, just recurse to children keeping the same 'force' value.
2592

2593
  """
2594
  if device.CreateOnSecondary():
2595
    force = True
2596
  if device.children:
2597
    for child in device.children:
2598
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
2599
                                        child, force, info):
2600
        return False
2601

    
2602
  if not force:
2603
    return True
2604
  cfg.SetDiskID(device, node)
2605
  new_id = rpc.call_blockdev_create(node, device, device.size,
2606
                                    instance.name, False, info)
2607
  if not new_id:
2608
    return False
2609
  if device.physical_id is None:
2610
    device.physical_id = new_id
2611
  return True
2612

    
2613

    
2614
def _GenerateUniqueNames(cfg, exts):
2615
  """Generate a suitable LV name.
2616

2617
  This will generate a logical volume name for the given instance.
2618

2619
  """
2620
  results = []
2621
  for val in exts:
2622
    new_id = cfg.GenerateUniqueID()
2623
    results.append("%s%s" % (new_id, val))
2624
  return results
2625

    
2626

    
2627
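# Both DRBD branch generators below build the same two-LV layout: a data LV
# of the requested size plus a fixed 128 MB metadata LV, attached as children
# of the DRBD disk object, with the DRBD port allocated from the cluster
# configuration.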
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2628
  """Generate a drbd device complete with its children.
2629

2630
  """
2631
  port = cfg.AllocatePort()
2632
  vgname = cfg.GetVGName()
2633
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2634
                          logical_id=(vgname, names[0]))
2635
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2636
                          logical_id=(vgname, names[1]))
2637
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2638
                          logical_id = (primary, secondary, port),
2639
                          children = [dev_data, dev_meta])
2640
  return drbd_dev
2641

    
2642

    
2643
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2644
  """Generate a drbd8 device complete with its children.
2645

2646
  """
2647
  port = cfg.AllocatePort()
2648
  vgname = cfg.GetVGName()
2649
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2650
                          logical_id=(vgname, names[0]))
2651
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2652
                          logical_id=(vgname, names[1]))
2653
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2654
                          logical_id = (primary, secondary, port),
2655
                          children = [dev_data, dev_meta],
2656
                          iv_name=iv_name)
2657
  return drbd_dev
2658

    
2659
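# Template summary for _GenerateDiskTemplate below: "diskless" creates no
# disks, "plain" creates two plain LVs (sda/sdb), "local_raid1" mirrors each
# of them over two local LVs via MD, remote_raid1 puts MD on top of a DRBD7
# device shared with the secondary node, and drbd8 exposes a DRBD8 device
# (data plus meta LVs) directly as sda/sdb.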
def _GenerateDiskTemplate(cfg, template_name,
2660
                          instance_name, primary_node,
2661
                          secondary_nodes, disk_sz, swap_sz):
2662
  """Generate the entire disk layout for a given template type.
2663

2664
  """
2665
  #TODO: compute space requirements
2666

    
2667
  vgname = cfg.GetVGName()
2668
  if template_name == "diskless":
2669
    disks = []
2670
  elif template_name == "plain":
2671
    if len(secondary_nodes) != 0:
2672
      raise errors.ProgrammerError("Wrong template configuration")
2673

    
2674
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2675
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2676
                           logical_id=(vgname, names[0]),
2677
                           iv_name = "sda")
2678
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2679
                           logical_id=(vgname, names[1]),
2680
                           iv_name = "sdb")
2681
    disks = [sda_dev, sdb_dev]
2682
  elif template_name == "local_raid1":
2683
    if len(secondary_nodes) != 0:
2684
      raise errors.ProgrammerError("Wrong template configuration")
2685

    
2686

    
2687
    names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2688
                                       ".sdb_m1", ".sdb_m2"])
2689
    sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2690
                              logical_id=(vgname, names[0]))
2691
    sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2692
                              logical_id=(vgname, names[1]))
2693
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2694
                              size=disk_sz,
2695
                              children = [sda_dev_m1, sda_dev_m2])
2696
    sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2697
                              logical_id=(vgname, names[2]))
2698
    sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2699
                              logical_id=(vgname, names[3]))
2700
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2701
                              size=swap_sz,
2702
                              children = [sdb_dev_m1, sdb_dev_m2])
2703
    disks = [md_sda_dev, md_sdb_dev]
2704
  elif template_name == constants.DT_REMOTE_RAID1:
2705
    if len(secondary_nodes) != 1:
2706
      raise errors.ProgrammerError("Wrong template configuration")
2707
    remote_node = secondary_nodes[0]
2708
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2709
                                       ".sdb_data", ".sdb_meta"])
2710
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2711
                                         disk_sz, names[0:2])
2712
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2713
                              children = [drbd_sda_dev], size=disk_sz)
2714
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2715
                                         swap_sz, names[2:4])
2716
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2717
                              children = [drbd_sdb_dev], size=swap_sz)
2718
    disks = [md_sda_dev, md_sdb_dev]
2719
  elif template_name == constants.DT_DRBD8:
2720
    if len(secondary_nodes) != 1:
2721
      raise errors.ProgrammerError("Wrong template configuration")
2722
    remote_node = secondary_nodes[0]
2723
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2724
                                       ".sdb_data", ".sdb_meta"])
2725
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2726
                                         disk_sz, names[0:2], "sda")
2727
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2728
                                         swap_sz, names[2:4], "sdb")
2729
    disks = [drbd_sda_dev, drbd_sdb_dev]
2730
  else:
2731
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2732
  return disks
2733

    
2734

    
2735
def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


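# _CreateDisks below creates each device tree on the secondary nodes first
# and on the primary node last, aborting (and returning False) as soon as
# any node fails; _RemoveDisks is the lenient counterpart and keeps going
# past individual failures.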
def _CreateDisks(cfg, instance):
2743
  """Create all disks for an instance.
2744

2745
  This abstracts away some work from AddInstance.
2746

2747
  Args:
2748
    instance: the instance object
2749

2750
  Returns:
2751
    True or False showing the success of the creation process
2752

2753
  """
2754
  info = _GetInstanceInfoText(instance)
2755

    
2756
  for device in instance.disks:
2757
    logger.Info("creating volume %s for instance %s" %
2758
              (device.iv_name, instance.name))
2759
    #HARDCODE
2760
    for secondary_node in instance.secondary_nodes:
2761
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2762
                                        device, False, info):
2763
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2764
                     (device.iv_name, device, secondary_node))
2765
        return False
2766
    #HARDCODE
2767
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2768
                                    instance, device, info):
2769
      logger.Error("failed to create volume %s on primary!" %
2770
                   device.iv_name)
2771
      return False
2772
  return True
2773

    
2774

    
2775
def _RemoveDisks(instance, cfg):
2776
  """Remove all disks for an instance.
2777

2778
  This abstracts away some work from `AddInstance()` and
2779
  `RemoveInstance()`. Note that in case some of the devices couldn't
2780
  be removed, the removal will continue with the other ones (compare
2781
  with `_CreateDisks()`).
2782

2783
  Args:
2784
    instance: the instance object
2785

2786
  Returns:
2787
    True or False showing the success of the removal process
2788

2789
  """
2790
  logger.Info("removing block devices for instance %s" % instance.name)
2791

    
2792
  result = True
2793
  for device in instance.disks:
2794
    for node, disk in device.ComputeNodeTree(instance.primary_node):
2795
      cfg.SetDiskID(disk, node)
2796
      if not rpc.call_blockdev_remove(node, disk):
2797
        logger.Error("could not remove block device %s on node %s,"
2798
                     " continuing anyway" %
2799
                     (device.iv_name, node))
2800
        result = False
2801
  return result
2802

    
2803

    
2804
class LUCreateInstance(LogicalUnit):
2805
  """Create an instance.
2806

2807
  """
2808
  HPATH = "instance-add"
2809
  HTYPE = constants.HTYPE_INSTANCE
2810
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2811
              "disk_template", "swap_size", "mode", "start", "vcpus",
2812
              "wait_for_sync", "ip_check"]
2813

    
2814
  def BuildHooksEnv(self):
2815
    """Build hooks env.
2816

2817
    This runs on master, primary and secondary nodes of the instance.
2818

2819
    """
2820
    env = {
2821
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2822
      "INSTANCE_DISK_SIZE": self.op.disk_size,
2823
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
2824
      "INSTANCE_ADD_MODE": self.op.mode,
2825
      }
2826
    if self.op.mode == constants.INSTANCE_IMPORT:
2827
      env["INSTANCE_SRC_NODE"] = self.op.src_node
2828
      env["INSTANCE_SRC_PATH"] = self.op.src_path
2829
      env["INSTANCE_SRC_IMAGE"] = self.src_image
2830

    
2831
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2832
      primary_node=self.op.pnode,
2833
      secondary_nodes=self.secondaries,
2834
      status=self.instance_status,
2835
      os_type=self.op.os_type,
2836
      memory=self.op.mem_size,
2837
      vcpus=self.op.vcpus,
2838
      nics=[(self.inst_ip, self.op.bridge)],
2839
    ))
2840

    
2841
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2842
          self.secondaries)
2843
    return env, nl, nl
2844

    
2845

    
2846
  def CheckPrereq(self):
2847
    """Check prerequisites.
2848

2849
    """
2850
    if self.op.mode not in (constants.INSTANCE_CREATE,
2851
                            constants.INSTANCE_IMPORT):
2852
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2853
                                 self.op.mode)
2854

    
2855
    if self.op.mode == constants.INSTANCE_IMPORT:
2856
      src_node = getattr(self.op, "src_node", None)
2857
      src_path = getattr(self.op, "src_path", None)
2858
      if src_node is None or src_path is None:
2859
        raise errors.OpPrereqError("Importing an instance requires source"
2860
                                   " node and path options")
2861
      src_node_full = self.cfg.ExpandNodeName(src_node)
2862
      if src_node_full is None:
2863
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2864
      self.op.src_node = src_node = src_node_full
2865

    
2866
      if not os.path.isabs(src_path):
2867
        raise errors.OpPrereqError("The source path must be absolute")
2868

    
2869
      export_info = rpc.call_export_info(src_node, src_path)
2870

    
2871
      if not export_info:
2872
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
2873

    
2874
      if not export_info.has_section(constants.INISECT_EXP):
2875
        raise errors.ProgrammerError("Corrupted export config")
2876

    
2877
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
2878
      if (int(ei_version) != constants.EXPORT_VERSION):
2879
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2880
                                   (ei_version, constants.EXPORT_VERSION))
2881

    
2882
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2883
        raise errors.OpPrereqError("Can't import instance with more than"
2884
                                   " one data disk")
2885

    
2886
      # FIXME: are the old os-es, disk sizes, etc. useful?
2887
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2888
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2889
                                                         'disk0_dump'))
2890
      self.src_image = diskimage
2891
    else: # INSTANCE_CREATE
2892
      if getattr(self.op, "os_type", None) is None:
2893
        raise errors.OpPrereqError("No guest OS specified")
2894

    
2895
    # check primary node
2896
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2897
    if pnode is None:
2898
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
2899
                                 self.op.pnode)
2900
    self.op.pnode = pnode.name
2901
    self.pnode = pnode
2902
    self.secondaries = []
2903
    # disk template and mirror node verification
2904
    if self.op.disk_template not in constants.DISK_TEMPLATES:
2905
      raise errors.OpPrereqError("Invalid disk template name")
2906

    
2907
    if self.op.disk_template in constants.DTS_NET_MIRROR:
2908
      if getattr(self.op, "snode", None) is None:
2909
        raise errors.OpPrereqError("The networked disk templates need"
2910
                                   " a mirror node")
2911

    
2912
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
2913
      if snode_name is None:
2914
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
2915
                                   self.op.snode)
2916
      elif snode_name == pnode.name:
2917
        raise errors.OpPrereqError("The secondary node cannot be"
2918
                                   " the primary node.")
2919
      self.secondaries.append(snode_name)
2920

    
2921
    # Check lv size requirements
2922
    nodenames = [pnode.name] + self.secondaries
2923
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2924

    
2925
    # Required free disk space as a function of disk and swap space
2926
    req_size_dict = {
2927
      constants.DT_DISKLESS: 0,
2928
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2929
      constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2930
      # 256 MB are added for drbd metadata, 128MB for each drbd device
2931
      constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2932
      constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
2933
    }
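    # Worked example (hypothetical sizes): a drbd8 instance with a 10240 MB
    # disk and 4096 MB swap needs 10240 + 4096 + 256 = 14592 MB of free space
    # in the volume group, and the check below enforces this on the primary
    # and the secondary node alike.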
2934

    
2935
    if self.op.disk_template not in req_size_dict:
2936
      raise errors.ProgrammerError("Disk template '%s' size requirement"
2937
                                   " is unknown" %  self.op.disk_template)
2938

    
2939
    req_size = req_size_dict[self.op.disk_template]
2940

    
2941
    for node in nodenames:
2942
      info = nodeinfo.get(node, None)
2943
      if not info:
2944
        raise errors.OpPrereqError("Cannot get current information"
2945
                                   " from node '%s'" % nodeinfo)
2946
      if req_size > info['vg_free']:
2947
        raise errors.OpPrereqError("Not enough disk space on target node %s."
2948
                                   " %d MB available, %d MB required" %
2949
                                   (node, info['vg_free'], req_size))
2950

    
2951
    # os verification
2952
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2953
    if not isinstance(os_obj, objects.OS):
2954
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
2955
                                 " primary node"  % self.op.os_type)
2956

    
2957
    # instance verification
2958
    hostname1 = utils.HostInfo(self.op.instance_name)
2959

    
2960
    self.op.instance_name = instance_name = hostname1.name
2961
    instance_list = self.cfg.GetInstanceList()
2962
    if instance_name in instance_list:
2963
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2964
                                 instance_name)
2965

    
2966
    ip = getattr(self.op, "ip", None)
2967
    if ip is None or ip.lower() == "none":
2968
      inst_ip = None
2969
    elif ip.lower() == "auto":
2970
      inst_ip = hostname1.ip
2971
    else:
2972
      if not utils.IsValidIP(ip):
2973
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
2974
                                   " like a valid IP" % ip)
2975
      inst_ip = ip
2976
    self.inst_ip = inst_ip
2977

    
2978
    if self.op.start and not self.op.ip_check:
2979
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
2980
                                 " adding an instance in start mode")
2981

    
2982
    if self.op.ip_check:
2983
      if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
2984
                       constants.DEFAULT_NODED_PORT):
2985
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2986
                                   (hostname1.ip, instance_name))

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
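      # the fixed delay is a heuristic grace period, giving the mirrors
      # a moment to connect before the single, non-blocking check below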
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                           src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance.name)
    # build ssh cmdline
    argv = ["ssh", "-q", "-t"]
    argv.extend(ssh.KNOWN_HOSTS_OPTS)
    argv.extend(ssh.BATCH_MODE_OPTS)
    argv.append(node)
    argv.append(console_cmd)
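    # Illustrative result (hypothetical names; the console command is
    # hypervisor-specific, e.g. "xm console ..." under Xen):
    #   ["ssh", "-q", "-t", <KNOWN_HOSTS_OPTS>, <BATCH_MODE_OPTS>,
    #    "node1.example.com", "xm console instance1.example.com"]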
    return "ssh", argv


class LUAddMDDRBDComponent(LogicalUnit):
  """Add a new mirror member to an instance's disk.

  """
  HPATH = "mirror-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "NEW_SECONDARY": self.op.remote_node,
      "DISK_NAME": self.op.disk_name,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.remote_node,] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
    if remote_node is None:
      raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
    self.remote_node = remote_node

    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
                                 " instance." % self.op.disk_name)
    if len(disk.children) > 1:
      raise errors.OpPrereqError("The device already has two slave"
                                 " devices.\n"
                                 "This would create a 3-disk raid1"
                                 " which we don't allow.")
    self.disk = disk

  def Exec(self, feedback_fn):
    """Add the mirror component

    """
    disk = self.disk
    instance = self.instance

    remote_node = self.remote_node
    lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
    names = _GenerateUniqueNames(self.cfg, lv_names)
    new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
                                     remote_node, disk.size, names)
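
    # Note the ordering below: the new DRBD branch is created on the
    # secondary first, then on the primary (rolling back the secondary
    # device on failure), and only then attached as a child of the md
    # device.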
    logger.Info("adding new mirror component on secondary")
    #HARDCODE
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
                                      new_drbd, False,
                                      _GetInstanceInfoText(instance)):
      raise errors.OpExecError("Failed to create new component on secondary"
                               " node %s" % remote_node)

    logger.Info("adding new mirror component on primary")
    #HARDCODE
    if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
                                    instance, new_drbd,
                                    _GetInstanceInfoText(instance)):
      # remove secondary dev
      self.cfg.SetDiskID(new_drbd, remote_node)
      rpc.call_blockdev_remove(remote_node, new_drbd)
      raise errors.OpExecError("Failed to create volume on primary")

    # the device exists now
    # call the primary node to add the mirror to md
    logger.Info("adding new mirror component to md")
    if not rpc.call_blockdev_addchildren(instance.primary_node,
                                         disk, [new_drbd]):
      logger.Error("Can't add mirror component to md!")
      self.cfg.SetDiskID(new_drbd, remote_node)
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
        logger.Error("Can't rollback on secondary")
      self.cfg.SetDiskID(new_drbd, instance.primary_node)
      if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
        logger.Error("Can't rollback on primary")
      raise errors.OpExecError("Can't add mirror component to md array")

    disk.children.append(new_drbd)

    self.cfg.AddInstance(instance)

    _WaitForSync(self.cfg, instance)

    return 0


class LURemoveMDDRBDComponent(LogicalUnit):
  """Remove a component from a remote_raid1 disk.

  """
  HPATH = "mirror-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK_NAME": self.op.disk_name,
      "DISK_ID": self.op.disk_id,
      "OLD_SECONDARY": self.old_secondary,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
                                 " instance." % self.op.disk_name)
    for child in disk.children:
      if (child.dev_type == constants.LD_DRBD7 and
          child.logical_id[2] == self.op.disk_id):
        break
    else:
      raise errors.OpPrereqError("Can't find the device with this port.")

    if len(disk.children) < 2:
      raise errors.OpPrereqError("Cannot remove the last component from"
                                 " a mirror.")
    self.disk = disk
    self.child = child
    if self.child.logical_id[0] == instance.primary_node:
      oid = 1
    else:
      oid = 0
    self.old_secondary = self.child.logical_id[oid]
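    # for these drbd children logical_id is (node_a, node_b, port); the
    # entry that is not the primary node is the old secondary we detach
    # from, hence the index flip above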

  def Exec(self, feedback_fn):
    """Remove the mirror component

    """
    instance = self.instance
    disk = self.disk
    child = self.child
    logger.Info("remove mirror component")
    self.cfg.SetDiskID(disk, instance.primary_node)
    if not rpc.call_blockdev_removechildren(instance.primary_node,
                                            disk, [child]):
      raise errors.OpExecError("Can't remove child from mirror.")

    for node in child.logical_id[:2]:
      self.cfg.SetDiskID(child, node)
      if not rpc.call_blockdev_remove(node, child):
        logger.Error("Warning: failed to remove device from node %s,"
                     " continuing operation." % node)

    disk.children.remove(child)
    self.cfg.AddInstance(instance)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    remote_node = getattr(self.op, "remote_node", None)
    if remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      # the user gave the current secondary, switch to
      # 'no-replace-secondary' mode
      remote_node = None
    if (instance.disk_template == constants.DT_REMOTE_RAID1 and
        self.op.mode != constants.REPLACE_DISK_ALL):
      raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
                                 " disks replacement, not individual ones")
    if instance.disk_template == constants.DT_DRBD8:
      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd8' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd8' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))
    self.op.remote_node = remote_node

  def _ExecRR1(self, feedback_fn):
    """Replace the disks of an instance.

    """
    instance = self.instance
    iv_names = {}
    # start of work
    if self.op.remote_node is None:
      remote_node = self.sec_node
    else:
      remote_node = self.op.remote_node
    cfg = self.cfg
    for dev in instance.disks:
      size = dev.size
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
                                       remote_node, size, names)
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
      logger.Info("adding new mirror component on secondary for %s" %
                  dev.iv_name)
      #HARDCODE
      if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new component on"
                                 " secondary node %s\n"
                                 "Full abort, cleanup manually!" %
                                 remote_node)

      logger.Info("adding new mirror component on primary")
      #HARDCODE
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                      instance, new_drbd,
                                      _GetInstanceInfoText(instance)):
        # remove secondary dev
        cfg.SetDiskID(new_drbd, remote_node)
        rpc.call_blockdev_remove(remote_node, new_drbd)
        raise errors.OpExecError("Failed to create volume on primary!\n"
                                 "Full abort, cleanup manually!!")

      # the device exists now
      # call the primary node to add the mirror to md
      logger.Info("adding new mirror component to md")
      if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
                                           [new_drbd]):
        logger.Error("Can't add mirror component to md!")
        cfg.SetDiskID(new_drbd, remote_node)
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
          logger.Error("Can't rollback on secondary")
        cfg.SetDiskID(new_drbd, instance.primary_node)
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
          logger.Error("Can't rollback on primary")
        raise errors.OpExecError("Full abort, cleanup manually!!")

      dev.children.append(new_drbd)
      cfg.AddInstance(instance)

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    _WaitForSync(cfg, instance, unlock=True)

    # so check manually all the devices
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("MD device %s is degraded!" % name)
      cfg.SetDiskID(new_drbd, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
      if is_degr:
        raise errors.OpExecError("New drbd device %s is degraded!" % name)

    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      logger.Info("remove mirror %s component" % name)
      cfg.SetDiskID(dev, instance.primary_node)
      if not rpc.call_blockdev_removechildren(instance.primary_node,
                                              dev, [child]):
        logger.Error("Can't remove child from mirror, aborting"
                     " *this device cleanup*.\nYou need to cleanup manually!!")
        continue

      for node in child.logical_id[:2]:
        logger.Info("remove child device on %s" % node)
        cfg.SetDiskID(child, node)
        if not rpc.call_blockdev_remove(node, child):
          logger.Error("Warning: failed to remove device from node %s,"
                       " continuing operation." % node)

      dev.children.remove(child)

      cfg.AddInstance(instance)

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced-<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaced-<time_t>)

    Failures are not very well handled.
    """
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    for dev in instance.disks:
      if dev.iv_name not in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      logger.Info("adding new local storage on %s for %s" %
                  (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      dev.children = []
      cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)
      temp_suffix = int(time.time())
      logger.Info("renaming the old LVs on the target node")
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      rlist = [(disk, ren_fn(disk, temp_suffix)) for disk in old_lvs]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        logger.Error("Can't rename old LVs on node %s" % tgt_node)
        do_change_old = False
      else:
        do_change_old = True
      # now we rename the new LVs to the old LVs
      logger.Info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        logger.Error("Can't rename new LVs on node %s" % tgt_node)
      else:
        for old, new in zip(old_lvs, new_lvs):
          new.logical_id = old.logical_id
          cfg.SetDiskID(new, tgt_node)

      if do_change_old:
        for disk in old_lvs:
          disk.logical_id = ren_fn(disk, temp_suffix)
          cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      logger.Info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        logger.Error("Can't add local storage to drbd!")
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            logger.Error("Can't rollback device %s" % new_lv)
        return

      dev.children = new_lvs
      cfg.Update(instance)

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    logger.Info("Done changing drbd configs, waiting for sync")
    _WaitForSync(cfg, instance, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      logger.Info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not rpc.call_blockdev_remove(tgt_node, lv):
          logger.Error("Can't cleanup child device, skipping. You need to"
                       " fix manually!")
          continue

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.
    """
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node
    for dev in instance.disks:
      size = dev.size
      logger.Info("adding new local storage on %s for %s" %
                  (new_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in dev.children:
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], new_node))

      # create new devices on new_node
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=(pri_node, new_node,
                                          dev.logical_id[2]),
                              children=dev.children)
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new DRBD on"
                                 " node '%s'" % new_node)

      # we have new devices, shutdown the drbd on the old secondary
      cfg.SetDiskID(dev, old_node)
      if not rpc.call_blockdev_shutdown(old_node, dev):
        raise errors.OpExecError("Failed to shutdown DRBD on old node")

      # we have new storage, we 'rename' the network on the primary
      cfg.SetDiskID(dev, pri_node)
      # rename to the ip of the new node
      new_uid = list(dev.physical_id)
      new_uid[2] = self.remote_node_info.secondary_ip
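      # physical_id index 2 holds the (old) secondary's IP; pointing it
      # at the new node and issuing the rename below is the "artifice"
      # from the docstring that makes the primary re-attach to the new
      # peer.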
      rlist = [(dev, tuple(new_uid))]
      if not rpc.call_blockdev_rename(pri_node, rlist):
        raise errors.OpExecError("Can't detach re-attach drbd %s on node"
                                 " %s from %s to %s" %
                                 (dev.iv_name, pri_node, old_node, new_node))
      dev.logical_id = (pri_node, new_node, dev.logical_id[2])
      cfg.SetDiskID(dev, pri_node)
      cfg.Update(instance)

      iv_names[dev.iv_name] = (dev, dev.children)

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    logger.Info("Done changing drbd configs, waiting for sync")
    _WaitForSync(cfg, instance, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    for name, (dev, old_lvs) in iv_names.iteritems():
      logger.Info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        if not rpc.call_blockdev_remove(old_node, lv):
          logger.Error("Can't cleanup child device, skipping. You need to"
                       " fix manually!")
          continue

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance
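    # Dispatch, as implemented below: remote_raid1 always goes through
    # _ExecRR1; for drbd8, a request without a new secondary node is a
    # local disk replacement (_ExecD8DiskOnly), while naming a new node
    # replaces the secondary (_ExecD8Secondary).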
3748
    if instance.disk_template == constants.DT_REMOTE_RAID1:
3749
      fn = self._ExecRR1
3750
    elif instance.disk_template == constants.DT_DRBD8:
3751
      if self.op.remote_node is None:
3752
        fn = self._ExecD8DiskOnly
3753
      else:
3754
        fn = self._ExecD8Secondary
3755
    else:
3756
      raise errors.ProgrammerError("Unhandled disk replacement case")
3757
    return fn(feedback_fn)
3758

    
3759

    
3760
class LUQueryInstanceData(NoHooksLU):
3761
  """Query runtime instance data.
3762

3763
  """
3764
  _OP_REQP = ["instances"]
3765

    
3766
  def CheckPrereq(self):
3767
    """Check prerequisites.
3768

3769
    This only checks the optional instance list against the existing names.
3770

3771
    """
3772
    if not isinstance(self.op.instances, list):
3773
      raise errors.OpPrereqError("Invalid argument type 'instances'")
3774
    if self.op.instances:
3775
      self.wanted_instances = []
3776
      names = self.op.instances
3777
      for name in names:
3778
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3779
        if instance is None:
3780
          raise errors.OpPrereqError("No such instance name '%s'" % name)
3781
      self.wanted_instances.append(instance)
3782
    else:
3783
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3784
                               in self.cfg.GetInstanceList()]
3785
    return
3786

    
3787

    
3788
  def _ComputeDiskStatus(self, instance, snode, dev):
3789
    """Compute block device status.
3790

3791
    """
3792
    self.cfg.SetDiskID(dev, instance.primary_node)
3793
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3794
    if dev.dev_type in constants.LDS_DRBD:
3795
      # we change the snode then (otherwise we use the one passed in)
3796
      if dev.logical_id[0] == instance.primary_node:
3797
        snode = dev.logical_id[1]
3798
      else:
3799
        snode = dev.logical_id[0]
3800

    
3801
    if snode:
3802
      self.cfg.SetDiskID(dev, snode)
3803
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
3804
    else:
3805
      dev_sstatus = None
3806

    
3807
    if dev.children:
3808
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
3809
                      for child in dev.children]
3810
    else:
3811
      dev_children = []
3812

    
3813
    data = {
3814
      "iv_name": dev.iv_name,
3815
      "dev_type": dev.dev_type,
3816
      "logical_id": dev.logical_id,
3817
      "physical_id": dev.physical_id,
3818
      "pstatus": dev_pstatus,
3819
      "sstatus": dev_sstatus,
3820
      "children": dev_children,
3821
      }
3822

    
3823
    return data
3824

    
3825
  def Exec(self, feedback_fn):
3826
    """Gather and return data"""
3827
    result = {}
3828
    for instance in self.wanted_instances:
3829
      remote_info = rpc.call_instance_info(instance.primary_node,
3830
                                                instance.name)
3831
      if remote_info and "state" in remote_info:
3832
        remote_state = "up"
3833
      else:
3834
        remote_state = "down"
3835
      if instance.status == "down":
3836
        config_state = "down"
3837
      else:
3838
        config_state = "up"
3839

    
3840
      disks = [self._ComputeDiskStatus(instance, None, device)
3841
               for device in instance.disks]
3842

    
3843
      idict = {
3844
        "name": instance.name,
3845
        "config_state": config_state,
3846
        "run_state": remote_state,
3847
        "pnode": instance.primary_node,
3848
        "snodes": instance.secondary_nodes,
3849
        "os": instance.os,
3850
        "memory": instance.memory,
3851
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
3852
        "disks": disks,
3853
        "vcpus": instance.vcpus,
3854
        }
3855

    
3856
      result[instance.name] = idict
3857

    
3858
    return result


class LUSetInstanceParms(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      args['nics'] = [(ip, bridge)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.bridge = getattr(self.op, "bridge", None)
    if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("No such instance name '%s'" %
                                 self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance
    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.
    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))

    self.cfg.AddInstance(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance name is a valid one.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    # shutdown the instance, unless requested not to do so
    if self.op.shutdown:
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
      self.processor.ChainOpCode(op)

    vgname = self.cfg.GetVGName()

    snap_disks = []
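
    # Export flow, as implemented below: snapshot the instance's 'sda'
    # (optionally while it is shut down), restart it if needed, copy the
    # snapshot to the target node, drop the snapshot, finalize the
    # export there, and finally prune older exports of the same instance
    # from the remaining nodes.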
    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown:
        op = opcodes.OpStartupInstance(instance_name=instance.name,
                                       force=False)
        self.processor.ChainOpCode(op)

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                      instance):
        logger.Error("could not export block device %s from node"
                     " %s to node %s" %
                     (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from"
                     " node %s" % (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.processor.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")